Kevin Kallberg

The life and times of a software developer

Achieving Delayed Messaging in ServiceStack with RabbitMQ and EasyNetQ

Recently I was looking for a good, durable way to schedule web events for a project at work.  The project requires high throughput, so I needed a fast, scalable solution for persisting these events.  As fortune would have it, about a month before the project began, a new plugin was announced for RabbitMQ that supported delayed messaging.  A message queue seemed like it might be a good fit for the project needs, so I decided to take a look at it.

Picking the pieces

At work, we use (the fantastic) ServiceStack web service framework.  We’ve used RabbitMQ in the past, and the plugin for ServiceStack makes it very easy to publish and consume messages.  However, ServiceStack’s RabbitMQ plugin is very opinionated and does not leave a lot of room for customization without going down to the RabbitMQ.Client library itself.  I wanted something that would allow me to retain a lot of the simplicity that the ServiceStack RabbitMQ plugin offers but provide further points of configuration.

Enter, EasyNetQ

EasyNetQ is an open source client API for RabbitMQ on .NET.  It has over 100,000 downloads on NuGet.org and over 1,200 commits across 70+ contributors on GitHub.  Not only does the health of the project look great, but EasyNetQ also comes with a feature that I was specifically looking for: scheduling messages.  EasyNetQ has long supported scheduled messages through the use of a companion Windows service.  However, as of late April 2015, support for the new delayed exchange plugin for RabbitMQ was added.

Putting it all together

Now that the pieces have been assembled, it is time to tie them all together.  The project is simple.  It consists of only a pair of services with a single operation each, a pair of POCOs and the AppHost, which initializes ServiceStack and the other components.

The Message

With RabbitMQ, we can simply create a POCO to push into a message queue.  For our sample, our POCO contains just two DateTime properties.  The Queue attribute comes courtesy of EasyNetQ and allows us to specify the name of the queue and exchange that are created in RabbitMQ.

[Queue("DelayedMessageSampleQueue", ExchangeName = "DelayedMessageSampleExchange")]
public class DelayedMessage
{
    public DateTime PublishTime { get; set; }
    public DateTime SentTime { get; set; }
}

The Services

The project contains two services: one to publish messages and one to consume them.  The PublishService is a simple ServiceStack service that responds at /publish.  A single parameter, MillisecondDelay, can be specified either on the query string or in the POST body.  The service creates a new DelayedMessage and sets its PublishTime.  It then gets a reference to EasyNetQ’s IBus interface from ServiceStack’s IoC container and uses it to publish the message.  Note the use of the FuturePublish method, which takes the delay as a TimeSpan.

[Route("/publish")]
public class PublishRequest : IReturnVoid
{
    public double MillisecondDelay { get; set; }
}

public class PublishService : Service
{
    public void Any(PublishRequest request)
    {
        var message = new DelayedMessage { PublishTime = DateTime.UtcNow };

        var bus = HostContext.Container.Resolve<IBus>();
        bus.FuturePublish(TimeSpan.FromMilliseconds(request.MillisecondDelay), message);
    }
}
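
Because MillisecondDelay comes straight from the request, it may be worth sanitizing it before it reaches FuturePublish.  The sketch below is not part of the original sample: DelayGuard and ToDelay are hypothetical names, and the upper bound assumes the plugin's documented limit of (2^32)-1 milliseconds, which you should verify against the plugin version you run.

```csharp
using System;

// Hypothetical helper, not part of the original sample project.
public static class DelayGuard
{
    // Assumption: the delayed exchange plugin accepts delays up to (2^32)-1 ms.
    public const double MaxDelayMilliseconds = 4294967295d;

    // Converts a caller-supplied delay into a TimeSpan that is safe to hand to FuturePublish.
    public static TimeSpan ToDelay(double milliseconds)
    {
        // NaN and negative values are treated as "publish immediately".
        if (double.IsNaN(milliseconds) || milliseconds < 0)
        {
            return TimeSpan.Zero;
        }

        // Clamp very large values (including infinity) to the assumed maximum.
        return TimeSpan.FromMilliseconds(Math.Min(milliseconds, MaxDelayMilliseconds));
    }
}
```

In PublishService this would be used as bus.FuturePublish(DelayGuard.ToDelay(request.MillisecondDelay), message).  Clamping is only one option; rejecting an out-of-range value with an error response would be just as reasonable.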

The DelayedMessageService consumes messages from the queue once their delay has elapsed.  The service sets the SentTime property on the message and then writes both timestamps to the Debug output.

public class DelayedMessageService : Service
{
    // This is the queue consumer for the DelayedMessage queue.
    public void Any(DelayedMessage message)
    {
        message.SentTime = DateTime.UtcNow;

        Debug.WriteLine("{0}New Message Received!{0}\tPublished: {1}{0}\tSent: {2}{0}", 
            Environment.NewLine, 
            message.PublishTime, 
            message.SentTime
        );
    }
}
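
The two timestamps make it easy to check how closely the broker honoured the requested delay.  The following is a minimal sketch, assuming a hypothetical DelayReport helper that is not in the original sample.  Note that the requested delay is not carried on DelayedMessage, so the caller would have to supply it (or the message would need an extra property).

```csharp
using System;

// Hypothetical helper, not part of the original sample project.
public static class DelayReport
{
    // Time that actually passed between publishing and delivery.
    public static TimeSpan ObservedDelay(DateTime publishTimeUtc, DateTime sentTimeUtc)
    {
        return sentTimeUtc - publishTimeUtc;
    }

    // Positive when delivery happened later than requested, negative when earlier.
    public static TimeSpan Drift(DateTime publishTimeUtc, DateTime sentTimeUtc, TimeSpan requestedDelay)
    {
        return ObservedDelay(publishTimeUtc, sentTimeUtc) - requestedDelay;
    }
}
```

Inside DelayedMessageService, DelayReport.ObservedDelay(message.PublishTime, message.SentTime) could be added to the Debug output alongside the raw timestamps.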

The AppHost

ServiceStack’s AppHost is where initialization of the message queue takes place.  EasyNetQ is registered using its static RabbitHutch.CreateBus method.  Note the registration of the IScheduler interface with a DelayedExchangeScheduler.  This is where we tell EasyNetQ to engage the RabbitMQ delayed exchange plugin.  Next, we configure the message queue to deliver our delayed messages to the DelayedMessageService we looked at above.  To do so, we use the IBus interface’s Subscribe method.  We pass an empty string as the subscriptionId parameter (since we’re using the Queue attribute mentioned previously) and a lambda for the callback.  When a delayed message is ready to be delivered, it is dequeued and passed to the Subscribe callback.  There, we wrap the POCO in ServiceStack’s Message<T> (an IMessage<T>) and pass it to ServiceStack’s ServiceController.ExecuteMessage method, which delivers it to our service.

public class AppHost : AppHostBase
{
    // ... snip ...

    /// <summary>
    /// Application specific configuration
    /// This method should initialize any IoC resources utilized by your web service classes.
    /// </summary>
    /// <param name="container"></param>
    public override void Configure(Container container)
    {
        var connectionConfig = new ConnectionConfiguration
        {
            AMQPConnectionString = new Uri("amqp://localhost:5672/"),
            UserName = "guest",
            Password = "guest"
        };
        container.Register(RabbitHutch.CreateBus(connectionConfig,
            x => x.Register<IScheduler, DelayedExchangeScheduler>()));

        var bus = container.Resolve<IBus>();
        bus.Subscribe<DelayedMessage>(String.Empty, msg =>
        {
            var message = new ServiceStack.Messaging.Message<DelayedMessage>(msg);
            HostContext.ServiceController.ExecuteMessage(message);
        });

        this.RegisterService<PublishService>("/");
        this.RegisterService<DelayedMessageService>("/");
    }
}

We round out the AppHost by registering our two services.
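
For readers curious about what the DelayedExchangeScheduler spares us from, the plugin's documentation describes a small AMQP-level contract: the exchange is declared with the type "x-delayed-message" and an "x-delayed-type" argument naming the real routing behaviour, and each message carries an "x-delay" header in milliseconds.  The sketch below only builds those values.  DelayedExchangeSketch and its members are hypothetical names, and how EasyNetQ applies the contract internally is not shown in this post.

```csharp
using System.Collections.Generic;

// Hypothetical illustration of the plugin's documented contract; not EasyNetQ code.
public static class DelayedExchangeSketch
{
    // Exchange type that the delayed message plugin registers with the broker.
    public const string ExchangeType = "x-delayed-message";

    // Arguments for the exchange declaration; routingType is the underlying
    // exchange behaviour, e.g. "direct", "topic" or "fanout".
    public static IDictionary<string, object> ExchangeArguments(string routingType)
    {
        return new Dictionary<string, object> { { "x-delayed-type", routingType } };
    }

    // Per-message headers; the broker holds the message for delayMilliseconds
    // before routing it to the bound queues.
    public static IDictionary<string, object> MessageHeaders(long delayMilliseconds)
    {
        return new Dictionary<string, object> { { "x-delay", delayMilliseconds } };
    }
}

// With RabbitMQ.Client (assuming a version that exposes IModel), these would be
// passed roughly as:
//   channel.ExchangeDeclare(name, DelayedExchangeSketch.ExchangeType, true, false,
//       DelayedExchangeSketch.ExchangeArguments("direct"));
//   properties.Headers = DelayedExchangeSketch.MessageHeaders(5000);
```

With EasyNetQ, none of this has to be written by hand; registering the scheduler and calling FuturePublish is enough.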

Gotchas

There are a couple of things to keep in mind when using this solution.

  • Requires RabbitMQ Server 3.4 or higher
  • Requires the new delayed exchange plugin (rabbitmq_delayed_message_exchange) to be installed and enabled on the broker

Wrapping up

With this new plugin for RabbitMQ and the slick implementation offered by EasyNetQ, it is very straightforward to achieve delayed messaging in ServiceStack.  With just a little bit of configuration, you can tie right in to the existing infrastructure.  The sample code can be viewed or downloaded in its entirety on GitHub.
