ReactGraph Part 2: Remote evaluation of continuous queries

Investigation into bringing together graph and reactive computing. This is the second post in the series and deals with the remote evaluation of continuous queries.

What’s an IQbservable?

In my previous post I mentioned that Reactor is based on the IQbservable abstraction. Given how important this interface is to the exploration of a graph store with both “pull” graph querying and “push” reactive computing capabilities, we should look at it a bit more closely.

The following table is often used in presentations to summarize the LINQ-related interfaces…

IEnumerable. Abstraction for pull-based behavior against a collection/data source (e.g. LINQ to Objects) | IQueryable. Abstraction for remoting the pull-based behavior against a data source (e.g. LINQ to SQL)
IObservable. Abstraction for push-based behavior against a sequence of events (a stream) (e.g. Rx) | IQbservable. Abstraction for remoting the push-based behavior against a sequence of events (e.g. Rx/Reactor)

The top row represents “pull” queries. Such queries consider the data in the collection/data source at the time of their evaluation/execution. If we want to consider changes to the collection/data source since the last evaluation, we have to submit the query again. The second row represents “push” queries, or continuous queries. Such queries consider changes to the data as those changes happen. They operate over streams of data instead of collections of data.
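The difference between the two rows can be sketched with nothing but the base class library. This is a minimal illustration, not Reactor code: the `posts` list, the `subscribers` callback list, and the `Publish` helper are all hypothetical names, and the callback list merely stands in for an IObservable.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

var posts = new List<string> { "first post" };

// Pull: the query sees whatever is in the collection when it is enumerated.
IEnumerable<string> query = posts.Where(p => p.Contains("post"));
var firstRun = query.ToList();   // evaluated now, sees one post

posts.Add("second post");
var secondRun = query.ToList();  // must enumerate again to see the new post

// Push: subscribers are handed each new item as it arrives.
// A plain list of callbacks stands in for an IObservable<string> here.
var subscribers = new List<Action<string>>();
var received = new List<string>();
subscribers.Add(p => received.Add(p));

// Hypothetical helper: store the post, then notify every subscriber.
void Publish(string post)
{
    posts.Add(post);
    foreach (var notify in subscribers) notify(post);
}

Publish("third post");

Console.WriteLine($"pull #1: {firstRun.Count}, pull #2: {secondRun.Count}, pushed: {received.Count}");
```

The pull query had to be enumerated twice to notice the second post, whereas the subscriber received the third post without asking for it.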

If I were to explain the above using code…

IEnumerable<Post> feed = savas.Friends()
  .Where(f => savas.Follows(f))
  .SelectMany(f => f.Posts().NodeValue<StatusUpdate>().OrderByDescending(p => p.DatePublished).Take(5))
  .OrderByDescending(p => p.DatePublished);

// savas.Friends() and f.Posts() are IEnumerables

The above LINQ expression will be evaluated against the in-memory data source. If we make savas.Friends() and f.Posts() IQueryables, then the same LINQ expression tree will be sent to the remote data source, where it might be transformed to some other query language (e.g. SQL), for evaluation. In both cases, we trigger the evaluation of the expression via an enumeration operation…

var result = feed.ToList();
// or
foreach (var post in feed) {
  // consume each post here
}
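To make the “expression tree” part concrete, here is a small sketch using only the base class library. It assumes a hypothetical `people` array rather than the graph types above, and it uses the in-memory provider returned by `AsQueryable()`, which compiles the tree and runs it locally; a remoting provider would translate or ship the tree instead.

```csharp
using System;
using System.Linq;
using System.Linq.Expressions;

var people = new[] { "ann", "bob", "cat" };

// IEnumerable: the lambda is compiled to a delegate and runs in-process.
var local = people.Where(p => p.StartsWith("a"));
Console.WriteLine(string.Join(",", local));

// IQueryable: the same lambda is captured as an expression tree (data, not code).
IQueryable<string> remotable = people.AsQueryable().Where(p => p.StartsWith("a"));
Expression tree = remotable.Expression;

// A provider can inspect the tree; here we only look at the outermost call.
var call = (MethodCallExpression)tree;
Console.WriteLine(call.Method.Name);  // prints "Where"

// Nothing is evaluated until we enumerate; at that point the tree is
// handed to the provider (here, the in-memory one).
var result = remotable.ToList();
```

The query text is identical in both cases; only the static type of the source decides whether the compiler emits a delegate or an expression tree.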

If we wanted to observe an information stream in order to receive notifications when something changes in the underlying data, we could write an Rx LINQ expression that looks similar to the above. Since we observe events continuously as they arrive, we don’t need to order the data, although we could create a time-based window and re-order the events within it.

IObservable<Post> feed = savas.Friends()
  .Where(f => savas.Follows(f))
  .SelectMany(f => f.PostStream().NodeValue<StatusUpdate>());

// Friends() is an IObservable<Person>
// Person.PostStream() is an IObservable<Post>

The above LINQ expression tree represents a continuous query that, once subscribed to, will start emitting the stream of posts that make up a user’s feed. Whenever any of the friends posts an update, the subscribers will receive it. Example:

feed.Subscribe(p => Display(p));

The evaluation of the query will happen against the in-memory observable streams. If Friends() and Person.PostStream() returned IQbservables instead (notice the “Q” instead of the “O”), then the exact same expression tree would be sent to a remote platform for continuous evaluation against the streams of events there.
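The switch from “O” to “Q” can be tried locally with Rx itself. The sketch below assumes the System.Reactive NuGet package is referenced; the `posts` subject and the "savas:" prefix are hypothetical stand-ins for a real stream. `AsQbservable()` uses Rx's local provider, so nothing is actually remoted here, but the query is captured as an expression tree in the same way a remoting provider would see it.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Local stand-in for a stream that would live on a remote platform.
var posts = new Subject<string>();

// AsQbservable() switches to the expression-tree-based operators, so the
// Where lambda below is captured as data rather than compiled to a delegate.
IQbservable<string> feed = posts.AsQbservable().Where(p => p.StartsWith("savas:"));

// The tree is what a remoting provider would serialize and send.
Console.WriteLine(feed.Expression);

// With the local provider, subscribing compiles the tree and runs in-process.
var received = new List<string>();
using var subscription = feed.Subscribe(p => received.Add(p));

posts.OnNext("savas: hello");  // passes the filter
posts.OnNext("jim: hi");       // filtered out
```

After the two `OnNext` calls, `received` holds only the post that matched the filter, which is the behavior one would expect from the remote evaluation as well.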