Sneaking Up on Reactive Extensions

I saw Matthew Podwysocki speak on Reactive Extensions at the most recent DC Alt .NET meeting. I’ve heard some buzz about Reactive Extensions (Rx) as Linq over Events. That sounded cool, so I put the sticky note in the back of my mind to look into it later. Matthew’s presentation blew my mind a bit. Rx provides so much functionality and is so different from traditional event programming that I thought it would be helpful for me to retrace a few of the first necessary steps that would go into creating something as powerful as Rx. To that end, I starting writing a DisposableEventObserver class.

This class has two goals at this point:

  1. Replace the traditional EventHandler += new EventHandler() syntax with an IDisposable syntax.
  2. Add conditions to the Observer that determine if it will handle the events.

This is learning code. What I mean by this is that it is doubtful that the code I’m writing here will ever be used an a production application since Rx will be far more capable than what I write here. The purpose of this code is to help me (and maybe you) to gain insight into how Rx works. There are two notable Rx features that I will not be handling in v1 of DisposableEventObserver:

  1. Wrangling Asynchronous Events.
  2. Composability.

The first test I wrote looked something like this:

  1. [TestFixture]
  2. public class DisposableEventObserverTests
  3. {
  4.     public event EventHandler<EventArgs> LocalEvent;
  5.  
  6.     [Test]
  7.     public void SingleSubscriber()
  8.     {
  9.         // Arrange: Setup the test context
  10.         var count = 0;
  11.  
  12.         // Act: Perform the action under test
  13.         using (var observer = new DisposableEventObserver<EventArgs>(this.LocalEvent,
  14.             (sender, e) => { count += 1; })
  15.         {
  16.             this.LocalEvent.Invoke(null, null);
  17.             this.LocalEvent.Invoke(null, null);
  18.             this.LocalEvent.Invoke(null, null);
  19.         }
  20.  
  21.         // Assert: Verify the results of the action.
  22.         Assert.That(count, Is.EqualTo(3));
  23.     }

The test fixture served as my eventing object. I’m passing the event handler as a lambda. There were two interesting things about this approach. The first is that type required to pass this.LocalEvent is the same delegate type as the that required to pass the handler. The second is that this code did not work.

I was a little confused as to why the test didn’t pass. The lines inside the using block blew up with a NullReferenceException when I tried to reference this.LocalEvent. This is odd because inside the Observer I was definitely adding the handler to the event delegate. What’s going on here? It turns out that although Events look for all intents and purposes like a standard delegate field of the same type, the .NET framework treats them differently. Events can only be invoked by the class that owns them. The event fields themselves cannot reliably be passed as parameters.

I backed up a bit and tried this syntax:

  1.  
  2. [Test]
  3. public void SingleSubscriber()
  4. {
  5.     // Arrange: Setup the test context
  6.     var count = 0;
  7.  
  8.     EventHandler<EventArgs> evt = delegate {count += 1; };
  9.  
  10.     // Act: Perform the action under test
  11.     using (var observer = new DisposableEventObserver<EventArgs>(this, "LocalEvent", evt))
  12.     {
  13.         this.LocalEvent.Invoke(null, null);
  14.         this.LocalEvent.Invoke(null, null);
  15.         this.LocalEvent.Invoke(null, null);
  16.     }
  17.  
  18.     // Assert: Verify the results of the action.
  19.     Assert.That(count, Is.EqualTo(3));
  20. }

This test implies an implementation that uses reflection to find the event and add the handler. This worked the first time at bat, however I don’t like that magic string “LocalEvent” sitting there. I thought back to Josh Smith’s PropertyObserver and wondered if I could do something similar. Here’s a test that takes an expression that resolves to the event:

  1. [Test]
  2. public void Subscribe_Using_Lambda()
  3. {
  4.     // Arrange: Setup the test context
  5.     var count = 0;
  6.  
  7.     EventHandler<EventArgs> evt = delegate { count += 1; };
  8.  
  9.     // Act: Perform the action under test
  10.     using (var observer = new DisposableEventObserver<EventArgs>(this, () => this.LocalEvent, evt))
  11.     {
  12.         this.LocalEvent.Invoke(null, null);
  13.         this.LocalEvent.Invoke(null, null);
  14.         this.LocalEvent.Invoke(null, null);
  15.     }
  16.  
  17.     // Assert: Verify the results of the action.
  18.     Assert.That(count, Is.EqualTo(3));
  19.  
  20. }

This looks much better to me. Now I’ll get a compile-error if the event name or signature changes.

The next step is to add some conditionality to the event handling. While this class will not be Queryable like Rx, I’m going to use a similar Where() syntax to add conditions. I added the following test:

  1. [Test]
  2. public void Where_ConditionNotMet_EventShouldNotFire()
  3. {
  4.     // Arrange: Setup the test context
  5.     var count = 0;
  6.  
  7.     EventHandler<EventArgs> evt = delegate { count += 1; };
  8.  
  9.     // Act: Perform the action under test
  10.     using (var observer = new DisposableEventObserver<EventArgs>(this,
  11.         () => this.LocalEvent,
  12.         evt).Where((sender, e) => e != null)
  13.         )
  14.     {
  15.         this.LocalEvent.Invoke(null, null);
  16.         this.LocalEvent.Invoke(null, null);
  17.         this.LocalEvent.Invoke(null, null);
  18.     }
  19.  
  20.     // Assert: Verify the results of the action.
  21.     Assert.That(count, Is.EqualTo(0));
  22. }

The Where condition specifies that the event args cannot be null. In this case count should never be incremented. I had to make several changes to the internals of the Observer class to make this work. Instead of registering “evt” with the event handler I had to create an interceptor method inside the Observer to test the criteria. If the criteria are met then “evt” will be called. I implemented Where as an “Add” function over a collection of Func<object, TEventArgs, bool>.

The full implementation can be found here, and the tests and be found here.

One thought on “Sneaking Up on Reactive Extensions

Leave a Reply