Saturday 28 February 2015

Ordered Parallel Processing in C#

In one of the projects I've been working on, we need to process a bunch of files containing invoice data. Processing these can be time consuming, as the files can be quite large. Although the way this data is used suggests that the processing could be done overnight, the business has insisted on processing the files during the online day, at 17:00.

The problem is that the files tend to contain the invoice's journey through its various states, and for audit purposes we need to process every one of those records.

So, for instance, if the first record in a file is an on-hold invoice, we want to process it, but we also want to process the same invoice showing as paid further down the file. We can't just process the paid event. Furthermore, we want the invoice record to end up with a status of paid, which is fairly reasonable.

If we process the invoices in parallel, we have no guarantee that they will be processed in the right order, so a paid invoice record might end up with a state of issued, which is not great. So we went for the quick and easy solution and processed the files serially.

I gave the matter a little bit more thought and came up with this:

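// po is assumed to be a ParallelOptions instance defined elsewhere in the class.
private void UpdateInvoices(IEnumerable<IInvoice> invoices)
{
    // One group per status, ordered by status so that earlier states come first.
    var groupedInvoices = invoices.GroupBy(x => x.Status)
        .OrderBy(x => x.Key);

    foreach (var invoiceGroup in groupedInvoices)
    {
        // Parallel.ForEach returns only once the whole group has been processed.
        Parallel.ForEach(invoiceGroup, po, invoice =>
        {
            UpdateInvoice(invoice);
        });
    }
}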

We group all the invoices by status and order the groups by status. We then process the groups one after another, with the invoices inside each group processed in parallel, so that all invoices with status issued get processed first and those with status paid last, with the other statuses in between. This relies on the status values sorting in the same order as the invoice journey.
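
To make the idea easier to try in isolation, here is a minimal, self-contained sketch of the same phased approach. InvoiceStatus, InvoiceEvent and OrderedParallelDemo are hypothetical stand-ins, not the project's real types, and the enum is assumed to be declared in lifecycle order. Instead of hitting a database, the "update" just records the last status applied to each invoice.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-ins for the real invoice types.
// Assumption: the declaration order matches the invoice lifecycle.
public enum InvoiceStatus { Issued, OnHold, Paid }

public sealed class InvoiceEvent
{
    public InvoiceEvent(int invoiceId, InvoiceStatus status)
    {
        InvoiceId = invoiceId;
        Status = status;
    }

    public int InvoiceId { get; }
    public InvoiceStatus Status { get; }
}

public static class OrderedParallelDemo
{
    // Returns the last status applied to each invoice id.
    public static IDictionary<int, InvoiceStatus> Process(IEnumerable<InvoiceEvent> events)
    {
        var finalStatus = new ConcurrentDictionary<int, InvoiceStatus>();

        // One group per status, visited in lifecycle order.
        var phases = events.GroupBy(e => e.Status).OrderBy(g => g.Key);

        foreach (var phase in phases)
        {
            // Parallel.ForEach blocks until the whole phase has finished,
            // so no Paid event can run before every OnHold event is done.
            Parallel.ForEach(phase, e => finalStatus[e.InvoiceId] = e.Status);
        }

        return finalStatus;
    }

    public static void Main()
    {
        var events = new List<InvoiceEvent>();
        for (var id = 0; id < 1000; id++)
        {
            // Deliberately out of lifecycle order within the "file".
            events.Add(new InvoiceEvent(id, InvoiceStatus.Paid));
            events.Add(new InvoiceEvent(id, InvoiceStatus.Issued));
            events.Add(new InvoiceEvent(id, InvoiceStatus.OnHold));
        }

        var result = Process(events);
        Console.WriteLine(result.Values.All(s => s == InvoiceStatus.Paid)); // prints True
    }
}
```

The parallelism lives inside each phase only. The barrier between phases is what guarantees that every invoice ends on its latest status, at the cost of waiting for the slowest update in a phase before the next one starts.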

It is, of course, possible to write a separate Parallel.ForEach loop for each status, but I feel that this solution is more elegant and easier to maintain.

PLINQ does have an AsOrdered method, but the UpdateInvoice method doesn't return anything, so there is no result sequence to order. If it fails to update the database, it simply logs the failure and it's for the server boys and girls to worry about.

Furthermore, it simply doesn't quite work as I might have expected it to work.

The code from this sample has been modified to better simulate what we're trying to achieve:

var source = Enumerable.Range(9, 30);

var parallelQuery = source.AsParallel().AsOrdered()
    .Where(x => x % 3 == 0)
    .Select(x => { System.Diagnostics.Debug.WriteLine("{0} ", x); return x; });

// Use foreach to preserve order at execution time. 
foreach (var v in parallelQuery)
{
    System.Diagnostics.Debug.WriteLine("Project");
    break;
}

// Some operators expect an ordered source sequence. 
var lowValues = parallelQuery.Take(10);

int counter = 0;
foreach (var v in lowValues)
{
    System.Diagnostics.Debug.WriteLine("{0}-{1}", counter, v);
    counter++;
}

The call to Debug.WriteLine stands in for UpdateInvoice in the code above, in the sense that both are void methods that cause side effects.

This is what the above prints:
9 15 18 12 30 33 36 21 24 27
Project
9 15 18 12 30 21 36 27 24 33 
0-9 1-12 2-15 3-18 4-21 5-24 6-27 7-30 8-33 9-36 


As you can see, the end result is ordered, but the order in which the elements are actually processed is not. The processing order is what we're interested in, which is why we could not use PLINQ.
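
The same observation can be checked without reading debug output by eye. The sketch below is only an illustration: PlinqOrderDemo and Run are made-up names, and a ConcurrentQueue stands in for the side effect. It records the order in which the Select delegate actually ran and returns it alongside the order in which the results were yielded.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

public static class PlinqOrderDemo
{
    // Runs an ordered PLINQ query and returns both the order in which the
    // side effect ran and the order in which the results were yielded.
    public static (int[] Executed, int[] Yielded) Run()
    {
        var executed = new ConcurrentQueue<int>();

        var yielded = Enumerable.Range(9, 30)
            .AsParallel().AsOrdered()
            .Where(x => x % 3 == 0)
            // The side effect: note when each element is actually processed.
            .Select(x => { executed.Enqueue(x); return x; })
            .ToArray();

        return (executed.ToArray(), yielded);
    }

    public static void Main()
    {
        var (executed, yielded) = Run();
        Console.WriteLine("Executed: " + string.Join(" ", executed));
        Console.WriteLine("Yielded:  " + string.Join(" ", yielded));
    }
}
```

The yielded array always comes back in source order, because AsOrdered applies to the results. The executed array holds the same values, but its order can change from run to run, since nothing in PLINQ promises when each delegate is invoked.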

