Implementing an Actor in C# with Task Continuations


We're having fun at work with some investigation in the NoSQL space. We've been playing with actors in our distributed system. There has been a lot of prior work in this area, so we aren't doing anything new with actors, but it's still fun. Scala, F#, Erlang, and many others have successfully applied the model to managing concurrency.

Earlier today, Aaron Lahman attempted to concurrently send messages to 20,000 actors in the same process, and my implementation of the actor model didn't handle it. As we investigated, we came up with an implementation pattern that we thought was worth sharing in case others find it interesting. Lots of credit goes to Aaron for his insight and feedback.

Let's start with what was wrong.

Asynchrony is baked into everything we are building. Every message sent between components and every method invocation has to happen asynchronously. We are heavy users of the Task Parallel Library (TPL) and C#'s upcoming async/await feature (Microsoft Visual Studio Async CTP). So, what happened with the actor?

Our actors look just like any other actor out there 🙂 There is a message queue and some optional state. Each message is processed in turn and is given a copy of the state (if any). Then, it is expected to produce an updated version of that state (if it wishes) and some output to be sent to the original sender of the message.

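As a rough illustration of that contract (the names below are made up for this post, not taken from our code base), a stateful message handler could look something like this:

    // The handler receives a copy of the current state plus the incoming message,
    // and asynchronously produces the updated state and a reply for the sender.
    // (Tuple<,> stands in for a proper reply type; TaskEx comes from the Async CTP.)
    public delegate Task<Tuple<TState, TReply>> StatefulHandler<TState, TMessage, TReply>(
        TState currentState, TMessage message);

    // Example: a counter actor whose state is an int and whose reply is a string.
    static Task<Tuple<int, string>> HandleIncrement(int count, int amount)
    {
        var newCount = count + amount;
        return TaskEx.FromResult(Tuple.Create(newCount, "count is now " + newCount));
    }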

So, my original implementation used a blocking queue for the incoming messages (effectively, calls to Execute() are treated as messages). Each actor in the system starts a task (line 7) and then blocks waiting for a message to arrive (line 30). Each incoming message is converted to a task, which is then queued (line 17).

(As you can tell from the simple implementations below, our actors don't have any state. I wrote these two simple actor implementations just for illustration purposes.)

   1:  class BlockingActor
   2:  {
   3:      private BlockingQueue<Task> operationQueue = new BlockingQueue<Task>();
   4:  
   5:      public BlockingActor()
   6:      {
   7:          Task.Factory.StartNew(this.ProcessMessage);
   8:      }
   9:  
  10:      public Task<R> Execute<T, R>(Func<T, Task<R>> function, T arg)
  11:      {
  12:          if (!function.Method.IsStatic)
  13:          {
  14:              throw new ArgumentException("Function must be static");
  15:          }
  16:  
  17:          var task = new Task<R>(() => function(arg).Result);
  18:          lock (this.operationQueue)
  19:          {
  20:              this.operationQueue.Enqueue(task);
  21:          }
  22:  
  23:          return task;
  24:      }
  25:  
  26:      void ProcessMessage()
  27:      {
  28:          while (true)
  29:          {
  30:              var task = this.operationQueue.Dequeue();
  31:              task.RunSynchronously();
  32:          }
  33:      }
  34:  }
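(BlockingQueue<T> is not a type in the base class library. For the sample above, think of it as something like the minimal sketch below, where Enqueue never blocks and Dequeue blocks until an item is available; on .NET 4 you could also reach for System.Collections.Concurrent.BlockingCollection<T>.)

    // Minimal blocking queue sketch (uses System.Collections.Generic and
    // System.Threading): Dequeue parks the calling thread until Enqueue
    // makes an item available.
    class BlockingQueue<T>
    {
        private readonly Queue<T> items = new Queue<T>();

        public void Enqueue(T item)
        {
            lock (this.items)
            {
                this.items.Enqueue(item);
                Monitor.Pulse(this.items);   // wake one waiting consumer
            }
        }

        public T Dequeue()
        {
            lock (this.items)
            {
                while (this.items.Count == 0)
                {
                    Monitor.Wait(this.items);   // releases the lock while waiting
                }

                return this.items.Dequeue();
            }
        }
    }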

 

As you can see from line 17, I had to wrap the given function inside a Task so that I could control the timing of its execution. The use of Task.Result meant that when the task actually ran (line 31), the allocated thread would block until the inner task completed. Here's a sample program to demonstrate the behavior...

   1:  class Program
   2:  {
   3:      static int NoActors = 50000;
   4:  
   5:      static void Main(string[] args)
   6:      {
   7:          var actors = new BlockingActor[NoActors];
   8:          var tasks = new List<Task>();
   9:  
  10:          for (int i = 0; i < NoActors; i++)
  11:          {
  12:              actors[i] = new BlockingActor();
  13:              tasks.Add(actors[i].Execute(Foo, i));
  14:          }
  15:  
  16:          Console.WriteLine("done queuing");
  17:          Task.WaitAll(tasks.ToArray());
  18:          Console.ReadLine();
  19:      }
  20:  
  21:      static async Task<string> Foo(int i)
  22:      {
  23:          await TaskEx.Delay(TimeSpan.FromSeconds(4));
  24:          Console.WriteLine("done " + i);
  25:          return "done";
  26:      }
  27:  }

 

The blocking calls make the Task scheduler's life very difficult, since it has to keep allocating threads. All our asynchrony-related effort goes to waste: instead of yielding the processor the way the handler does at the await in line 23 above, the wrapping task blocks a thread until the handler completes.

What Aaron and I came up with is an Actor implementation that uses Task continuations to simulate the message queue. Since the semantics of the actor model require us to process a message only after the processing of the previous one has finished, all we have to do is chain the respective tasks. Here's what our Actor looks like now...

   1:  class Actor
   2:  {
   3:      private Task lastTask = TaskEx.FromResult(0);
   4:      private object objLock = new object();
   5:  
   6:      public Task<R> Execute<T, R>(Func<T, Task<R>> function, T arg)
   7:      {
   8:          if (!function.Method.IsStatic)
   9:          {
  10:              throw new ArgumentException("Function must be static");
  11:          }
  12:  
  13:          var tcs = new TaskCompletionSource<R>();
  14:  
  15:          Task<R> task = null;
  16:          lock (this.objLock)
  17:          {
  18:              task = this.lastTask.ContinueWith(_ => function(arg)).Unwrap();
  19:              this.lastTask = task;
  20:          }
  21:  
  22:          task.ContinueWith(t => tcs.TrySetResult(t.Result), TaskContinuationOptions.OnlyOnRanToCompletion);
  23:          task.ContinueWith(t => tcs.TrySetException(t.Exception), TaskContinuationOptions.OnlyOnFaulted);
  24:          task.ContinueWith(t => tcs.TrySetCanceled(), TaskContinuationOptions.OnlyOnCanceled);
  25:  
  26:          return tcs.Task;
  27:      }
  28:  }

 

With the above implementation we are restricted only by the amount of memory available to the running process. We always keep a reference to the last task in the chain so that we can attach incoming tasks to it. The very first task in the chain is created already completed (line 3), so the first incoming message runs immediately. We only call the blocking Task.Result operation after a given task has completed successfully (line 22).

We can now have thousands of concurrent actors, as long as the Tasks representing the processing of a message use the async/await pattern to yield the processor when necessary. We are making sure that the libraries we build on top of this do just that, so that we can achieve the maximum possible performance.
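For example, here is roughly how the earlier stress test looks against the new Actor (just a sketch, reusing the Foo handler from the sample program above):

    var actors = new Actor[50000];
    var tasks = new List<Task>();

    for (int i = 0; i < actors.Length; i++)
    {
        actors[i] = new Actor();
        tasks.Add(actors[i].Execute<int, string>(Foo, i));
    }

    // While the 50,000 Foo handlers are awaiting TaskEx.Delay, no thread is
    // parked in a blocking Task.Result call; each continuation simply fires
    // when its delay completes.
    Task.WaitAll(tasks.ToArray());
    Console.WriteLine("all done");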

Task continuations are pretty cool.

One response to “Implementing an Actor in C# with Task Continuations”

  1. If you are considering implementing the above pattern, please be aware that there is a bug in case your actor happens to send messages to itself. Depending on how you implement the message queue, you might be affected by the following situation…

    function(arg) may be executed synchronously if lastTask has already completed. As a result, any messages that function(arg) sends to the same actor will be able to acquire the lock (locks are reentrant), which might cause problems with the queueing logic. One possible guard is sketched below.
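One way to guard against that reentrancy (a sketch, not part of the implementation above) is to make sure the queued function never starts on the thread that is registering it, for example by bouncing it through the thread pool inside Execute:

    lock (this.objLock)
    {
        // Start function(arg) on the thread pool instead of ever letting the
        // continuation run it inline on the registering thread. A handler that
        // sends a message to its own actor then never runs while this lock is
        // held, and it always chains onto an up-to-date lastTask.
        // (Task.Run is .NET 4.5; TaskEx.Run played the same role in the Async CTP.)
        task = this.lastTask
            .ContinueWith(_ => Task.Run(() => function(arg)))
            .Unwrap();
        this.lastTask = task;
    }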