Mixed producer-consumer scenario in .NET

When I was programming a web crawler (available on GitHub), I had to set up a mixed producer-consumer. In a classic producer-consumer scenario, some threads produce data and others consume it. In this case, each thread consumes an item (the URL of a page), crawls the page, and produces new URLs to explore. So, every thread can both consume and produce data.

Let's see how to do this!

First, I chose a ConcurrentQueue<T> to store the list of items to process. The collection must be thread-safe because many threads enqueue and dequeue data at the same time. You can also use another collection from the System.Collections.Concurrent namespace, such as BlockingCollection<T>.

var items = new ConcurrentQueue<string>();
items.Enqueue("http://www.meziantou.net"); // add the initial url to explore

Then, you need to create as many threads as needed to process the data. A common starting point is the number of CPU cores (or twice that if hyper-threading is enabled). In my case, threads spend most of their time waiting for the network, so I create more threads.

int maxDegreeOfParallelism = 8;
var tasks = new Task[maxDegreeOfParallelism];
int activeThreadsNumber = 0;
for (int i = 0; i < tasks.Length; i++)
{
    tasks[i] = Task.Factory.StartNew(() => { /* TODO Consumer loop */ });
}

Task.WaitAll(tasks);
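
As a rough sketch of how the thread count could be derived instead of hard-coded: Environment.ProcessorCount returns the number of logical processors, so it already accounts for hyper-threading. The Parallelism class and the ioBoundFactor parameter are made up for this illustration, and the right multiplier depends on your workload.

```csharp
using System;

// Hypothetical helper, not part of the crawler: picks a worker count.
public static class Parallelism
{
    // ioBoundFactor is an assumed tuning knob, not a .NET setting.
    public static int Choose(bool ioBound, int ioBoundFactor = 4)
    {
        // Number of logical processors available to the process
        int logicalProcessors = Environment.ProcessorCount;

        // CPU-bound work: one worker per logical processor.
        // I/O-bound work (e.g. waiting for the network): oversubscribe.
        return ioBound ? logicalProcessors * ioBoundFactor : logicalProcessors;
    }
}
```

For the crawler, something like `int maxDegreeOfParallelism = Parallelism.Choose(ioBound: true);` would replace the constant 8.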

Now, you have to create the consumer loop. The idea is to take an item from the queue, process it, and add the newly produced items to the queue. While the queue has items, you repeat the process. An empty queue doesn't mean the work is finished: another thread may still be processing an item and may add new items to the queue. So, a thread can only stop when the queue is empty and there are no more active threads.

while (true)
{
    Interlocked.Increment(ref activeThreadsNumber);

    while (collection.TryTake(out T item))
    {
        var nextItems = processItem(item);
        foreach (var nextItem in nextItems)
        {
            collection.TryAdd(nextItem);
        }
    }

    // Test the value returned by Interlocked.Decrement instead of re-reading the shared field
    if (Interlocked.Decrement(ref activeThreadsNumber) == 0) // all tasks finished
        return;
}
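
One thing to note about this loop: while the queue is empty and another thread is still working, an idle thread keeps looping without pausing. A possible variation, sketched here with a made-up BackoffConsumer<T> type, uses SpinWait to back off between attempts. It is only an illustration of the idea, not the code used in the crawler.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper type; the name and shape are illustrative only.
public sealed class BackoffConsumer<T>
{
    private readonly IProducerConsumerCollection<T> _collection;
    private readonly Func<T, IEnumerable<T>> _processItem;
    private int _activeWorkers; // shared by every thread that calls Run

    public BackoffConsumer(IProducerConsumerCollection<T> collection, Func<T, IEnumerable<T>> processItem)
    {
        _collection = collection;
        _processItem = processItem;
    }

    // Call Run from each worker thread. It returns when this worker's
    // decrement brings the number of active workers down to zero.
    public void Run()
    {
        var spinner = new SpinWait();
        while (true)
        {
            Interlocked.Increment(ref _activeWorkers);

            while (_collection.TryTake(out T item))
            {
                foreach (var nextItem in _processItem(item))
                {
                    _collection.TryAdd(nextItem);
                }

                spinner.Reset(); // work was found, start backing off from scratch
            }

            if (Interlocked.Decrement(ref _activeWorkers) == 0)
                return; // queue drained and nobody can add new items

            // Another worker is still active: spin briefly at first,
            // then yield or sleep as the number of attempts grows.
            spinner.SpinOnce();
        }
    }
}
```

Each worker thread calls Run on the same BackoffConsumer<T> instance, so they all share one counter.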

Now, you can wrap up the code in a generic method:

Task Process<T>(IProducerConsumerCollection<T> collection, Func<T, IEnumerable<T>> processItem, int maxDegreeOfParallelism, CancellationToken ct)
{
    var tasks = new Task[maxDegreeOfParallelism];
    int activeThreadsNumber = 0;
    for (int i = 0; i < tasks.Length; i++)
    {
        tasks[i] = Task.Factory.StartNew(() =>
        {
            while (true)
            {
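                ct.ThrowIfCancellationRequested(); // also stops idle workers once cancellation is requested
                Interlocked.Increment(ref activeThreadsNumber);

                while (collection.TryTake(out T item))
                {
                    ct.ThrowIfCancellationRequested(); // stop between items
                    var nextItems = processItem(item);
                    foreach (var nextItem in nextItems)
                    {
                        collection.TryAdd(nextItem);
                    }
                }

                // Test the value returned by Interlocked.Decrement instead of re-reading the shared field
                if (Interlocked.Decrement(ref activeThreadsNumber) == 0) // all tasks finished
                    return;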
            }
        }, ct);
    }

    return Task.WhenAll(tasks);
}
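
To show how the method might be called, here is a minimal sketch that "crawls" a fake in-memory site instead of the network. The CrawlDemo class, the Links dictionary, and the visited set are assumptions made for this example. Process is reproduced from the post so the sample compiles on its own, with the counter check folded into Interlocked.Decrement.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class CrawlDemo
{
    // Hypothetical site: each page maps to the links it contains.
    private static readonly Dictionary<string, string[]> Links = new Dictionary<string, string[]>
    {
        ["/"] = new[] { "/a", "/b" },
        ["/a"] = new[] { "/b", "/c" },
        ["/b"] = new[] { "/" },
        ["/c"] = new string[0],
    };

    public static ICollection<string> Crawl(string start, int maxDegreeOfParallelism)
    {
        var visited = new ConcurrentDictionary<string, bool>();
        var queue = new ConcurrentQueue<string>();
        visited.TryAdd(start, true);
        queue.Enqueue(start);

        // TryAdd returns false for a page that was already seen, so each
        // page is queued once. ToList forces the filter to run only once.
        Func<string, IEnumerable<string>> processItem =
            url => Links[url].Where(next => visited.TryAdd(next, true)).ToList();

        Process(queue, processItem, maxDegreeOfParallelism, CancellationToken.None).Wait();
        return visited.Keys;
    }

    private static Task Process<T>(IProducerConsumerCollection<T> collection, Func<T, IEnumerable<T>> processItem, int maxDegreeOfParallelism, CancellationToken ct)
    {
        var tasks = new Task[maxDegreeOfParallelism];
        int activeThreadsNumber = 0;
        for (int i = 0; i < tasks.Length; i++)
        {
            tasks[i] = Task.Factory.StartNew(() =>
            {
                while (true)
                {
                    Interlocked.Increment(ref activeThreadsNumber);

                    while (collection.TryTake(out T item))
                    {
                        foreach (var nextItem in processItem(item))
                        {
                            collection.TryAdd(nextItem);
                        }
                    }

                    if (Interlocked.Decrement(ref activeThreadsNumber) == 0)
                        return;
                }
            }, ct);
        }

        return Task.WhenAll(tasks);
    }
}
```

Calling CrawlDemo.Crawl("/", 4) visits the four pages of the fake site. Without the visited set, the cycle between "/" and "/b" would keep adding items and the loop would never end.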

Comments

Gérald Barré -

The TPL Dataflow library is great, and you'll find lots of examples of producer-consumer code that use it. However, I couldn't find a nice way to use it for a producer-consumer where the consumer can also produce items. That's why I wrote this post.
