Question: How to handle errors in Parallel.ForEachAsync?

raysefo

Hello friends,
There is a 3rd party web API that returns game codes. A single request can return a maximum of 10 codes. From a worker, I call my own ASP.NET Core web API as shown below. (The worker and my API are on the same server.) Basically, the worker sends requests to process 100 codes and return them.

C#:
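                    // Split the requested amount into two halves, one per parallel task.
                    // Defaults to 50/50; for amounts below 100 the first half takes the extra code.
                    var num = amount;
                    var firstNum = 50;
                    var secondNum = 50;

                    if (num < 100)
                    {
                        firstNum = (num + 1) / 2;
                        secondNum = num - firstNum;
                    }
                    var quantities = new List<int> { firstNum, secondNum };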
                    var cts = new CancellationTokenSource();
                    ParallelOptions parallelOptions = new()
                    {
                        MaxDegreeOfParallelism = 2,
                        CancellationToken = cts.Token
                    };
                    try
                    {
                        await Parallel.ForEachAsync(quantities, parallelOptions, async (quantity, ct) =>
                        {
                            var content = new FormUrlEncodedContent(new[]
                            {
                                new KeyValuePair<string, string>("productCode", productCode),
                                new KeyValuePair<string, string>("quantity", quantity.ToString()),
                                new KeyValuePair<string, string>("clientTrxRef", bulkId.ToString())
                            });

                            using var response =
                                await httpClient.PostAsync(_configuration["Razer:Production"], content, ct);

                            if ((int)response.StatusCode == 200)
                            {
                                var coupon = await response.Content.ReadFromJsonAsync<Root>(cancellationToken: ct);

                                _logger.LogInformation("REFERENCE ID: {referenceId}", coupon.ReferenceId);

                                await UpdateData(id);
                            }
                            else
                            {
                                _logger.LogError("Purchase ServiceError: {statusCode}",
                                    (int)response.StatusCode);
                            }
                        });
                    }
                    catch (OperationCanceledException ex)
                    {
                        _logger.LogError("Operation canceled: {Message}",
                            ex.Message);
                    }

My web API splits each request into chunks of 10 and calls the 3rd party web API once per chunk.

Web API:
while (requestedAmount > 0)
{
      var gameRequest = _mapper.Map<RequestDto, GameRequest>(requestDto);

      var count = Math.Min(requestedAmount, 10);
      gameRequest.quantity = count;
...

I can't work out how to detect a problem with one of the requests to the 3rd party web API. How should I handle it in Parallel.ForEachAsync? Say I want 100 codes (quantity) and I send the requests to the 3rd party service 10 at a time. How do I deal with an error in, say, the 4th request in the while loop?
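One way to see what happened in the 4th request is to have the chunk loop report how many codes it actually obtained rather than only success or failure. The following is a minimal sketch under assumptions, not the code from the API above: `ChunkedPurchase`, `Result`, and the `requestChunkAsync` delegate (a stand-in for the call to the 3rd party service, returning true on success) are hypothetical names introduced for illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class ChunkedPurchase
{
    // Outcome of a chunked run: how many codes were asked for and how many were obtained.
    public sealed record Result(int Requested, int Fulfilled)
    {
        public bool Complete => Fulfilled == Requested;
    }

    // Requests codes in chunks of at most chunkSize and stops at the first failed chunk.
    // requestChunkAsync is a hypothetical stand-in for the 3rd party call.
    public static async Task<Result> RunAsync(
        int requestedAmount,
        int chunkSize,
        Func<int, Task<bool>> requestChunkAsync)
    {
        int remaining = requestedAmount;
        while (remaining > 0)
        {
            int count = Math.Min(remaining, chunkSize);
            if (!await requestChunkAsync(count))
            {
                break; // leave the loop; 'remaining' records what is still missing
            }
            remaining -= count;
        }
        return new Result(requestedAmount, requestedAmount - remaining);
    }
}
```

With a result like this, the API could return the fulfilled count to the caller instead of a bare 500, so the caller knows how many codes are still outstanding. Whether to stop at the first failure or keep trying the remaining chunks is a design choice; the sketch stops.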

Thank you.
 
Summary:

I have written an ASP.NET Core Web API that retrieves game codes from an external API. The process works like this: the game code and the number of codes to be retrieved are sent in the request. However, the external API has a restriction: only 10 game codes can be retrieved in one request. This process is currently used at the cash registers of various chain stores, where each transaction purchases a single game code.

However, there can be customers who want to retrieve thousands of game codes in bulk online. To support this, I added a new method to the API that makes multiple purchases in a loop. The requests are sent to the external API in chunks of 10 game codes. This works without any problems because each successful request and response is recorded in the database. The process is driven from the ASP.NET Core web interface and has a limitation: a single request entered through the interface is capped at 100 codes (to avoid time-out issues, etc.), so retrieving thousands of game codes takes a long time.

To improve this situation, I created a worker service that runs in the background. The user enters the total request through the web interface, and it is split into batches of 100 and recorded in the database. The worker service picks up these batches one at a time at random intervals, sends each one to the API I created, and my API forwards it to the external API. The new process in the worker service is as follows: a batch of 100 game codes is divided 50/50 and sent with Parallel.ForEachAsync using a maximum degree of parallelism of 2. My API then processes each half in chunks of 10, as described above. If all 100 game codes are successfully requested and retrieved, I update the related record in the database. However, if an error occurs anywhere while the external API is being called in chunks of 10, my API returns a 500 error. I'm not sure whether Parallel.ForEachAsync will continue processing the other requests or whether the operation will be cancelled. I was unable to test this scenario. What logic would be appropriate here, especially for the update scenario? Is there a way to mock the service in order to get errors once in a while so that I can test the logic?
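On the question of whether the loop continues: `HttpClient.PostAsync` does not throw for a 500 response on its own, so in the code above a 500 only reaches the logging branch and the other task carries on. As far as I understand the .NET 6 behavior, things differ when an exception escapes the loop body: the token passed to the other bodies is cancelled, no new iterations are started, and the exception surfaces when the loop is awaited. A minimal sketch to observe both cases, using a hypothetical `ForEachAsyncFailureDemo` class and a simulated failure in place of a real HTTP call:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ForEachAsyncFailureDemo
{
    // Returns true if awaiting the loop threw, false if the loop ran to completion.
    public static async Task<bool> LoopThrewAsync(bool throwOnFailure)
    {
        var quantities = new[] { 50, 50 };
        var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
        int started = 0;

        try
        {
            await Parallel.ForEachAsync(quantities, options, async (quantity, ct) =>
            {
                await Task.Yield();

                // Simulate the second request failing (for example, a 500 response).
                bool failed = Interlocked.Increment(ref started) == 2;

                if (failed && throwOnFailure)
                {
                    // Comparable to calling EnsureSuccessStatusCode() on the response.
                    throw new InvalidOperationException("simulated failure");
                }
                // Otherwise the failure is only noted (logged) and the loop keeps going.
            });
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }
}
```

So whether the loop stops depends on whether the body throws or swallows the failure; catching everything inside the body, as a log-only handler does, means the loop never sees the error.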
 
Is there a way to mock the service in order to get errors once in a while so that I can test the logic?

Yes there should be. I'm surprised that you had not set this up already. Does that mean you have not been doing unit testing?
 
If you are using the newer versions of .NET, I read that the HttpClient can now be mocked by using an interface instead of the class directly. That should make it easy to use almost any mocking framework.

Alternatively, just wrap your REST API call, and put in logic to tell it when to fail (e.g., every 5th call, or only on even minutes, etc.).
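The "fail every 5th call" idea can also be done below the HttpClient, by handing it a custom `HttpMessageHandler`. This is only a sketch: `FlakyHandler` and its `failEvery` parameter are hypothetical names, and the empty JSON body is a placeholder rather than the real response shape of the service.

```csharp
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical test helper: answers every request locally and returns
// a 500 on every Nth call instead of contacting the real service.
public sealed class FlakyHandler : HttpMessageHandler
{
    private readonly int _failEvery;
    private int _calls;

    public FlakyHandler(int failEvery) => _failEvery = failEvery;

    protected override Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request, CancellationToken cancellationToken)
    {
        // Thread-safe call counter, since requests may arrive in parallel.
        int call = Interlocked.Increment(ref _calls);

        var status = call % _failEvery == 0
            ? HttpStatusCode.InternalServerError
            : HttpStatusCode.OK;

        var response = new HttpResponseMessage(status)
        {
            Content = new StringContent("{}") // placeholder body, not the real payload
        };
        return Task.FromResult(response);
    }
}
```

In a test, the client would be created with `new HttpClient(new FlakyHandler(failEvery: 5))` and passed to the code under test, so the calling code stays unchanged while every 5th request comes back as a 500.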

As for the error handling, I thought that MS covered that in their docs:
 
Thank you @Skydiver. I actually wrote a mock web API that returns an error once in a while instead of a success. I still haven't figured out how to solve the update problem in my case.

If the response is a success, I set status = 1 in the database to mark the job as completed. If both threads return success, there is no problem: there will be 2 requests and responses in the database (status = 1). If both threads fail, there is again no problem: no requests or responses are saved, the errors are logged, and the record stays at status = 0, so the worker service will try it again after a random time interval. But what if one thread succeeds and the other fails? I mocked my API so that it returns one success and one error. In this case, only 1 response was received from the external API and saved in the database, so only 10 game codes were received, yet the update method in the worker service still set the record to status = 1. The 20 requested game codes were not all delivered. How can I resolve this situation?


I'm thinking of sending requests to the external API 10 at a time instead of 20 at a time, but is there a better way?
 
Is it possible to await UpdateData(id) in my code block if I get an error from the 3rd party web service call in my case?


C#:
                   var num = 20;
                    var firstNum = 10;
                    var secondNum = 10;

                    if (num < 20)
                    {
                        firstNum = (num + 1) / 2;
                        secondNum = num - firstNum;
                    }

                    var quantities = new List<int> { firstNum, secondNum };
                    var cts = new CancellationTokenSource();
                    ParallelOptions parallelOptions = new()
                    {
                        MaxDegreeOfParallelism = Convert.ToInt32(_configuration["MaxDegreeOfParallelism:Max"]), // it is 2
                        CancellationToken = cts.Token
                    };
                    try
                    {
                        await Parallel.ForEachAsync(quantities, parallelOptions, async (quantity, ct) =>
                        {
                            var content = new FormUrlEncodedContent(new[]
                            {
                                new KeyValuePair<string, string>("productCode", productCode),
                                new KeyValuePair<string, string>("quantity", quantity.ToString()),
                                new KeyValuePair<string, string>("clientTrxRef", bulkId.ToString())
                            });

                            try
                            {
                                using var response =
                                    await httpClient.PostAsync(_configuration["Razer:Production"], content, ct);

                                if ((int)response.StatusCode == 200)
                                {
                                    var coupon = await response.Content.ReadFromJsonAsync<Root>(cancellationToken: ct);

                                    _logger.LogInformation("REFERENCE ID: {referenceId}", coupon.ReferenceId);

                                    await UpdateData(id);
                                }
                                else
                                {
                                    _logger.LogError("Purchase ServiceError: {statusCode}",
                                        (int)response.StatusCode);
                                }
                            }
                            catch (HttpRequestException ex)
                            {
                                _logger.LogError("HTTP client exception: {Message}", ex.Message);
                            }
                            catch (JsonException ex)
                            {
                                _logger.LogError("JSON serialization exception: {Message}", ex.Message);
                            }
                            catch (Exception ex)
                            {
                                _logger.LogError("Unexpected exception: {Message}", ex.Message);
                            }
                        });
                    }
                    catch (OperationCanceledException ex)
                    {
                        _logger.LogError("Operation canceled: {Message}", ex.Message);
                    }
 
If the compiler lets you, then syntax-wise, yes. The question is whether it logically makes sense to do that in parallel. Will you run into lock contention or race conditions?
 
I think there is a design problem in my logic. The 20 codes are sent to the web service as two requests of 10 codes each, in two concurrent parallel tasks (Max Degree Of Parallelism: 2), in order to retrieve codes faster from the 3rd party web service. As it stands, UpdateData is called even if one of the parallel tasks fails, which is not good. Can I somehow check whether any one of the parallel tasks got an error?
A short reminder: the user enters the total number of codes that he/she wants to purchase. This total is split into batches of 20 and saved into a database. (If the total amount is 100, then there will be 5 rows, each with an amount of 20.) Then the code I shared starts working, picks up the rows with status = 0 one by one, and calls the 3rd party web service. My intention was to update the row to status = 1, which means complete, only if the result of both parallel tasks is OK.
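One possible way to check whether any parallel task failed is to count failures inside the loop and decide about the update once, after the loop has finished. The sketch below assumes .NET 6 or later; `BatchRunner` and the `purchaseAsync` delegate (a stand-in for the HTTP call and response handling, returning true on success) are hypothetical names, not part of the code above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BatchRunner
{
    // Runs one purchase per quantity in parallel and reports whether all of them succeeded.
    // purchaseAsync is a hypothetical stand-in for the HTTP call; it returns true on success.
    public static async Task<bool> AllSucceededAsync(
        IEnumerable<int> quantities,
        int maxParallelism,
        Func<int, CancellationToken, Task<bool>> purchaseAsync,
        CancellationToken cancellationToken = default)
    {
        int failures = 0;
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = maxParallelism,
            CancellationToken = cancellationToken
        };

        await Parallel.ForEachAsync(quantities, options, async (quantity, ct) =>
        {
            bool ok;
            try
            {
                ok = await purchaseAsync(quantity, ct);
            }
            catch (Exception) when (!ct.IsCancellationRequested)
            {
                ok = false; // count an exception from the call as a failed purchase
            }

            if (!ok)
            {
                Interlocked.Increment(ref failures); // shared counter, updated atomically
            }
        });

        return Volatile.Read(ref failures) == 0;
    }
}
```

The worker would then call `UpdateData(id)` only when `AllSucceededAsync` returns true, instead of calling it inside each task. This does not solve the partial case by itself: if one half succeeded, those codes are already purchased, so a retry of a row left at status = 0 would need to know how many codes are still missing.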
 