Many a time, we might want to consume an API which is rate-limited. Well, if you are hitting the API far below the rate limit good for you. But if you want to squeeze out every penny this blog post might be for you.
For my case, there is a windows service listening periodically (every minute) to a MSMQ. Each MSMQ message contains a list of user Ids. These user Ids will be posted to a dummy API. My requirements are to call the API only 2 times per second.
Let’s set up the logistics of the service first. I have created a .NET Framework 4.8 console application and used Topshelf to do service management. I have created an Orchestrator
class that has equivalent of service start and stop events. The Start and Stop event will start a timer and stop it respectively.
public class Orchestrator{ Timer timer; // passed as delegate for ServiceConfigurator.WhenStarted public void Start() { timer.Start(); } // passed as delegate for ServiceConfigurator.WhenStopped public void Stop() { timer.Stop(); } }
When a timer is elapsed, a Timer_Elapsed handler is called. The event handler, reads a message from queue, deserializes the message to a list of integers and passes it to Publish(List<int> userIds)
method of MessageProcessor
class. Phew, logistics setup; lets move forward.
public Orchestrator(IApiClient client, string queuePath, int notificateRate) { this.client = client; messageQueue = new MessageQueue(queuePath); messageProcessor = new MessageProcessor(client, notificateRate); timer = new Timer(60 * 1000); timer.Elapsed += Timer_Elapsed; } private void Timer_Elapsed(object sender, ElapsedEventArgs e) { try { Message message = messageQueue.Receive(TimeSpan.FromSeconds(2), MessageQueueTransactionType.None); XmlMessageFormatter formatter = new XmlMessageFormatter(new Type[] { typeof(string) }); var intArray = formatter.Read(message); messageProcessor.Publish(JsonConvert.DeserializeObject<List<int>>(intArray.ToString())); } catch (MessageQueueException ex) { //.. Error handling } }
The MessageProcessor
class does the actual work of calling the ApiClient for each userId by calling ApiClient’s PostAsync(int userId)
method, so lets see how its Publish
method work.
public class MessageProcessor { private readonly IApiClient client; private readonly int requestsPerSecond; private readonly TimeSpan timeSpan; private IScheduler scheduler; public MessageProcessor(IApiClient client, int requestsPerSecond) : this(client, requestsPerSecond, ThreadPoolScheduler.Instance) { } public MessageProcessor(IApiClient client, int requestsPerSecond, IScheduler scheduler) { this.client = client; this.requestsPerSecond = requestsPerSecond; // per second timeSpan = TimeSpan.FromMilliseconds(1000.0 / requestsPerSecond); this.scheduler = scheduler; } public void Publish(List<int> userIds) { var subject = new Subject<int>(); subject // Zip takes an combines one from both the subject and interval // and picks up the new as per the zip function .Zip(Observable.Interval(timeSpan, scheduler), (int x, long y) => x) .Select(userId => Observable.FromAsync(() => SendNotification(userId), scheduler)) .Concat() .Subscribe(); userIds.ForEach(userId => subject.OnNext(userId)); subject.OnCompleted(); } public async Task SendNotification(int userId) { await client.PostAsync(userId); } }
We have created a Subject
which acts as an Observable
. We then apply Zip operator to the Subject. The Zip operator takes two Observables and combines one element from each Observable using the combine function and emits a single item. In our case our first Observable is the Subject itself and second observable is an Interval that emits an item twice per second. We then convert the observable to an async observable by calling the SendNotification
function. The concat and then subscribe completes the job.
After the subscription is setup, we push the integers into the subject and call complete on the Subject. Please note, if you do not complete the subject, you might be leaking memory.
To test the output, I create a dummy node.js server and timestamped the output. I ran the job for a list of 50 integers and here is the output.
[2019-11-10T06:59:52.935Z] /0 [2019-11-10T06:59:53.285Z] /1 [2019-11-10T06:59:53.785Z] /2 [2019-11-10T06:59:54.286Z] /3 [2019-11-10T06:59:54.786Z] /4 [2019-11-10T06:59:55.287Z] /5 [2019-11-10T06:59:55.787Z] /6 [2019-11-10T06:59:56.288Z] /7 [2019-11-10T06:59:56.789Z] /8 [2019-11-10T06:59:57.289Z] /9 [2019-11-10T06:59:57.790Z] /10 [2019-11-10T06:59:58.291Z] /11 [2019-11-10T06:59:58.791Z] /12 [2019-11-10T06:59:59.292Z] /13 [2019-11-10T06:59:59.793Z] /14
As you can see, except for the first message, all the other messages have arrived after at a difference of 500ms as expected. I suspect, first one might have taken time because of nodejs initialization. You can find the code on Github.
Wait, don’t go yet. Have we not yet finished the job. We have not written a most important piece of code: Unit Test. But how do we unit test something driven by time. Microsoft has provided us with Microsoft.Reactive.Testing nuget package which provides us the tools we need.
Most of the Observables, depend on a Scheduler to perform its task and take IScheduler
as a parameter. If we do not pass a Scheduler, a default one is used. The IScheduler
interface allows us to schedule an action. The .NET runtime provides us different types of Schedulers based on the context, for example WPF provides Dispatcher, ASP.NET provides ThreadPoolScheduler and so on.
In the MessageProcessor
class above, we have used a ThreadPoolScheduler as the default Scheduler for Interval observable. We have also created a overload for testing where we can inject a new Scheduler.
Here comes the user of Microsoft.Reactive.Testing nuget package. It provides us a TestScheduler
class that we can substitute for the ThreadPoolScheduler and do our testing. Here is the unit testing code.
[Fact] public void PublishTest() { // Arrange var scheduler = new TestScheduler(); var testUserIds = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9 }; MockApiClient client = new MockApiClient(); var ticks = TimeSpan.FromMilliseconds(500).Ticks; // Act MessageProcessor messageProcessor = new MessageProcessor(client, 2, scheduler); messageProcessor.Publish(testUserIds); // Assert for (int i = 0; i < testUserIds.Count; i++) { scheduler.AdvanceBy(ticks); Assert.Equal(testUserIds[i], client.Value); } } public class MockApiClient : IApiClient { public int Value = 0; public async Task<bool> PostAsync(int userId) { Value = userId; return await Task.FromResult(true); } }
We have created a MockApiClient
that is a mock of ApiClient
for testing purpose. In the PublishTest
method, we arrange the mocks, publish the data. Our assertion should be to verify that the messageProcessor
object is calling PostAsync
method of MockApiClient
every 500ms with the correct userId
.
In case, we would have passed ThreadPoolScheduler in place of TestScheduler, we would have to wait for 500ms for the next event of interval to be raised and consumed, which is not acceptable. The TestScheduler’s AdvanceBy(long ticks)
method allows us to move time ahead by the ticks passed. So we advanced our scheduler by the ticks (computed for 500ms) and assert that our mockApiclient’s PostAsync
method be called and the variable set. And unlike the actual Scheduler, TestScheduler’s clock remains at that position unless we explicitly move it forward.
The TestScheduler class some other functions which allow us to manipulate time:
- Start() it starts the Scheduler’s clock and keeps it running
- AdvanceTo(long ticks) moves the Schedulers clock to that absolute position
- Stop() stops the Scheduler’s clock.
Conclusion
Testing time based code will turn out to be difficult, unless right tools are used. Do you have any such experiences in writing and testing time based code, please let me know in the comments.
You can find the code on https://github.com/jigargandhi/throttler