Monday, 27 January 2014

Competing consumers and returning results with in-memory queues (BlockingCollection)

I needed a way to put some work on a queue, have it consumed, and return the result to the producer of the work. In C# we can use the BlockingCollection class and TaskCompletionSource for that.
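Before building the queue, it may help to see TaskCompletionSource on its own. The following is a minimal sketch, not part of the original code: the class name TaskCompletionSourceSketch and the helper AddOneElsewhere are made up for illustration. It shows how one thread can hand out a Task while another thread supplies its result later.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TaskCompletionSourceSketch
{
   // Hypothetical helper: returns a Task whose result is produced on another thread.
   public static Task<int> AddOneElsewhere(int input)
   {
      var tcs = new TaskCompletionSource<int>();

      // The worker completes the task; the caller only ever sees tcs.Task.
      Task.Factory.StartNew(() =>
      {
         Thread.Sleep(100); // simulate work
         tcs.SetResult(input + 1);
      });

      return tcs.Task;
   }

   public static void Main()
   {
      Task<int> task = AddOneElsewhere(41);
      Console.WriteLine(task.Result); // blocks until SetResult has been called, then prints 42
   }
}
```

The queue in the rest of this post uses the same idea: the producer keeps the Task, and the TaskCompletionSource travels with the work item to whichever consumer picks it up.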

The code in this blogpost is based on the code from Albahari.

What I want to do is this: create a Producer which creates work and places it on a queue. I also want to create Consumers which take work off the queue and send the result back to the Producer when they are done. See the following image for an overview:





Let's start with a simple console application:
(Don't forget to set the project to the correct target framework, as VS 2010 defaults to the .NET Framework 4 Client Profile.)

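// Namespaces used by the snippets in this post.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class Program
{
   static void Main()
   {
      // Put something on a queue
   }
}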


This is the class that we are going to put on the queue:



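// One unit of work: the input value, an optional cancellation token, and the
// TaskCompletionSource through which the consumer reports the outcome.
public class WorkItem
{
   public readonly TaskCompletionSource<int> TaskSource; // completed by the consumer
   public readonly int Context;                          // input value for the work
   public readonly CancellationToken? CancelToken;       // null when no token was supplied

   public WorkItem(
      TaskCompletionSource<int> taskSource,
      int context,
      CancellationToken? cancelToken)
   {
      TaskSource = taskSource;
      Context = context;
      CancelToken = cancelToken;
   }
}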


Next we create the queue we are going to use for enqueueing the work:


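public class ProducerConsumerQueue : IDisposable
{
   // Thread-safe queue shared by the producer and all consumers.
   readonly BlockingCollection<WorkItem> _workitemQueue =
      new BlockingCollection<WorkItem>();

   // Enqueue work without a cancellation token.
   public Task<int> EnqueueTask(int context)
   {
      return EnqueueTask(context, null);
   }

   public Task<int> EnqueueTask(int context,
                                CancellationToken? cancelToken)
   {
      // The consumer completes tcs; the producer only sees tcs.Task.
      var tcs = new TaskCompletionSource<int>();
      _workitemQueue.Add(new WorkItem(tcs, context, cancelToken));
      return tcs.Task;
   }

   public void Dispose()
   {
      // No more items will be added; consumers stop once the queue is drained.
      _workitemQueue.CompleteAdding();
   }
}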


As you can see, our ProducerConsumerQueue uses a BlockingCollection internally. This is a great class that lets multiple consumers/workers read from it in a thread-safe manner: each item that is added is handed to exactly one consumer.
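To illustrate the competing-consumers behaviour in isolation, here is a small sketch that is separate from the classes in this post. The class CompetingConsumersSketch and the helper SumWithWorkers are hypothetical names. Several workers read from one BlockingCollection<int> and add every item they receive to a shared total; because each item goes to exactly one worker, the total matches the plain sum.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class CompetingConsumersSketch
{
   // Hypothetical helper: sums 1..itemCount using workerCount consumers on one queue.
   public static int SumWithWorkers(int itemCount, int workerCount)
   {
      var queue = new BlockingCollection<int>();
      int total = 0;

      var workers = new Task[workerCount];
      for (int i = 0; i < workerCount; i++)
      {
         workers[i] = Task.Factory.StartNew(() =>
         {
            // Blocks while the queue is empty; ends after CompleteAdding and drain.
            foreach (int item in queue.GetConsumingEnumerable())
            {
               Interlocked.Add(ref total, item);
            }
         });
      }

      for (int n = 1; n <= itemCount; n++)
      {
         queue.Add(n);
      }

      queue.CompleteAdding(); // lets the foreach loops end once the queue is empty
      Task.WaitAll(workers);
      return total;
   }

   public static void Main()
   {
      Console.WriteLine(SumWithWorkers(100, 4)); // prints 5050
   }
}
```

Note that the workers never coordinate with each other; GetConsumingEnumerable does the hand-off, and CompleteAdding is what allows the loops to terminate.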

Next, create the consumer:



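public class Consumer
{
   private readonly BlockingCollection<WorkItem> _workitemQueue;

   public Consumer(BlockingCollection<WorkItem> workitemQueue)
   {
      _workitemQueue = workitemQueue;
   }

   public void Consume()
   {
      // Blocks while the queue is empty; the loop ends after CompleteAdding()
      // has been called and the remaining items have been taken.
      foreach (WorkItem workItem in _workitemQueue.GetConsumingEnumerable())
      {
         // Skip work that was cancelled while it was waiting in the queue.
         if (workItem.CancelToken.HasValue &&
             workItem.CancelToken.Value.IsCancellationRequested)
         {
            workItem.TaskSource.SetCanceled();
         }
         else
         {
            try
            {
               Thread.Sleep(1000); // simulate work

               // Hand the result back to the producer.
               workItem.TaskSource.SetResult(workItem.Context + 1);
            }
            catch (OperationCanceledException ex)
            {
               // Cancellation through our own token counts as cancelled;
               // any other token is reported as a failure.
               if (ex.CancellationToken == workItem.CancelToken)
               {
                  workItem.TaskSource.SetCanceled();
               }
               else
               {
                  workItem.TaskSource.SetException(ex);
               }
            }
            catch (Exception ex)
            {
               // Report failures to the producer instead of killing the consumer.
               workItem.TaskSource.SetException(ex);
            }
         }
      }
   }
}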
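The consumer finishes every work item with SetResult, SetCanceled or SetException. What the producer observes for each case can be sketched as follows. This is an illustration only: the class CompletionOutcomeSketch and the helper Describe are hypothetical and not part of the original code. Reading Result on a cancelled or faulted task throws an AggregateException, whose inner exception tells the two apart.

```csharp
using System;
using System.Threading.Tasks;

public static class CompletionOutcomeSketch
{
   // Hypothetical helper: describes what reading Result does for a given task.
   public static string Describe(Task<int> task)
   {
      try
      {
         return "result: " + task.Result;
      }
      catch (AggregateException ex)
      {
         // A cancelled task surfaces as TaskCanceledException,
         // a faulted task as the exception passed to SetException.
         Exception inner = ex.InnerException;
         return inner is TaskCanceledException
            ? "canceled"
            : "failed: " + inner.Message;
      }
   }

   public static void Main()
   {
      var ok = new TaskCompletionSource<int>();
      ok.SetResult(2);

      var canceled = new TaskCompletionSource<int>();
      canceled.SetCanceled();

      var failed = new TaskCompletionSource<int>();
      failed.SetException(new InvalidOperationException("boom"));

      Console.WriteLine(Describe(ok.Task));       // result: 2
      Console.WriteLine(Describe(canceled.Task)); // canceled
      Console.WriteLine(Describe(failed.Task));   // failed: boom
   }
}
```

A producer that prefers not to catch exceptions could instead inspect task.IsCanceled and task.IsFaulted after waiting for the task to complete.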



So how do we start the consumers?
We do that in the constructor of the ProducerConsumerQueue:


   public ProducerConsumerQueue(int workerCount)
   {
      for (int i = 0; i < workerCount; i++)
      {
         var consumer = new Consumer(_workitemQueue);

         Task.Factory.StartNew(consumer.Consume);
      }
   }

Finally we can finish the console application:

class Program
{
   static void Main()
   {
      var producerConsumerQueue = new ProducerConsumerQueue(2);

      Task<int> task1 = producerConsumerQueue.EnqueueTask(1);
      Task<int> task2 = producerConsumerQueue.EnqueueTask(2);
      Task<int> task3 = producerConsumerQueue.EnqueueTask(3);

      Console.WriteLine(task1.Result);
      Console.WriteLine(task2.Result);
      Console.WriteLine(task3.Result);

      Console.ReadLine();
   }
}


If we run this, three work items are enqueued. For each one, a consumer takes the input value, adds one to it, and returns the result to the console app. We start with two consumers, so after one second the first two tasks finish and the following output is printed:

2
3

A second later the third task finishes, and we will see this:

4

You can play around with the number of consumers and the number of enqueued WorkItems.
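One way to experiment is to time the same batch of work with different numbers of consumers. The sketch below is a compact, self-contained variant rather than the classes from this post. The names WorkerCountSketch and Measure are hypothetical, and starting the workers with TaskCreationOptions.LongRunning is my own choice, made so that worker start-up is less likely to be delayed by the thread pool.

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class WorkerCountSketch
{
   // Hypothetical helper: processes itemCount items with workerCount consumers,
   // each item taking delayMs, and returns the elapsed wall-clock time.
   public static TimeSpan Measure(int itemCount, int workerCount, int delayMs)
   {
      var queue = new BlockingCollection<TaskCompletionSource<int>>();

      for (int i = 0; i < workerCount; i++)
      {
         Task.Factory.StartNew(() =>
         {
            foreach (var source in queue.GetConsumingEnumerable())
            {
               Thread.Sleep(delayMs); // simulate work
               source.SetResult(0);
            }
         }, TaskCreationOptions.LongRunning);
      }

      var stopwatch = Stopwatch.StartNew();
      var pending = new Task[itemCount];
      for (int n = 0; n < itemCount; n++)
      {
         var tcs = new TaskCompletionSource<int>();
         pending[n] = tcs.Task;
         queue.Add(tcs);
      }

      Task.WaitAll(pending);   // wait until every item has been completed
      stopwatch.Stop();
      queue.CompleteAdding();  // let the workers exit
      return stopwatch.Elapsed;
   }

   public static void Main()
   {
      foreach (int workers in new[] { 1, 2, 3 })
      {
         Console.WriteLine("{0} worker(s): {1:F1} s",
            workers, Measure(3, workers, 1000).TotalSeconds);
      }
   }
}
```

With three items of one second each, you would expect roughly three, two and one seconds for one, two and three workers respectively, since the batch takes about as long as the busiest worker. Exact timings will vary per machine.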