ParallelMap extension method that maintains order

In a previous blog post I created a ParallelMap method that lets you perform an operation on an IEnumerable just like a normal Map method, but uses the ThreadPool class to run the operations in parallel. This worked well, with one small issue: since the results were generated in parallel, they could come out in a different order than they went in. That can certainly be a problem. I also promised that I would remedy this, so here is me keeping that promise.

My first thought was that the easiest way to do this would be to pass the index as state to my closure, along with the current item I was operating on. So I defined this struct:

private struct ParallelMapState<TArg>
  {
    private int index;
    private TArg item;
 
    public ParallelMapState(int index, TArg item)
    {
      this.index = index;
      this.item = item;
    }
 
    public int Index
    {
      get { return index; }
    }
 
    public TArg Item
    {
      get { return item; }        
    }
  }

This allows me to pass an index and an item as state to the ThreadPool. Next I modified the ParallelMap function to initialize the result to an array the same size as the input IEnumerable. (And yes, I know that calling the "Count" extension method on an IEnumerable can be slow and sometimes destructive, but it was either that or change my extension method to operate on an ICollection, which would break compatibility with my previous extension method.) The new ParallelMap method looks like this:

public static IEnumerable<TResult> ParallelMap<TArg, TResult>(
    this IEnumerable<TArg> list,
    Func<TArg, TResult> func)
  {
    TResult[] result = new TResult[list.Count()];

    // Guard against an empty input: with no work items queued,
    // resetEvent.Set() would never be called and WaitOne() would
    // block forever.
    if (result.Length == 0)
    {
      return result;
    }

    using (var resetEvent = new ManualResetEvent(false))
    {
      int count = result.Length;
      WaitCallback run =
          n =>
          {
            var state = (ParallelMapState<TArg>)n;
            TResult value = func(state.Item);
            lock (result)
            {              
              result[state.Index] = value;
            }
            if (Interlocked.Decrement(ref count) == 0)
            {
              resetEvent.Set();
            }
          };
 
      int i = 0;
      foreach (TArg item in list)
      {
        var state = new ParallelMapState<TArg>(i, item);
        ThreadPool.QueueUserWorkItem(run, state);
        i++;
      }
      resetEvent.WaitOne();
    }
    return result;
  }
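As an aside, if the double enumeration caused by "list.Count()" is a concern for a lazy sequence, one option is to materialize the input once up front. This is just a sketch (the name ParallelMapBuffered is hypothetical, not part of the original post):

```csharp
// Hypothetical variant: materialize the input once so Count() and the
// foreach loop don't enumerate a (possibly lazy) sequence twice.
public static IEnumerable<TResult> ParallelMapBuffered<TArg, TResult>(
    this IEnumerable<TArg> list,
    Func<TArg, TResult> func)
  {
    // ToArray walks the sequence exactly once; after this, the length
    // is free and the iteration order is fixed.
    TArg[] items = list.ToArray();
    return items.ParallelMap(func);
  }
```

This costs an extra array allocation, but it also guarantees the sequence's side effects (if any) happen only once.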

At the top you can see that we initialize a TResult array using "list.Count()". I then changed the cast inside the lambda to a "ParallelMapState", and I now assign directly into the array using the new "state.Index" value. Finally, in the loop at the bottom I added a counter variable, and I use it along with the current item to create a new ParallelMapState struct that I pass to QueueUserWorkItem.
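One small note on the lock: since every work item writes to a distinct index of the result array, and no worker reads another worker's slot, the assignment does not actually race with anything. The callback could be written without the lock, as in this sketch (assuming the same surrounding variables):

```csharp
WaitCallback run =
    n =>
    {
      var state = (ParallelMapState<TArg>)n;
      // Each work item owns exactly one slot of the array, so the
      // write itself needs no lock.
      result[state.Index] = func(state.Item);
      // Interlocked.Decrement already provides the synchronization
      // needed to know when the last item has finished.
      if (Interlocked.Decrement(ref count) == 0)
      {
        resetEvent.Set();
      }
    };
```

Keeping the lock is harmless, but it serializes the stores for no benefit.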

This ParallelMap extension method can be called identically to my previous ParallelMap method:

  IEnumerable<string> result2 = values.ParallelMap(
    n =>
    {
      return n.ToLower();
    }
  );

Overall the process is very similar to the previous method, but now we can guarantee that if something was in the first spot of the input list, its result will be in the first spot of the output list. I hope this helps and makes the ParallelMap method even more useful for you.
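To convince yourself that order really is preserved, a quick test (a hypothetical snippet, not from the original post) is to give earlier items longer delays so they finish last, then check that the output still matches the input order:

```csharp
string[] values = { "AAA", "BB", "C" };

// Longer strings sleep longer, so "AAA" finishes last even though
// it was queued first; the index-based array still puts it first.
IEnumerable<string> result = values.ParallelMap(
  n =>
  {
    Thread.Sleep(n.Length * 100);
    return n.ToLower();
  });

// Prints: aaa, bb, c
Console.WriteLine(string.Join(", ", result.ToArray()));
```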
