Using Rx to send rate-limited events and testing it

We often want to consume an API that is rate-limited. If you are hitting the API far below the rate limit, good for you. But if you want to squeeze out every penny, this blog post might be for you.

In my case, there is a Windows service that polls an MSMQ queue periodically (every minute). Each MSMQ message contains a list of user IDs, and each user ID is posted to a dummy API. My requirement is to call the API at most 2 times per second.

Let's set up the logistics of the service first. I have created a .NET Framework 4.8 console application and used Topshelf for service management. I have created an Orchestrator class that has the equivalent of the service start and stop events. The Start and Stop methods start and stop a timer, respectively.

public class Orchestrator
{
    private Timer timer;

    // passed as delegate for ServiceConfigurator.WhenStarted 
    public void Start()
    {
        timer.Start();
    }

    // passed as delegate for ServiceConfigurator.WhenStopped
    public void Stop()
    {
        timer.Stop();
    }
}

When the timer elapses, the Timer_Elapsed handler is called. The handler reads a message from the queue, deserializes it to a list of integers and passes it to the Publish(List<int> userIds) method of the MessageProcessor class. Phew, the logistics are set up; let's move forward.

public Orchestrator(IApiClient client, string queuePath, int notificateRate)
{
    this.client = client;
    messageQueue = new MessageQueue(queuePath);
    messageProcessor = new MessageProcessor(client, notificateRate);
    timer = new Timer(60 * 1000);
    timer.Elapsed += Timer_Elapsed;
}

private void Timer_Elapsed(object sender, ElapsedEventArgs e)
{
    try
    {
        Message message = messageQueue.Receive(TimeSpan.FromSeconds(2), MessageQueueTransactionType.None);
        XmlMessageFormatter formatter = new XmlMessageFormatter(new Type[] { typeof(string) });
        var intArray = formatter.Read(message);
        messageProcessor.Publish(JsonConvert.DeserializeObject<List<int>>(intArray.ToString()));
    }
    catch (MessageQueueException ex)
    {
      //.. Error handling
    }
}

The MessageProcessor class does the actual work of calling the ApiClient's PostAsync(int userId) method for each userId, so let's see how its Publish method works.

public class MessageProcessor
{
    private readonly IApiClient client;
    private readonly int requestsPerSecond;
    private readonly TimeSpan timeSpan;
    private IScheduler scheduler;

    public MessageProcessor(IApiClient client, int requestsPerSecond) 
        : this(client, requestsPerSecond, ThreadPoolScheduler.Instance)
    {

    }

    public MessageProcessor(IApiClient client, int requestsPerSecond, IScheduler scheduler)
    {
        this.client = client;
        this.requestsPerSecond = requestsPerSecond; // per second
        timeSpan = TimeSpan.FromMilliseconds(1000.0 / requestsPerSecond);
        this.scheduler = scheduler;
    }

    public void Publish(List<int> userIds)
    {
        var subject = new Subject<int>();
        subject
            // Zip pairs one item from the subject with one tick from the interval
            // and emits the result of the combining function (here, the user id)
            .Zip(Observable.Interval(timeSpan, scheduler), (int x, long y) => x)
            .Select(userId => Observable.FromAsync(() => SendNotification(userId), scheduler))
            .Concat()
            .Subscribe();
        userIds.ForEach(userId => subject.OnNext(userId));
        subject.OnCompleted();
    }

    public async Task SendNotification(int userId)
    {
        await client.PostAsync(userId);
    }

}

We have created a Subject, which acts as both an observer and an observable. We then apply the Zip operator to the Subject. The Zip operator takes two observables, combines one element from each using the combining function and emits a single item. In our case the first observable is the Subject itself and the second is an Interval that emits an item twice per second, so each user ID is released only when the next tick arrives. Select then wraps each user ID in an observable that calls SendNotification, Concat runs those observables one after another, and Subscribe starts the pipeline.

After the subscription is set up, we push the integers into the Subject and call OnCompleted on it. Please note that if you do not complete the subject, you might leak memory.
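
To make the pairing concrete, here is a small sketch that uses plain LINQ instead of Rx. It is not the code from the service: ZipSchedule and Plan are hypothetical names, and the sketch only computes when each user ID would be released, assuming the interval fires its first tick after one full period (which is how Observable.Interval behaves).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, for illustration only: it plans the release times
// that Zip + Interval would produce, without using Rx or real timers.
public static class ZipSchedule
{
    public static List<(int UserId, TimeSpan DueTime)> Plan(IEnumerable<int> userIds, int requestsPerSecond)
    {
        // Same period calculation as in MessageProcessor.
        TimeSpan period = TimeSpan.FromMilliseconds(1000.0 / requestsPerSecond);

        // Tick n (0-based) is due after (n + 1) periods, because the first
        // tick of an interval arrives one full period after subscription.
        IEnumerable<TimeSpan> ticks = Enumerable
            .Range(0, int.MaxValue)
            .Select(n => TimeSpan.FromTicks(period.Ticks * (n + 1)));

        // Zip pairs the items one-to-one and stops when the shorter sequence ends.
        return userIds
            .Zip(ticks, (id, due) => (UserId: id, DueTime: due))
            .ToList();
    }
}
```

For example, ZipSchedule.Plan(new[] { 7, 8, 9 }, 2) pairs 7 with 500 ms, 8 with 1000 ms and 9 with 1500 ms, which matches the 500 ms spacing we want from the real pipeline.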

To test the output, I created a dummy Node.js server and timestamped the output. I ran the job for a list of 50 integers; here is the beginning of the output.

[2019-11-10T06:59:52.935Z] /0
[2019-11-10T06:59:53.285Z] /1
[2019-11-10T06:59:53.785Z] /2
[2019-11-10T06:59:54.286Z] /3
[2019-11-10T06:59:54.786Z] /4
[2019-11-10T06:59:55.287Z] /5
[2019-11-10T06:59:55.787Z] /6
[2019-11-10T06:59:56.288Z] /7
[2019-11-10T06:59:56.789Z] /8
[2019-11-10T06:59:57.289Z] /9
[2019-11-10T06:59:57.790Z] /10
[2019-11-10T06:59:58.291Z] /11
[2019-11-10T06:59:58.791Z] /12
[2019-11-10T06:59:59.292Z] /13
[2019-11-10T06:59:59.793Z] /14

As you can see, except for the first message, the messages arrived 500 ms apart, as expected. I suspect the first one took longer because of Node.js initialization. You can find the code on GitHub.

Wait, don't go yet. We have not finished the job: we have not written the most important piece of code, the unit test. But how do we unit test something driven by time? Microsoft provides the Microsoft.Reactive.Testing NuGet package, which gives us the tools we need.

Most observables depend on a scheduler to do their work and accept an IScheduler as a parameter. If we do not pass one, a default scheduler is used. The IScheduler interface allows us to schedule an action. Rx provides different schedulers for different contexts, for example DispatcherScheduler for WPF and ThreadPoolScheduler for work on the thread pool.

In the MessageProcessor class above, we used ThreadPoolScheduler as the default scheduler for the Interval observable. We also created an overload for testing, where we can inject a different scheduler.

This is where the Microsoft.Reactive.Testing NuGet package comes in. It provides a TestScheduler class that we can substitute for the ThreadPoolScheduler in our tests. Here is the unit test code.

[Fact]
public void PublishTest()
{
    // Arrange
    var scheduler = new TestScheduler();
    var testUserIds = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
    MockApiClient client = new MockApiClient();
    var ticks = TimeSpan.FromMilliseconds(500).Ticks;

    // Act
    MessageProcessor messageProcessor = new MessageProcessor(client, 2, scheduler);
    messageProcessor.Publish(testUserIds);

    // Assert
    for (int i = 0; i < testUserIds.Count; i++)
    {
        scheduler.AdvanceBy(ticks);
        Assert.Equal(testUserIds[i], client.Value);
    }
}

public class MockApiClient : IApiClient
{
    public int Value = 0;
    public async Task<bool> PostAsync(int userId)
    {
        Value = userId;
        return await Task.FromResult(true);
    }
}

We have created a MockApiClient, a mock of ApiClient for testing purposes. In the PublishTest method, we arrange the mocks and publish the data. Our assertions verify that the messageProcessor object calls the PostAsync method of MockApiClient every 500 ms with the correct userId.

Had we passed ThreadPoolScheduler instead of TestScheduler, we would have to wait 500 ms for each interval event to be raised and consumed, which is not acceptable in a unit test. The TestScheduler's AdvanceBy(long ticks) method allows us to move time ahead by the given number of ticks. So we advance our scheduler by the ticks computed for 500 ms and assert that the MockApiClient's PostAsync method was called and the variable set. And unlike a real scheduler, the TestScheduler's clock stays at that position unless we explicitly move it forward.

The TestScheduler class has some other methods that allow us to manipulate time:

  • Start() starts the scheduler's clock and keeps it running until Stop() is called or no scheduled work remains
  • AdvanceTo(long ticks) moves the scheduler's clock to that absolute position
  • Stop() stops the scheduler's clock
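
To build some intuition for what "moving the clock" means, here is a heavily simplified sketch of a virtual clock. It is not how TestScheduler is implemented: VirtualClock and its members are hypothetical names, and the sketch only shows the idea that work is queued with a due time and runs when we advance the clock past it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical, simplified stand-in for a virtual-time scheduler.
public class VirtualClock
{
    private readonly List<(long Due, Action Work)> queue = new List<(long Due, Action Work)>();

    // Current virtual time in ticks; it only changes when we advance it.
    public long Now { get; private set; }

    // Queue work to run once the clock reaches Now + delayTicks.
    public void Schedule(long delayTicks, Action work)
    {
        queue.Add((Now + delayTicks, work));
    }

    // Move the clock forward by a relative amount.
    public void AdvanceBy(long ticks)
    {
        AdvanceTo(Now + ticks);
    }

    // Move the clock to an absolute position, running everything that
    // falls due on the way, earliest first.
    public void AdvanceTo(long target)
    {
        while (true)
        {
            var due = queue.Where(item => item.Due <= target)
                           .OrderBy(item => item.Due)
                           .ToList();
            if (due.Count == 0) break;

            var next = due[0];
            queue.Remove(next);
            Now = next.Due;   // the work observes the time it was due at
            next.Work();      // may schedule more work; the loop picks it up
        }
        Now = target;
    }
}
```

With work scheduled at 500 and 1000 ticks, AdvanceBy(499) runs nothing, a further AdvanceBy(1) runs the first item, and AdvanceTo(2000) runs the second. No real time passes in any of these calls, which is the property that makes the unit test above fast.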

Conclusion

Testing time-based code is difficult unless the right tools are used. If you have any experience writing and testing time-based code, please let me know in the comments.

You can find the code on https://github.com/jigargandhi/throttler

Saying hello to Span and Memory types

The Span and Memory types are wonderful types that allow you to write allocation free code. This means you can squeeze out more performance by reducing the time taken by GC to clean up the objects that we might have created as a side effect of our code.

Equipped with these types, I set out to try and play around with these shiny toys like a grown-up kid. Let’s employ these types to the task of finding words in a given sentence.

I have taken 1024 bytes of text from the lipsum.com site as my input. The simplest way of performing the task is to use string.Split(' ') to find the individual words.

public void StandardWordCounter()
{
    string[] words = sentence.Split(' ');
    foreach (var word in words)
    {
        Console.WriteLine(word);
    }
}

However, the problem with the code above is that it allocates memory for every separate word. Now we will try to reduce the memory allocation required to find the words. With Span (or Memory) we can create slices of a string, which are views over the same string and not new strings.

To do the same task with fewer allocations, we can convert our input to a ReadOnlySpan<char> and loop over it, slicing every time we encounter the split character.

[Benchmark]
public void Raw()
{
    char split = ' ';
    ReadOnlySpan<char> span = sentence.AsSpan();
    int oldIndex = 0;
    for (int i = 0; i < span.Length; i++)
    {
        if (split == span[i])
        {
            var slicedSpan = span.Slice(oldIndex, i - oldIndex + 1);
            oldIndex = i + 1;
            // Do something with the slicedSpan
        }
    }
}

Let's walk through the code above.

In line 5, we create a ReadOnlySpan<char> out of the string. In the following for loop, we keep looking for the split character. As soon as we find one, we take a slice of the span from the start index of the word, with length = current index (i) - start index of the word + 1. Note that with this length the slice includes the trailing split character.

We can then do something with the sliced word, such as printing it. As you will see in the benchmark later, this approach has zero allocations and takes about 4 ms more time.
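
As a variation on the loop above, here is a minimal sketch that counts words with spans. It is not the benchmarked code: SpanWords and Count are hypothetical names, and unlike the Raw method it leaves the split character out of each slice and also handles the last word, which has no trailing space.

```csharp
using System;

// Hypothetical helper, for illustration only.
public static class SpanWords
{
    // Counts the words in a sentence without allocating a string per word.
    public static int Count(string sentence, char split = ' ')
    {
        ReadOnlySpan<char> rest = sentence.AsSpan();
        int count = 0;

        while (!rest.IsEmpty)
        {
            // Find the next split character in what is left of the input.
            int index = rest.IndexOf(split);

            // The word is everything before the split character,
            // or the whole remainder if there is none.
            ReadOnlySpan<char> word = index < 0 ? rest : rest.Slice(0, index);
            if (!word.IsEmpty)
            {
                count++; // consecutive split characters produce empty slices, which we skip
            }

            // Continue after the split character; both slices are views, not copies.
            rest = index < 0 ? ReadOnlySpan<char>.Empty : rest.Slice(index + 1);
        }

        return count;
    }
}
```

Every slice here is a view over the original string, so the only heap object involved is the input itself.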

In day-to-day coding, we want developers to be able to reuse this code. We might want to provide it as a string extension that returns an IEnumerable<ReadOnlySpan<char>>. I would implement something like this:

public static class StringExtensions
{
    public static IEnumerable<ReadOnlySpan<char>> Split(this string input, char split)
    {
        // ...
    }
}

However, there is a catch here. Span<T> and ReadOnlySpan<T> are ref structs, so they cannot be promoted to the heap; hence the above code will show an error and will not compile. So let us use ReadOnlyMemory<char>, which does not have such restrictions. I hurriedly converted ReadOnlySpan to ReadOnlyMemory as below.

public static class StringExtensions
{
    public static IEnumerable<ReadOnlyMemory<char>> Split(this string input, char split)
    {
        ReadOnlyMemory<char> memory = input.AsMemory();
        int oldIndex = 0;
        for (int i = 0; i < memory.Length; i++)
        {
            if (split == memory.Span[i])
            {
                var slicedMemory = memory.Slice(oldIndex, i - oldIndex + 1);
                oldIndex = i + 1;
                yield return slicedMemory;
            }
        }
    }
}

Confident that I would see zero allocations, I ran the benchmark tests and found that it still allocates 88 bytes. I cannot account for these 88 bytes. I suspect they are due to IEnumerable, but that will be part of a later post.

If IEnumerable is the issue, we can still avoid it. We can create an extension that takes a delegate and a state object. We can then pass the state object and the slice to the delegate every time we encounter the split character.

public static void SplitExByAction<T>(this string input, char split, T state, Action<T, ReadOnlyMemory<char>> action)
{
    ReadOnlyMemory<char> memory = input.AsMemory();
    int oldIndex = 0;
    for (int i = 0; i < memory.Length; i++)
    {
        if (split == memory.Span[i])
        {
            var slicedMemory = memory.Slice(oldIndex, i - oldIndex + 1);
            oldIndex = i + 1;
            action(state, slicedMemory);
        }
    }
}

In this case, we pass our state as a parameter to the method, which then passes it on to the delegate. We do this to avoid a closure, which is usually responsible for creating objects.
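
Here is a sketch of how a caller might use such an extension without a closure. WordStats, SplitDemo and Measure are hypothetical names introduced for this example, and the extension is repeated so that the sketch is self-contained.

```csharp
using System;

// Hypothetical state object that collects results across callbacks.
public sealed class WordStats
{
    public int Words;
    public int Characters;
}

public static class SplitDemo
{
    // Same shape as the extension described above, repeated for completeness.
    public static void SplitExByAction<T>(this string input, char split, T state, Action<T, ReadOnlyMemory<char>> action)
    {
        ReadOnlyMemory<char> memory = input.AsMemory();
        int oldIndex = 0;
        for (int i = 0; i < memory.Length; i++)
        {
            if (split == memory.Span[i])
            {
                // The slice includes the trailing split character.
                action(state, memory.Slice(oldIndex, i - oldIndex + 1));
                oldIndex = i + 1;
            }
        }
    }

    public static WordStats Measure(string sentence)
    {
        var stats = new WordStats();

        // The lambda only touches its own parameters, so it captures nothing
        // and the compiler can reuse one cached delegate instance.
        sentence.SplitExByAction(' ', stats, (s, slice) =>
        {
            s.Words++;
            s.Characters += slice.Length;
        });

        return stats;
    }
}
```

The WordStats instance is of course an allocation of its own, but it is made once per call rather than once per word. Had the lambda referred to the local variable stats instead of its parameter s, the compiler would have had to create a closure object and a new delegate on every call to Measure.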

Yes, we have now achieved our target of splitting a string with zero allocations. You can find all the classes and the results in the gist below.

Conclusion

The Raw method of splitting a string is the fastest. But there is a tradeoff with readability, so such code is best kept to the performance-critical parts of the code base. With a little more time we can achieve the same results by passing in a delegate.

If we want, we can avoid boxing and unboxing calls which could be responsible for many Gen 0 allocations.

If you find any errors or have any comments, please mention them so that I can correct them.

Decorator pattern in F#

The idea behind the decorator pattern is to wrap a function call in some extra code, in other words to adorn it.

For example, we can do logging before and after the function call. We can audit the number of times a function is called which can be useful for testing. We can implement policies on function calls such as retry, timeout etc.

Let us take a simple example of logging before and after a function call.
We will create a generic logger that takes a function and a value. This generic logger will print a pre message, call the function and print a post message.

//('a->'b)->'a->'b
let logger (f: 'a->'b) (valA:'a) =
    printfn "Pre"
    let result = f valA
    printfn "Post"
    result

Let's create a simple function that adds 5 to the input.

// int->int
let add5To x =
    printfn "calling add5To"
    x+5

Now let's create a decorated version of the add5To function, execute it with the parameter 12 and pipe the output to printfn.

let decoratedAdd = logger add5To

decoratedAdd 12
|>printfn "%d"

The output is as expected:

Pre
calling add5To
Post
17

So far so good. Now it is time to get a little more ambitious.
Let's create a function that takes 2 values and adds them.

// int->int->int
let add x y =
    printfn "calling add"
    x + y

A no-brainer, isn't it? Now let's create a decorated function and call it with 2 parameters.

// int->int->int
let decoratedAdder = logger add

decoratedAdder 1 4
|>printfn "%d"

The output is:

Pre
Post
calling add
5

What just happened? Why did calling add come after Post?

If you look at the type annotation on logger, it is ('a->'b)->'a->'b, which means it takes a function that takes one parameter of type 'a and returns a value of type 'b. The function add has the signature int->int->int, which the logger reads as int->(int->int); that is, for the logger 'a is an int and 'b is a function of type int->int.

When we call decoratedAdder 1 4, it is evaluated as (decoratedAdder 1) 4. The call decoratedAdder 1 does the pre and post logging and returns a function of type int->int. Only then is 4 applied to that function, which is why the message calling add is printed after the Pre and Post messages.

Phew! What did we learn here? We learned that, after partial application of the target function, the decorator must have the same signature as the target function for it to behave correctly. If you create a logger that takes a function of 2 parameters, you will see the correct output.

//('a->'b->'c)->'a->'b->'c
let twoParamedLogger (f:'a->'b->'c) (a:'a) (b:'b) =
    printfn "Pre"
    let result = f a b
    printfn "Post"
    result

// int->int->int
let decoratedAdd2 = twoParamedLogger add

decoratedAdd2 1 4
|> printfn "%d"
//Pre
//calling add
//Post
//5

We figured out that in F# we can create decorators to wrap other functions, but we can only create correctly typed decorators. After all, F#'s strength is its type system. You can find the full script in the gist below:


Memento Pattern in F#

 

The Memento pattern allows us to store the state of an object so that it can be restored later. Let's see how to implement the Memento pattern in F#.

To hold the state of the object we create a state type in F# as follows

type State<'T> = {State: State<'T> option; CurrentItem: 'T}

In the State type, the State field refers to the old state and CurrentItem refers to the current object.

Additionally, we create a few helper functions, createState, updateState and getPreviousState, to help us create the initial state, transition to a new state and get the old state respectively.

// 'T -> State<'T>
let createState (instance :'T) =
    {State=None; CurrentItem= instance}

The createState function takes an object and returns a new state for that object. Since it does not have any previous state, its State is None.

// ('a->'a) -> State<'a> -> State<'a>
let updateState modifier currentState  =
    let newObject = modifier currentState.CurrentItem
    { State =Some currentState; CurrentItem = newObject}

Next we have an updateState function that takes a modifier function. In F# records are immutable, so we create new instances instead of updating them. The modifier function does just that: it takes an instance and returns a modified new instance.

In the updateState function we create a new state that holds the new object, and set its State field to currentState, which becomes the old state.

// State<'a> -> State<'a>
let getPreviousState state =
    match state.State with
    | Some x -> x
    | None -> failwith "No Old state"

To get the previous state, we pattern match on the State field. If it holds a value, we return that state, whose CurrentItem is the previous item; otherwise we fail, because there is no older state.

Let's see how we can use it.

First we create a Person type that has a property Name of type string.

type Person = {Name:string}

We also create a modifier function that takes a new name and a Person object and returns a new Person object with that name.

// string->Person->Person
let changeNameTo name person =
    {person with Name=name}

Now we can finally use it as follows:

let person = {Name="Jigar"}
let finalState = createState person
                    |> updateState (changeNameTo "GitHub")

printfn "Current Name: %s" finalState.CurrentItem.Name
// Current Name: GitHub

let previousState = getPreviousState finalState
printfn "Previous Name: %s" previousState.CurrentItem.Name
// Previous Name: Jigar

For the complete script please see the gist.

Show Modal Dialog in WinForms for a background task

We often need to make a user wait on a modal dialog while we complete a background task.

A general Google search does not give good results for this problem. However, the feature can be achieved as illustrated below.

I have created a WinForms application and added Form1.cs and WaitDialog.cs. Form1 hosts a button.

The click event handler of the button invokes LongRunningTask. It then passes the resulting Task object to WaitDialog.

The WaitDialog registers a continuation on the task and sets the DialogResult. As soon as the DialogResult is set, the ShowDialog() will complete and the control will resume and show a message box.

Hence we can use Task Continuation to set the DialogResult and wait for a long running background task.
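
Since the original code is not reproduced here, the following is a minimal sketch of how the two forms could look. Form1, WaitDialog and LongRunningTask are the names used in this post, but everything inside them (the three-second sleep, the dialog text, registering the continuation in the Shown event) is my assumption and not necessarily how the original project does it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

// Sketch of a dialog that closes itself when the supplied task finishes.
public class WaitDialog : Form
{
    public WaitDialog(Task task)
    {
        Text = "Please wait...";
        ControlBox = false; // hide the close button so the user cannot dismiss the dialog early
        StartPosition = FormStartPosition.CenterParent;

        // Register the continuation once the dialog is visible and run it on
        // the UI thread; setting DialogResult on a modal form closes it.
        Shown += (sender, e) => task.ContinueWith(
            t => { DialogResult = DialogResult.OK; },
            TaskScheduler.FromCurrentSynchronizationContext());
    }
}

public class Form1 : Form
{
    public Form1()
    {
        var button = new Button { Text = "Start", Dock = DockStyle.Fill };
        button.Click += (sender, e) =>
        {
            Task task = LongRunningTask();
            using (var dialog = new WaitDialog(task))
            {
                dialog.ShowDialog(this); // blocks here until DialogResult is set
            }
            MessageBox.Show("Task completed");
        };
        Controls.Add(button);
    }

    // Stand-in for the real background work (assumption: three seconds of sleep).
    private static Task LongRunningTask()
    {
        return Task.Run(() => Thread.Sleep(3000));
    }
}

public static class Program
{
    [STAThread]
    public static void Main()
    {
        Application.Run(new Form1());
    }
}
```

The continuation is scheduled with TaskScheduler.FromCurrentSynchronizationContext() because DialogResult must be set on the UI thread. Note that this sketch closes the dialog even if the task fails; a real implementation would probably inspect the task's status before showing the message box.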

If you have a better approach, please reply in the comments.

Loading Angular templates from an iframe proxy

Recently, I had to develop a custom SharePoint 2013 Metadata Navigation component with the same look and feel as the OOB component. I chose to develop a SharePoint 2013 provider-hosted add-in and enable my component by programmatically adding a Script Editor web part to the list views. I used Angular 1.5 for the UI and interaction.

With the Metadata Navigation component, I had to quickly show the complete tree of Metadata Navigation terms (along with their counts) from the database. I did not want to request the template, load the data via Ajax and then ask Angular to compile the template into the view.

I thought it was better to pre-render the data with the template on the server, so that the time for the component to render in the browser is reduced. Plus, the HTML output can be cached using the MVC OutputCache attribute and reused for multiple users. Also, all this communication took place over an iframe proxy so that the user remains authenticated on the provider-hosted add-in.

Whenever we ask Angular to load a template, the $templateRequest service is invoked. It makes an HTTP call and puts the template into the $templateCache before handing it to the requesting component. Generally, we can only specify the HTTP headers that are used by the $templateRequestProvider.

Now came the challenge: I needed to request the template from a proxy iframe using postMessage. The proxy page then requests the template from the backend. So I needed to provide my own template loading functionality.

To do this, we need to use the Angular decorator pattern, which allows us to modify or replace the behaviour of existing Angular services.

As you can see in the gist, I have replaced the $templateRequest function with my own. In the function I make a call to postMessage, receive the data and load it into the $templateCache before resolving the promise that I have returned.

Have a better way to solve this problem? Please share your thoughts.