BlockingCollection and IProducerConsumerCollection

In .NET 4.0 there is a new namespace on the block, and it is called System.Collections.Concurrent. In this namespace you will find a pretty decent number of goodies that will help you more easily write applications that can leverage multiple threads without having to resort to manual locking. We looked previously at ConcurrentBag, ConcurrentStack, and ConcurrentQueue. Well, I have two more goodies to show off, and those are the BlockingCollection class and the IProducerConsumerCollection interface.

In order to get this party started, let me explain exactly what the IProducerConsumerCollection is and why we would want to use it. The IProducerConsumerCollection interface is pretty simple and really exposes only two methods that we care about: TryAdd and TryTake. You see, IProducerConsumerCollection is designed for multi-threaded producer/consumer scenarios, which means situations where we have multiple threads producing pieces of data (producers), and then multiple threads that are trying to consume those pieces of data (consumers). So, IProducerConsumerCollection just signifies that the implementing type supports thread-safe adding and removing of data. (In .NET 4.0, the types that support this interface are the ones that we already looked at: ConcurrentStack, ConcurrentQueue, and ConcurrentBag)
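To make that contract concrete, here is a minimal sketch of TryAdd and TryTake used through the interface, with a ConcurrentQueue as the backing type (this assumes the System and System.Collections.Concurrent namespaces are imported, as with the other snippets in this post):

```csharp
// Any of the concurrent collections can be used through the interface.
IProducerConsumerCollection<string> collection = new ConcurrentQueue<string>();

// TryAdd returns false only if the collection refuses the item
// (ConcurrentQueue never does, so this always succeeds here).
collection.TryAdd("hello");

// TryTake returns false when the collection is empty instead of
// throwing, which is what makes it safe to call from many threads.
string value;
if (collection.TryTake(out value))
{
    Console.WriteLine(value);
}
```

Notice that the check-and-remove is a single atomic call; that detail is what the rest of this post hinges on.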

And this is where BlockingCollection comes in. The BlockingCollection type is very interesting in that it is just a wrapper over an IProducerConsumerCollection. Meaning that it doesn’t actually implement an underlying storage mechanism; it needs an instance of an IProducerConsumerCollection in order to store data. What it provides is a mechanism for the specific scenario where you have one (or more) threads that are producing values (producers) and then have one or more threads which are consuming values from the collection (consumers). This means that we can use any type which supports IProducerConsumerCollection, and if we don’t give the BlockingCollection an implementation, it will use ConcurrentQueue (FIFO) by default.
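As a quick illustration of that last point, both of these are valid ways to construct a BlockingCollection (again assuming System.Collections.Concurrent is imported):

```csharp
// Default: backed by a ConcurrentQueue<string>, so Take returns items
// in FIFO order.
var fifo = new BlockingCollection<string>();

// Explicit: backed by a ConcurrentStack<string>, so Take returns the
// most recently added item (LIFO) instead.
var lifo = new BlockingCollection<string>(new ConcurrentStack<string>());
```

Same API on the outside, different ordering behavior underneath.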

In order to show off the main use case for the BlockingCollection class, let’s look at a multi-threaded scenario in which our normal collection classes will blow up spectacularly. Let’s say that we have a queue and we want to put work into that queue, and have other threads consuming the items that are put into the queue in order to perform some operation on them.

We will use Tasks from the System.Threading.Tasks namespace to fire off async operations (go read my post on Tasks if you have never used them):

var queue = new Queue<string>();
Task.Factory.StartNew(() =>
{
    while (true)
    {
        queue.Enqueue("value");
    }
});

In this case, the producer is just adding the same value over and over into the queue. We will then create some consumers which will use this collection and try to loop over its values:

Task.Factory.StartNew(() =>
{
    while (true)
    {
        if (queue.Count > 0)
        {
            string value = queue.Dequeue();
            Console.WriteLine("Worker 1: " + value);
        }
    }
    
});

Task.Factory.StartNew(() =>
{
    while (true)
    {                    
        if (queue.Count > 0)
        {
            string value = queue.Dequeue();
            Console.WriteLine("Worker 2: " + value);
        }
    }
    
});

Now, I hope it is clear that this is NOT GOING TO WORK. It might appear to work for a bit, and if the load is low, it might run for a while. But if we let this run at a high load for more than a few minutes, we will get errors because one thread will check that the queue is not empty and then try to pull an item out, only to get an exception because the other thread already removed that item. A classic race condition. In fact, when I ran it, it even started popping a bunch of null values off the queue, which I think is because the internal count of items dropped below zero and the Dequeue method only checks whether the count equals zero, not whether it is less than zero. Either way, Queue<T> makes no thread-safety guarantees at all, so all bets are off.

So while this code doesn’t work properly, this scenario is a very popular one. Having worker threads pull their own work is a great way of distributing it. If you want more parallelism, then you just spin up more consumers who will then start pulling work.

So, what would we have to do in order to support this scenario? One way would be to use the lock keyword to serialize all calls to the queue. This might be an okay solution if each work item is heavy, but it is going to destroy performance if the work items are very light or we have a huge number of consumers. Well, in .NET 4.0 we have several lock-free data structures available to us. In fact, as you saw in one of my previous posts, one of them is a concurrent queue. If we used the ConcurrentQueue, we could write the code like this (note the TryDequeue method, which atomically checks for an item and removes it, replacing the separate Count check and Dequeue call from before):

int count = 0;
var queue = new ConcurrentQueue<string>();
Task.Factory.StartNew(() =>
{
    while (true)
    {
        queue.Enqueue("value" + count);
        count++;                    
    }
});

Task.Factory.StartNew(() =>
{
    while (true)
    {
        string value;
        if (queue.TryDequeue(out value))
        {
            Console.WriteLine("Worker 1: " + value);
        }
    }
});

Task.Factory.StartNew(() =>
{
    while (true)
    {
        string value;
        if (queue.TryDequeue(out value))
        {
            Console.WriteLine("Worker 2: " + value);
        }
        
    }
});

Which isn’t bad, but wouldn’t it be nice if we could just try to get an item and have the queue block if no items are available? Sure it would! And this is why we have BlockingCollection. It implements this exact behavior, and a little extra. BlockingCollection takes an IProducerConsumerCollection in its constructor, or it will use a ConcurrentQueue by default if you call its empty constructor. Then all you have to do is call "Take" on the BlockingCollection and it will block if there is nothing in the collection. ("Add" can also block, but only if you constructed the BlockingCollection with a bounded capacity and the collection is full; more on that in a moment.) So the above code would look like this:

int count = 0;
var blockingCollection = new BlockingCollection<string>();
Task.Factory.StartNew(() =>
{
    while (true)
    {
        blockingCollection.Add("value" + count);
        count++;                    
    }
});

Task.Factory.StartNew(() =>
{
    while (true)
    {                    
        Console.WriteLine("Worker 1: " + blockingCollection.Take());
    }
});

Task.Factory.StartNew(() =>
{
    while (true)
    {
        Console.WriteLine("Worker 2: " + blockingCollection.Take());
    }
});
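One extra feature worth mentioning here: BlockingCollection can also be constructed with a bounded capacity, in which case Add itself blocks once the collection is full. That gives you back-pressure on a producer that outruns its consumers. A small sketch (the capacity of 100 is just an illustrative number):

```csharp
// Bounded BlockingCollection: Add blocks once 100 items are queued.
var bounded = new BlockingCollection<string>(100);

Task.Factory.StartNew(() =>
{
    while (true)
    {
        // Blocks whenever the collection already holds 100 items,
        // throttling the producer to the consumers' pace.
        bounded.Add("value");
    }
});

Task.Factory.StartNew(() =>
{
    while (true)
    {
        Console.WriteLine("Worker: " + bounded.Take());
    }
});
```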

Well that is a tiny bit better, but we still have those while loops hanging around. Wouldn’t it be nice if we didn’t need those? Yes, again, it would! Don’t you wish I’d stop asking rhetorical questions? Ha, of course you do. 🙂 Anyways, BlockingCollection implements this behavior for us as well with a method called "GetConsumingEnumerable". We simply call this method and then iterate over the resulting IEnumerable, which blocks until work items are available! Now that is nice! So one of the workers from above would look like this:

Task.Factory.StartNew(() =>
{
    foreach (string value in blockingCollection.GetConsumingEnumerable())
    {
        Console.WriteLine("Worker 1: " + value);
    }                
});

This code will sit there forever, iterating over the blocking collection, and blocking whenever it runs out of items. As soon as new items start appearing in the collection, it will start enumerating them again! Very easy!
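The loop only sits there "forever" because nothing ever signals that production is done. If the producer calls CompleteAdding, GetConsumingEnumerable drains any remaining items and then ends normally, so the worker can exit cleanly. A sketch:

```csharp
var collection = new BlockingCollection<string>();

var worker = Task.Factory.StartNew(() =>
{
    // Ends (instead of blocking) once CompleteAdding has been called
    // and the remaining items have been consumed.
    foreach (string value in collection.GetConsumingEnumerable())
    {
        Console.WriteLine("Worker: " + value);
    }
    Console.WriteLine("Worker finished");
});

for (int i = 0; i < 10; i++)
{
    collection.Add("value" + i);
}
collection.CompleteAdding();

worker.Wait();
```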

Summary

And there you have it. BlockingCollection is an incredibly easy way to have a few threads producing data and numerous other threads picking up and processing that same data. You can switch out its underlying storage mechanism in order to change where items go when you add them, and where items come from when you take them, all while keeping the underlying data store completely abstracted away. I hope that you find some great uses for the BlockingCollection class, and I hope you enjoyed this post!
