Mixed producer-consumer scenario in .NET

 
 
Gérald Barré

While building a web crawler ([available on GitHub](https://github.com/meziantou/WebCrawler)), I needed to set up a mixed producer-consumer. In a classic producer-consumer scenario, some threads produce data while others consume it. In this scenario, threads consume data (page URLs), crawl each page, and produce new URLs to explore. Each thread acts as both a producer and a consumer.

Let's see how to do this!

First, I chose ConcurrentQueue<T> to store the items to process. The collection must be thread-safe since multiple threads enqueue and dequeue items concurrently. You can also use any collection from the System.Collections.Concurrent namespace, such as BlockingCollection<T>.

C#
var items = new ConcurrentQueue<string>();
items.Enqueue("https://www.meziantou.net"); // add the initial url to explore

Next, create as many threads as needed to process the data. A common practice is to match the number of CPU cores (or double that if hyper-threading is enabled). In this case, threads spend a lot of time waiting on network I/O, so more threads are used.

C#
int maxDegreeOfParallelism = 8;
var tasks = new Task[maxDegreeOfParallelism];
int activeThreadsNumber = 0;
for (int i = 0; i < tasks.Length; i++)
{
    tasks[i] = Task.Factory.StartNew(() => { /* TODO Consumer loop */ });
}

Task.WaitAll(tasks);
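
The degree of parallelism can also be derived from the machine instead of being hard-coded. The sketch below is one way to do that, assuming an I/O-bound workload. The `Parallelism` class, the `ForIoBoundWork` name, and the default multiplier of 4 are illustrative choices, not values taken from the crawler. Note that `Environment.ProcessorCount` reports logical processors, so hyper-threading is already reflected in it.

```csharp
using System;

public static class Parallelism
{
    // Hypothetical helper: scale the logical core count for work that mostly waits on I/O.
    // The default multiplier is an arbitrary starting point and should be tuned by measuring.
    public static int ForIoBoundWork(int logicalCoreCount, int multiplier = 4)
    {
        if (logicalCoreCount < 1)
            throw new ArgumentOutOfRangeException(nameof(logicalCoreCount));
        if (multiplier < 1)
            throw new ArgumentOutOfRangeException(nameof(multiplier));

        return logicalCoreCount * multiplier;
    }
}
```

It could then replace the constant: `int maxDegreeOfParallelism = Parallelism.ForIoBoundWork(Environment.ProcessorCount);`.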

Next, implement the consumer loop. The loop dequeues an item, processes it, and adds any newly produced items back to the queue. This repeats while the queue has items. However, an empty queue does not mean processing is complete, since another thread may still be adding items. Processing only stops when the queue is empty and no threads are active.

C#
while (true)
{
    Interlocked.Increment(ref activeThreadsNumber);

    while (collection.TryTake(out T item))
    {
        var nextItems = processItem(item);
        foreach (var nextItem in nextItems)
        {
            collection.TryAdd(nextItem);
        }
    }

    // Use the value returned by the atomic decrement instead of re-reading the shared counter
    if (Interlocked.Decrement(ref activeThreadsNumber) == 0) // all tasks finished
        return;
}

You can wrap this logic in a generic reusable method:

C#
Task Process<T>(IProducerConsumerCollection<T> collection, Func<T, IEnumerable<T>> processItem, int maxDegreeOfParallelism, CancellationToken ct)
{
    var tasks = new Task[maxDegreeOfParallelism];
    int activeThreadsNumber = 0;
    for (int i = 0; i < tasks.Length; i++)
    {
        tasks[i] = Task.Run(() =>
        {
            while (true)
            {
                Interlocked.Increment(ref activeThreadsNumber);

                while (collection.TryTake(out T item))
                {
                    var nextItems = processItem(item);
                    foreach (var nextItem in nextItems)
                    {
                        collection.TryAdd(nextItem);
                    }
                }

                // Use the value returned by the atomic decrement instead of re-reading the shared counter
                if (Interlocked.Decrement(ref activeThreadsNumber) == 0) // all tasks finished
                    return;
            }
        }, ct);
    }

    return Task.WhenAll(tasks);
}
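
To try the method without touching the network, the sketch below runs it over a small in-memory link graph. The `CrawlDemo` class, the `Links` dictionary, and the `visited` set are made up for this illustration, and `Process<T>` is a condensed copy of the method above so the block compiles on its own.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class CrawlDemo
{
    // Hypothetical link graph standing in for real pages: page -> links found on that page
    private static readonly Dictionary<string, string[]> Links = new Dictionary<string, string[]>
    {
        ["/"] = new[] { "/a", "/b" },
        ["/a"] = new[] { "/b", "/c" },
        ["/b"] = new[] { "/" },
        ["/c"] = Array.Empty<string>(),
    };

    public static async Task<ICollection<string>> CrawlAsync(string start, int maxDegreeOfParallelism)
    {
        var visited = new ConcurrentDictionary<string, bool>();
        var queue = new ConcurrentQueue<string>();
        visited.TryAdd(start, true);
        queue.Enqueue(start);

        await Process<string>(queue, url =>
        {
            // "Crawl" the page and only produce links that no thread has claimed yet
            var next = new List<string>();
            foreach (var link in Links[url])
            {
                if (visited.TryAdd(link, true))
                    next.Add(link);
            }
            return next;
        }, maxDegreeOfParallelism, CancellationToken.None);

        return visited.Keys;
    }

    // Condensed copy of the generic method from the article
    private static Task Process<T>(IProducerConsumerCollection<T> collection, Func<T, IEnumerable<T>> processItem, int maxDegreeOfParallelism, CancellationToken ct)
    {
        var tasks = new Task[maxDegreeOfParallelism];
        int activeThreadsNumber = 0;
        for (int i = 0; i < tasks.Length; i++)
        {
            tasks[i] = Task.Run(() =>
            {
                while (true)
                {
                    Interlocked.Increment(ref activeThreadsNumber);
                    while (collection.TryTake(out T item))
                    {
                        foreach (var nextItem in processItem(item))
                            collection.TryAdd(nextItem);
                    }

                    if (Interlocked.Decrement(ref activeThreadsNumber) == 0)
                        return;
                }
            }, ct);
        }

        return Task.WhenAll(tasks);
    }
}
```

The `visited` set matters here: without it, the cycle between `/` and `/b` would keep producing items and the queue would never drain. Calling `await CrawlDemo.CrawlAsync("/", 4)` should return the four pages of the graph.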

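# Update: Using a Channel

The same pattern can be built on System.Threading.Channels. The class below reads pending items from a channel, runs at most degreeOfParallelism tasks at a time, and completes the channel once no task is running and no item is pending. An implementation of ProcessAsync can call EnqueueAsync to add the items it discovers.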

C#
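using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;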

public abstract class Processor<T>
{
    private readonly Channel<T> _pendingItems = Channel.CreateUnbounded<T>();

    protected abstract Task ProcessAsync(T item);

    public async ValueTask EnqueueAsync(T item, CancellationToken cancellationToken = default)
    {
        await _pendingItems.Writer.WriteAsync(item, cancellationToken);
    }

    public async Task RunAsync(int degreeOfParallelism, CancellationToken cancellationToken = default)
    {
        var tasks = new List<Task>(degreeOfParallelism);
        var remainingConcurrency = degreeOfParallelism;
        while (await _pendingItems.Reader.WaitToReadAsync(cancellationToken))
        {
            while (TryGetItem(out var item))
            {
                // If we reach the maximum number of concurrent tasks, wait for one to finish
                while (Volatile.Read(ref remainingConcurrency) < 0)
                {
                    // The tasks collection can change while Task.WhenAny enumerates the collection
                    // so, we need to clone the collection to avoid issues
                    Task[]? clone = null;
                    lock (tasks)
                    {
                        if (tasks.Count > 0)
                        {
                            clone = tasks.ToArray();
                        }
                    }

                    if (clone != null)
                    {
                        await Task.WhenAny(clone);
                    }
                }

                var task = Task.Run(() => ProcessAsync(item), cancellationToken);

                lock (tasks)
                {
                    tasks.Add(task);
                    // Use a distinct parameter name so the lambda does not shadow the local "task"
                    task.ContinueWith(t => OnTaskCompleted(t), cancellationToken);
                }
            }

            bool TryGetItem([MaybeNullWhen(false)] out T result)
            {
                lock (tasks)
                {
                    if (_pendingItems.Reader.TryRead(out var item))
                    {
                        remainingConcurrency--;
                        result = item;
                        return true;
                    }

                    result = default;
                    return false;
                }
            }

            void OnTaskCompleted(Task completedTask)
            {
                lock (tasks)
                {
                    if (!tasks.Remove(completedTask))
                        throw new Exception("An unexpected error occurred");

                    remainingConcurrency++;

                    // There are no active tasks and no pending items, so we are sure we are at the end
                    if (degreeOfParallelism == remainingConcurrency && !_pendingItems.Reader.TryPeek(out _))
                    {
                        _pendingItems.Writer.Complete();
                    }
                }
            }
        }
    }
}
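
A lighter variant of the same idea is sketched below, using a different bookkeeping scheme than the class above: instead of tracking running tasks, it counts the items that were written but not yet fully processed, and completes the channel when that count reaches zero. `ChannelCrawler`, `RunAsync`, `seeds`, and `pending` are hypothetical names, and `processItem` is assumed to return the follow-up items rather than enqueue them itself.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelCrawler
{
    public static Task RunAsync<T>(
        IEnumerable<T> seeds,
        Func<T, Task<IEnumerable<T>>> processItem,
        int degreeOfParallelism,
        CancellationToken cancellationToken = default)
    {
        var channel = Channel.CreateUnbounded<T>();
        int pending = 0; // items written to the channel but not yet fully processed

        foreach (var seed in seeds)
        {
            pending++; // no worker is running yet, so a plain increment is enough
            channel.Writer.TryWrite(seed);
        }

        // Nothing to do: complete immediately so the workers do not wait forever
        if (pending == 0)
            channel.Writer.TryComplete();

        var workers = new Task[degreeOfParallelism];
        for (int i = 0; i < workers.Length; i++)
        {
            workers[i] = Task.Run(async () =>
            {
                try
                {
                    await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
                    {
                        foreach (var next in await processItem(item))
                        {
                            // Count the child before the parent is marked as done,
                            // so the counter cannot reach zero while work remains
                            Interlocked.Increment(ref pending);
                            channel.Writer.TryWrite(next);
                        }

                        // Last item finished and nothing else queued: stop all workers
                        if (Interlocked.Decrement(ref pending) == 0)
                            channel.Writer.TryComplete();
                    }
                }
                catch (Exception ex)
                {
                    // Assumption: on failure, stop the other workers instead of letting them wait
                    channel.Writer.TryComplete(ex);
                    throw;
                }
            }, cancellationToken);
        }

        return Task.WhenAll(workers);
    }
}
```

Because every produced item is counted before its parent is decremented, the counter only reaches zero when the channel is empty and no worker is still processing. The trade-off compared with the class above is that the number of workers is fixed up front instead of starting one task per item.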
