Question

baparker on Mon, 19 Jun 2017 22:53:37


I am doing the following:

1. Making a call to an HTTP-based Azure Function.

    a. The Azure Function adds 200 queue items to an Azure Storage Queue.

2. A second Azure Function listens to the Azure Storage Queue. For each queue item:

    a. Reads items from a file and creates 5000 distinct JSON items.

    b. Writes each individual item to Cosmos DB using CreateDocumentAsync.

 

I have set Cosmos DB to be scaled to 100,000 RU/s throughput. Metrics in Cosmos DB indicate I am well under that throughput.

A total of 990,500 documents are written to Cosmos DB from the Azure Function, but Cosmos DB does not receive all of them. The number that make it varies randomly between executions.

I tried using an Azure Functions DocumentDB output binding. I get multiple "A task was canceled." messages.

Adding an await keyword to CreateDocumentAsync causes the function to fail due to thread exhaustion, which I expected after reading the "Tips for improving the performance and reliability of Azure Functions" article.

It appears either Azure Functions or Cosmos DB cannot handle this kind of volume. I have not found a way to bulk insert documents into Cosmos DB from Azure Functions.

Any advice?






Replies

Brett Samblanet on Tue, 20 Jun 2017 14:02:21


Can you share your function code? I can see if anything jumps out.

baparker on Tue, 20 Jun 2017 15:20:33


https://github.com/bparkerhsd/Functions

Brett Samblanet on Tue, 20 Jun 2017 15:26:34


You should use Tasks and async/await everywhere you can:

1. Your function should be 'public static async Task Run(...'

2. CreateDocuments should be 'private static async Task CreateDocuments(...'

3. You should call 'await CreateDocuments()'

4. You should await 'client.CreateDocumentAsync()'

The current code isn't waiting on any of these calls, which means that you're scheduling all of the requests to fire off at once. If you're running on the consumption plan, this can be problematic because your total connection limit is restricted to 300. That may be what you're hitting.
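A minimal sketch of that shape, assuming a queue-triggered C# function (the names, settings, and the item-building step are placeholders, not the actual repo code):

    // Illustrative sketch only -- assumes the DocumentDB client SDK (Microsoft.Azure.Documents.Client)
    // is referenced and the usual System/System.Linq namespaces are imported.
    private const string EndpointUrl = "https://<your-account>.documents.azure.com:443/";
    private const string PrimaryKey = "<your-key>";

    public static async Task Run(string myQueueItem, TraceWriter log)
    {
        using (var client = new DocumentClient(new Uri(EndpointUrl), PrimaryKey))
        {
            // Awaiting the helper keeps the function alive until the writes actually finish.
            await CreateDocuments(client, myQueueItem, log);
        }
    }

    private static async Task CreateDocuments(DocumentClient client, string myQueueItem, TraceWriter log)
    {
        Uri collectionUri = UriFactory.CreateDocumentCollectionUri("mydb", "mycollection");

        // Placeholder for the file-parsing step that builds the 5000 JSON items.
        IEnumerable<object> items = Enumerable.Empty<object>();

        foreach (var item in items)
        {
            // Awaiting each write keeps the number of in-flight connections bounded.
            await client.CreateDocumentAsync(collectionUri, item);
        }
    }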

Give those a try (and make everything async if you can) and let me know if it helps.

baparker on Tue, 20 Jun 2017 17:09:00


Thanks for the help! I am no longer losing records.

I am now struggling with what appears to be queue messages timing out and being resubmitted to the function, which is causing duplicates. I am using a P3 App Service plan with autoscale disabled and an instance count of 8. Live Application Insights metrics show some of the function executions running past 400,000 ms, with all of the servers running close to 400% total CPU most of the time.

More great advice appreciated!

Brett Samblanet on Tue, 20 Jun 2017 17:21:51


You're likely hitting a known bug (fix is in -- will be deployed with next release): https://github.com/Azure/azure-webjobs-sdk-script/issues/1600

Brett Samblanet -- Azure Functions


baparker on Tue, 20 Jun 2017 17:42:45


How do I find out when your team does another release?

When I had autoscale enabled, I noticed that even though I had 1000+ queue messages, it would only autoscale to 2 or 3 servers, even though I configured it to provision up to 10. What is the expected behavior?

Brett Samblanet on Tue, 20 Jun 2017 18:16:53


This issue explains it (I'll update the related issues for this bug when the release is out): https://github.com/Azure/azure-webjobs-sdk-script/issues/1587#issuecomment-309767341

If you're using an App Service plan -- what metric did you have it scaling on? I believe App Service autoscaling limits itself to scale up slowly (I'm trying to find the docs) to prevent spikes in instance counts due to momentary spikes in traffic.

Consumption plans will scale automatically for you -- and monitor your queue length to decide whether scaling is required.

baparker on Tue, 20 Jun 2017 18:28:05


Thanks for all the info and help!

I would use a Consumption plan, but I have other functions that run longer than five minutes. It would be nice to use the Consumption plan if we didn't have this limit. Also, switching service plans requires redeploying to a new app service and losing the HTTP URLs that are used elsewhere.

baparker on Thu, 22 Jun 2017 15:01:13


I have redeployed my Functions for testing on the Consumption plan. I found that the five-minute timeout can now be increased to ten minutes in the host.json file.
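For reference, the timeout is set at the root of host.json (the value below is the ten-minute maximum on the Consumption plan):

    {
      "functionTimeout": "00:10:00"
    }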

The issue I am seeing is periodic "A task was canceled" exception messages when calling CreateDocumentAsync against Cosmos DB. Any ideas on the cause and how I might resolve it?

Brett Samblanet on Thu, 22 Jun 2017 16:42:56


Is your function running up to the 5 (or 10, if you adjusted it) minute limit when it fails? Or does this happen before this?

If you put an updated function script in your github repo above, I can take another look and see if anything jumps out.

baparker on Thu, 22 Jun 2017 17:17:41


Start time: 16:57:05

Errors start occurring at: 16:59:23

Around 10 servers have been provisioned at this point, with the request duration of each item under 150,000 ms.

1000 queue items were put into the storage queue.

1000 rows are read from Excel and written to Cosmos DB for each queue item.

I put the updated code back into GitHub.

Brett Samblanet on Thu, 22 Jun 2017 19:27:41


I was able to find your backend logs and took a look, but it looks like you're catching any exceptions from CreateDocumentAsync so they don't bubble up to our backend logs (and your function reports 'success' rather than 'failure'). So I can't see the exact details of that exception.

However, I do see you getting throttled by what I *think* is excessive SSL negotiations. I'd recommend that you move the CosmosDBClient to be static -- that way you have a single client instance making all of your calls. That should be the most efficient way to handle it and hopefully that helps. 
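Something along these lines (a sketch; the setting names are illustrative):

    // One DocumentClient shared across all invocations on this instance,
    // so SSL negotiation happens once per instance instead of once per call.
    private static readonly DocumentClient client = new DocumentClient(
        new Uri(ConfigurationManager.AppSettings["CosmosDbEndpoint"]),
        ConfigurationManager.AppSettings["CosmosDbKey"]);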

Run that for a bit and see if things improve.

baparker on Fri, 23 Jun 2017 14:46:36


I made the CosmosDBClient static and added a log message indicating when it is created. It appears the client was created once for each "server" that was spun up to run the function. I still get the "Task was canceled" error message. I put a Stopwatch around the call to CreateDocumentAsync. It is as if the call gets hung up.

I put some sample exceptions into GitHub, including an "Elapsed Time" that shows how long the call to CreateDocumentAsync ran before it returned the exception. There is also a capture.png with a snip of metrics from Cosmos DB.


Brett Samblanet on Fri, 23 Jun 2017 15:03:06


Looking at the stack, I see a bunch of these: Microsoft.Azure.Documents.BackoffRetryUtility`1.<ExecuteRetry>d__1b.MoveNext()

That seems to indicate that the DocumentDB client is retrying internally and eventually giving up with a Task canceled message.

I would assume that you'd been getting 429s from their server to cause this, but I don't see any indication of that in your screenshot.

I'll try to get a DocumentClient expert to see if they can help diagnose what's happening.

Brett Samblanet on Fri, 23 Jun 2017 18:40:49


Would you be able to send me (email if you prefer -- brettsam at microsoft dot com) the details of your DocDB collection? What's the endpoint?

baparker on Fri, 07 Jul 2017 17:27:10


I am able to successfully write to Cosmos DB.

Here are my observations:

Total Queue Items: 991

Rows Processed per Queue Item: 1000

Total Rows to Process: 990594

Requires App Service Plan P3 Premium (autoscale enabled: Min: 1, Max: 20, Default: 1, add 1 instance when CPU > 80%)

  • P3 Premium - All queue items processed, no retries. 990594 rows written to Cosmos DB documents.
  • P2 Premium - All queue items processed, with 58 queue retries. 724381 rows written to Cosmos DB. (Note: this number varies from run to run.) No exceptions found in Application Insights.
  • Consumption plan - After waiting several hours, all queue items still had not processed, with multiple queue retries. 123823 rows written to Cosmos DB. (Note: this number varies from run to run.) The functions running on the Consumption plan did scale out but never scaled up; they appear to run with only 1 core per instance.

Cosmos DB has to be configured with a high enough throughput (RU/s). If the throughput is not high enough, not all documents will be processed.

Using the P3 Premium App Service plan for the Function app:

  • 100000 RU/s - All documents are processed.
  • 75000 RU/s - All documents are processed.
  • 50000 RU/s - 968260 documents processed. (Note: this number varies from run to run.)
  • 25000 RU/s - 973689 documents processed. (Note: this number varies from run to run.)
  • 10000 RU/s - 668990 documents processed. (Note: this number varies from run to run.)

Cost is a definite concern with Cosmos DB at high throughput (100,000 RU/s is about $8.00/hr). A possible solution is to write code that scales the throughput up and down around the processing window, which is definitely not my preference.
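If programmatic scaling turns out to be necessary, a rough sketch of the approach with the DocumentDB .NET SDK (the collection reference and RU/s value are illustrative; 'client' and 'collection' are the existing DocumentClient and DocumentCollection):

    // Look up the current offer (throughput) for the collection and replace it with a new RU/s value.
    // Requires System.Linq.
    Offer offer = client.CreateOfferQuery()
        .Where(o => o.ResourceLink == collection.SelfLink)
        .AsEnumerable()
        .Single();

    offer = new OfferV2(offer, 100000);   // target throughput in RU/s
    await client.ReplaceOfferAsync(offer);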


A.Baig on Tue, 24 Oct 2017 12:54:00


I am facing a similar kind of issue.

I have a function triggered when an item is inserted into the queue. Currently (for testing purposes) I am adding 100 messages to the queue using a console service. Each function calls a bulk insert method that adds 5000 documents to a collection using a stored procedure, but fewer than half of the documents get inserted into the collection. The collection has been scaled to 100,000 RU/s throughput with the storage capacity set to unlimited. Below is the code for the function and the bulk insert stored procedure.

The stored procedure returns the error "Errors":["Request rate is large"] for most of the function invocations.

I can't figure out the problem. Kindly help.

  //function code
  public static void Run([QueueTrigger("bulkoperationfunction", Connection = "AzureWebJobsStorage")]string myQueueItem, TraceWriter log)
        {
            var start = DateTime.Now;
            log.Info($"C# Queue trigger function processed: {myQueueItem} - {start} - millisecond {DateTime.Now.Millisecond}");
            Guid Id = new Guid(myQueueItem);

            try
            {
                AddBulkData_Unlimited(Id).Wait();
            }
            catch (Exception ex)
            {
                log.Info(string.Format("exception in Bulk Operation {0}", ex));
            }
            log.Info($"End Time : {myQueueItem} - {DateTime.Now}");
            log.Info($"Total Time : {myQueueItem} - {DateTime.Now - start}");
        }

//AddBulkData_Unlimited 
public async Task AddBulkData_Unlimited(Guid Id)
        {

            try
            {
                EndpointUrl = (ConfigurationManager.AppSettings["CosmoDbEndPointURL"]);
                PrimaryKey = (ConfigurationManager.AppSettings["CosmoDbPrimaryKey"]);

                Tempdata Tempdata = new Tempdata();
                Tempdata.Id = Id;
                Tempdata.DateTimeUtc = DateTime.UtcNow;
                Tempdata.DateTime = DateTime.Now;

                List<BColumn> BColumns = new List<BColumn>();
                for (int i = 1; i <= 5000; i++)
                {
                    BColumns.Add(new BColumn()
                    {
                        BId = Id, 
                        FileColumnName = string.Format("Test_Column_{0}", i),
                        Id = Guid.NewGuid(),
                        OrderNumber = i
                    });
                }

                this.client = new DocumentClient(new Uri(EndpointUrl), PrimaryKey);
                Database database = await this.client.CreateDatabaseIfNotExistsAsync(new Database { Id = AzureFunctionConstants.cosmoDbName });

                DocumentCollection DocumentCollection = new DocumentCollection();
                DocumentCollection.Id = AzureFunctionConstants.BColumn_UL;
                DocumentCollection.PartitionKey.Paths.Add("/BId");
                DocumentCollection BColumnCollection = await this.client.CreateDocumentCollectionIfNotExistsAsync(UriFactory.CreateDatabaseUri(AzureFunctionConstants.cosmoDbName), DocumentCollection);

                //calling bulkImport sp
                StoredProcedure spForSheet = GetDocumentStoredProcedure(this.client, BColumnCollection, AzureFunctionConstants.BColumn_UL, AzureFunctionConstants.bulkImport).Result;
                var Sheetresponse = await this.client.ExecuteStoredProcedureAsync<dynamic>(spForSheet.SelfLink, new RequestOptions { PartitionKey = new PartitionKey(Id.ToString()) }, BColumns);

                DocumentCollection = new DocumentCollection();
                DocumentCollection.Id = "Tempdata";
                DocumentCollection.PartitionKey.Paths.Add("/BId");
                DocumentCollection Tempdatacol = await this.client.CreateDocumentCollectionIfNotExistsAsync(UriFactory.CreateDatabaseUri(AzureFunctionConstants.cosmoDbName), DocumentCollection);

                await this.client.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri(AzureFunctionConstants.cosmoDbName, "Tempdata"), Tempdata );
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }

//Bulkimport Stored Procedure.

function bulkImport(docs) {
    var collection = getContext().getCollection();
    var collectionLink = collection.getSelfLink();

    // The count of imported docs, also used as current doc index.
    var count = 0;

    // Validate input.
    if (!docs) throw new Error("The array is undefined or null.");

    var docsLength = docs.length;
    if (docsLength == 0) {
        getContext().getResponse().setBody(0);
    }

    // Call the create API to create a document.
    tryCreate(docs[count], callback);

    // Note that there are 2 exit conditions:
    // 1) The createDocument request was not accepted.
    //    In this case the callback will not be called, we just call setBody and we are done.
    // 2) The callback was called docs.length times.
    //    In this case all documents were created and we don't need to call tryCreate anymore. Just call setBody and we are done.
    function tryCreate(doc, callback) {
        var isAccepted = collection.createDocument(collectionLink, doc, callback);

        // If the request was accepted, callback will be called.
        // Otherwise report current count back to the client,
        // which will call the script again with remaining set of docs.
        if (!isAccepted) getContext().getResponse().setBody(count);
    }

    // This is called when collection.createDocument is done in order to process the result.
    function callback(err, doc, options) {
        if (err) throw err;

        // One more document has been inserted, increment the count.
        count++;

        if (count >= docsLength) {
            // If we created all documents, we are done. Just set the response.
            getContext().getResponse().setBody(count);
        } else {
            // Create next document.
            tryCreate(docs[count], callback);
        }
    }
}



Brett Samblanet on Tue, 24 Oct 2017 16:12:03


Unrelated to the issue (I think) -- but make sure your function returns an 'async Task' rather than 'void', remove that .Wait() call, and use 'await' instead. That's not the underlying issue here, but it is a best practice.

My guess is that you're hitting your RU limit from within the Stored Procedure. If you pull this code into a console application and run it on ~10 threads, do you see the same thing? If so, then we can remove Functions from the equation and try to figure out a better approach.
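For example, a rough harness along these lines (purely illustrative; 'BulkLoader' is a placeholder for whatever class holds AddBulkData_Unlimited):

    // Fire the bulk-insert method from ~10 concurrent tasks to reproduce the load outside of Functions.
    // Requires System.Linq.
    var loader = new BulkLoader();
    var tasks = Enumerable.Range(0, 10)
        .Select(_ => loader.AddBulkData_Unlimited(Guid.NewGuid()))
        .ToArray();
    Task.WaitAll(tasks);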

Some links I found when researching this:
- https://stackoverflow.com/questions/45632778/cosmosdb-documentdb-bulk-insert-without-saturating-collection-ru
- https://stackoverflow.com/questions/41744582/fastest-way-to-insert-100-000-records-into-documentdb

A.Baig on Thu, 26 Oct 2017 16:48:11


I tested with 20 messages in the queue, of which 4 failed with the exception message "Request rate is large". So I wrapped the stored procedure logic in a recursive function, which worked for me (all the documents got inserted into the collection). Is that an appropriate approach?

I also need to know more about the scaling of queue-triggered functions. When I put 3000 messages in the queue at the same time, my functions do not scale and take more than 30-45 minutes to insert 10,000 records into Cosmos DB (with my previous code), which is not feasible. I also checked the queue options in the host.json file, but the maximum allowed 'batchSize' is 32, and setting 'newBatchThreshold' to 100 also did not work.

How many instances of a function (triggered by different types of triggers) can run at the same time? How do I make my functions scale out according to the number of messages in the queue?

I need help scaling my functions on both the App Service plan and the Consumption plan. Kindly help. Many thanks.


baparker on Thu, 26 Oct 2017 17:10:44


A tip Brett gave me while I was at Ignite was to set batchSize to 1 on the Consumption plan. Processing 1000 queue items resulted in 42 instances scaling out on the Consumption plan.
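That setting lives under the queues section of host.json. A sketch of what the configuration might look like (newBatchThreshold shown as well, since it also influences how many messages an instance pulls at once):

    {
      "queues": {
        "batchSize": 1,
        "newBatchThreshold": 0
      }
    }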

For transient errors I added this code block that I lifted from another post:

using System;
using System.Collections.Generic;
using System.Threading;

public static class Retry
{
    public static void Do(
        Action action,
        TimeSpan retryInterval,
        int maxAttemptCount = 3)
    {
        Do<object>(() =>
        {
            action();
            return null;
        }, retryInterval, maxAttemptCount);
    }

    public static T Do<T>(
        Func<T> action,
        TimeSpan retryInterval,
        int maxAttemptCount = 3)
    {
        var exceptions = new List<Exception>();

        for (int attempted = 0; attempted < maxAttemptCount; attempted++)
        {
            try
            {
                // Wait before each retry (but not before the first attempt).
                if (attempted > 0)
                {
                    Thread.Sleep(retryInterval);
                }
                return action();
            }
            catch (Exception ex)
            {
                exceptions.Add(ex);
            }
        }

        // All attempts failed -- surface every captured exception.
        throw new AggregateException(exceptions);
    }
}

And then when you make a call that can possibly fail:

Retry.Do(() => dataLakeWriter.Write(dataLakeStoreAccount, dataLakeFilePath, dataLakeFileName, false), TimeSpan.FromSeconds(1));

 

A.Baig on Sat, 28 Oct 2017 10:31:26


Hi, I found a solution for the "Request rate is large" issue:

https://github.com/Azure/azure-documentdb-dotnet/issues/86