“Streaming messages” in Web Services

As part of my involvement in the activities of the Grid community, I've heard many times about yet another requirement that is "specific" to the Grid and requires special infrastructure and new toolkits: streaming. It is true that there is no WS-* specification I am aware of that deals explicitly with the concept of streaming messages. But is it so difficult? Can we reuse existing specifications? Do we need any changes or additions? Or could it be that it's an unnecessary concept to introduce?

As part of my investigation into how to deal with some common uses of Web Services technologies, this note follows the ones on "third-party delivery" and "multi-node flow".

What is streaming in a service-oriented environment?

Before I describe my approach, I must explain what I mean by "streaming" in a service-oriented environment. Does the continuous flow of messages from one service to another constitute a stream? Well, not necessarily. Is the repetition of every message exchange a stream? Not all the time.

I personally see a stream of messages as an application-level abstraction rather than an infrastructure one. At the application level, a stream of messages is any series of messages that are correlated somehow. Examples include a reply to a query to a data service that is returned in pieces*, or a series of messages representing notifications of events after a subscription has taken place. I hope you get the picture.

How to do streaming?

WS-Eventing already gives the necessary constructs to achieve some sort of streaming. Every notification message can be considered part of a stream of messages. Each message can be contextualised with additional information so that the event sink (the recipient of a notification message) can distinguish it from other messages and re-establish the scope. This is done through the inclusion of the context information inside the Endpoint Reference (EPR) of the sink service.

Also, the end of a subscription (either because of an explicit request or because the subscription has expired) can represent the end of the stream.

So, we could use this exact model to create a streaming abstraction for a query on a data service. And that's exactly what follows, using WSE 2.0 for the implementation.

Implementation

As mentioned above, the idea is to allow a series of messages to be scoped as part of the same logical stream of messages. My setup involves a service that accepts an SQL query that is run against the personal version of the SuperCosmos archive. I make the assumption that the result could be so big that it is much more efficient to return it as a series of messages rather than as one huge SOAP message.

So... enough with the talk, let's examine the interesting parts of the implementation of the service which accesses the database.

We start by overriding the Receive() operation, which is called when a request is made. All I do when a request is received is spawn a thread to deal with it asynchronously.

protected override void Receive(SoapEnvelope request)
{
    // Received a request. Spawn a worker to deal with it.
    Worker worker = new Worker(request);
    Thread thread = new Thread(new ThreadStart(worker.Run));
    thread.Start();
}

In the new thread, the query is run against the database. We put 10 rows at a time into a message and send it to the receiving service.

class Worker
{
    private SoapEnvelope _request;

    public Worker(SoapEnvelope request)
    {
        this._request = request;
    }

    public void Run()
    {
        // Replies go to the address the requester gave in ReplyTo.
        SoapSender sender = new SoapSender(this._request.Context.Addressing.ReplyTo);
        sender.Destination.Via = null;

        // Do the SQL stuff here and create an SqlDataAdapter called "adapter".
        ...

        int start = 0;
        int rows = 0;
        do
        {
            // Fresh dataset for every message, so each one carries only its own page
            // (Fill() appends to whatever rows the dataset already holds).
            ReliableStarsDataSet ds = new ReliableStarsDataSet();

            // Page size is 10!
            rows = adapter.Fill(ds, start, 10, ds.ReliableStars.TableName);
            start += rows;

            SoapEnvelope env = new SoapEnvelope();
            env.Context.Addressing.Action = new Action("why:do:i:need:an:action????");
            env.SetBodyObject(ds);

            if (rows == 0)
            {
                // No more rows: flag this (empty) message as the end of the stream.
                env.Header.AppendChild(env.CreateElement("str", "StreamingInfo", "uk:ac:neresc:wsgaf:streaming")).InnerText = "end";
            }
            sender.Send(env);
        }
        while (rows > 0);
    }
}

Pretty simple stuff, right?
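For anyone who wants to play with the paging logic without WSE or a database, here is a minimal sketch of the same idea over an in-memory sequence. The names Page, StreamPager and Paginate are mine, made up for illustration, and are not part of WSE or of the service above. The sketch only assumes the behaviour of the loop in Worker.Run(): pages of at most N rows, followed by one empty message flagged as the end.

```csharp
using System;
using System.Collections.Generic;

// One unit of the stream: a page of rows plus an end-of-stream flag.
// (Hypothetical type, used only for this illustration.)
public sealed class Page<T>
{
    public IReadOnlyList<T> Rows { get; }
    public bool IsEnd { get; }

    public Page(IReadOnlyList<T> rows, bool isEnd)
    {
        Rows = rows;
        IsEnd = isEnd;
    }
}

public static class StreamPager
{
    // Splits 'source' into pages of at most 'pageSize' rows, then emits one
    // final empty page flagged as the end of the stream.
    public static IEnumerable<Page<T>> Paginate<T>(IEnumerable<T> source, int pageSize)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (pageSize <= 0) throw new ArgumentOutOfRangeException(nameof(pageSize));
        return PaginateIterator(source, pageSize);
    }

    private static IEnumerable<Page<T>> PaginateIterator<T>(IEnumerable<T> source, int pageSize)
    {
        var buffer = new List<T>(pageSize);
        foreach (T row in source)
        {
            buffer.Add(row);
            if (buffer.Count == pageSize)
            {
                yield return new Page<T>(buffer, false);
                buffer = new List<T>(pageSize); // fresh list so pages do not share rows
            }
        }

        // Last partial page, if any rows are left over.
        if (buffer.Count > 0)
        {
            yield return new Page<T>(buffer, false);
        }

        // Terminating message: no rows, end flag set.
        yield return new Page<T>(new List<T>(), true);
    }
}
```

With 25 rows and a page size of 10, this gives pages of 10, 10 and 5 rows, followed by one empty page flagged as the end.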

Now, for the service that submits the SQL query and receives the stream of messages.

static void Main(string[] args)
{
    // Register our service so we can receive the stream of replies
    EndpointReference replyEpr = new EndpointReference(new Uri("soap.tcp://localhost:9001/ConsumerService"));
    SoapReceivers.Add(replyEpr, new ConsumerService());

    // Send the query
    SoapSender sender = new SoapSender(new Uri("soap.tcp://localhost:9000/StreamingService"));

    SoapEnvelope env = new SoapEnvelope();
    env.Context.Addressing.ReplyTo = new ReplyTo(replyEpr);
    env.CreateBody();
    env.Body.InnerXml = @"<query xmlns=""hello:hello"">show me the data! show me the data!</query>";
    env.Context.Addressing.Action = new Action("why:do:i:need:an:action????");
    sender.Send(env);

    Console.WriteLine("Press ENTER to stop waiting");
    Console.ReadLine();
}

Again, simple stuff. Now, all we have left is to write the logic that processes the incoming stream...

class ConsumerService : SoapReceiver
{
    protected override void Receive(SoapEnvelope envelope)
    {
        Console.WriteLine("Msg received!");

        // Map the "str" prefix so the XPath query can address our header.
        XmlNamespaceManager mgr = new XmlNamespaceManager(envelope.NameTable);
        mgr.AddNamespace("str", "uk:ac:neresc:wsgaf:streaming");

        // The last message carries a StreamingInfo header with the value "end".
        if (envelope.Header.SelectSingleNode("str:StreamingInfo[. = 'end']", mgr) != null)
        {
            Console.WriteLine("stream ended");
        }
    }
}

Above, we use an XPath query to search the headers of the SOAP message that we received for the indication that it was the last message in the stream. My first implementation did not introduce a header but, instead, the "end-of-stream" was indicated by an empty-bodied SOAP message. The advantage of this is that streaming is modelled explicitly at the application level but the disadvantage is that there is an extra message.
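To make the two end-of-stream conventions concrete, here is a small sketch that checks for each of them on a plain XmlDocument rather than on WSE's SoapEnvelope. The class and method names (EndOfStream, MarkedByHeader, MarkedByEmptyBody) are hypothetical, and I am assuming SOAP 1.1 envelopes; only the streaming namespace comes from the code above.

```csharp
using System;
using System.Xml;

public static class EndOfStream
{
    // Assumption: SOAP 1.1 envelope namespace.
    private const string SoapNs = "http://schemas.xmlsoap.org/soap/envelope/";
    // Namespace of the StreamingInfo header used in this note.
    private const string StreamNs = "uk:ac:neresc:wsgaf:streaming";

    private static XmlNamespaceManager Namespaces(XmlDocument doc)
    {
        var mgr = new XmlNamespaceManager(doc.NameTable);
        mgr.AddNamespace("soap", SoapNs);
        mgr.AddNamespace("str", StreamNs);
        return mgr;
    }

    // Convention 1: a StreamingInfo header whose text is "end".
    public static bool MarkedByHeader(XmlDocument envelope)
    {
        return envelope.SelectSingleNode(
            "/soap:Envelope/soap:Header/str:StreamingInfo[. = 'end']",
            Namespaces(envelope)) != null;
    }

    // Convention 2: a message whose Body has no child elements.
    public static bool MarkedByEmptyBody(XmlDocument envelope)
    {
        XmlNode body = envelope.SelectSingleNode(
            "/soap:Envelope/soap:Body", Namespaces(envelope));
        return body != null && body.SelectSingleNode("*") == null;
    }
}
```

A receiver would pick one convention and stick to it; the two checks are shown side by side only for comparison.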

If the same service were to receive multiple streams at the same time, an identifier could be included in the ReferenceProperties of the ReplyTo Endpoint Reference. The services sending streams would be obligated to include that information in every message and, hence, all the messages of a stream would be logically correlated as far as this service is concerned.
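The bookkeeping on the receiving side could look something like the sketch below. It assumes the stream identifier has already been pulled out of the incoming message (reference properties travel as SOAP headers in messages sent to the EPR) and is handed over as a plain string. StreamTracker, OnMessage and HasEnded are hypothetical names of mine, not WSE APIs.

```csharp
using System;
using System.Collections.Generic;

// Tracks several concurrent streams, keyed by a stream identifier.
// (Hypothetical helper, used only for this illustration.)
public sealed class StreamTracker
{
    private readonly object _gate = new object();
    private readonly Dictionary<string, int> _received = new Dictionary<string, int>();
    private readonly HashSet<string> _ended = new HashSet<string>();

    // Records one message and returns how many messages have been seen
    // on that stream so far. Rejects messages for a stream that has ended.
    public int OnMessage(string streamId, bool isEnd)
    {
        if (streamId == null) throw new ArgumentNullException(nameof(streamId));

        // Messages may arrive on different threads, so guard the shared state.
        lock (_gate)
        {
            if (_ended.Contains(streamId))
            {
                throw new InvalidOperationException("Stream already ended: " + streamId);
            }

            int count;
            _received.TryGetValue(streamId, out count); // count stays 0 for a new stream
            count++;
            _received[streamId] = count;

            if (isEnd)
            {
                _ended.Add(streamId);
            }
            return count;
        }
    }

    public bool HasEnded(string streamId)
    {
        lock (_gate)
        {
            return _ended.Contains(streamId);
        }
    }
}
```

The Receive() method of the consumer would call OnMessage() with the identifier and the end-of-stream flag it found in the headers, so that two interleaved streams never get mixed up.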

The next step in this example, when I find some time, is to demonstrate how the stream could be managed (e.g., how to pause it, to stop it before it ended, etc.). WS-Eventing demonstrates how this could be done.

* While I was in the middle of writing these notes, Microsoft and others released the WS-Enumeration specification which demonstrates a "pull" way of achieving the same thing.