Few days ago I have open-sourced the NGinn-MessageBus project at http://code.google.com/p/nginn-messagebus/. NGinn MessageBus is an implementation of so called ‘Service Bus’, that is a message-oriented, asynchronous communication framework for distributed applications. I have implemented NGinn MessageBus as the infrastructure for the NGinn BPM project, but since it has evolved into quite independent component I decided to publish it as a separate project. NGinn MessageBus has functionality similar to other open-source service buses for .Net (like NServiceBus, MassTransit or Rhino ServiceBus) but is smaller and simpler and, what is most important, uses SQL Server for storing message queues.
NGinn MessageBus provides publish-subscribe message distribution and supports transactional and durable messages only (as for now, because it’s still in development). All messages and subscriber information is persisted in SQL database and no other message transport mechanism is used. A message queue in NGinn MessageBus is simply a database table where messages are written and read in FIFO (more or less) manner. You might wonder why I decided to use SQL instead of well known messaging implementations like MSMQ, and here are the reasons:
- All application data in single database. As NGinn.BPM stores process state in SQL database it made a perfect sense to have all other data, including queued messages, in the same database. It simplifies maintaninance, backups and other management tasks.
- NGinn MessageBus provides ‘scheduled message’ functionality allowing you to schedule a message to be delivered at specified moment in future. This function is heavily used by NGinn BPM and there are lots of scheduled messages sitting in database and waiting to be delivered. It’s easy to handle this in a random-access database and very difficult to do it when you have a FIFO queue with sequential access only.
- The performance is good, in some situations much better than what I was able to get with MSMQ. For example, sending messages in a distributed transaction is faster in NGinn MessageBus than in MSMQ. NGinn BPM uses distributed transactions everywhere so this was important.
NGinn MessageBus is built around the concept of an Endpoint. Endpoint is an address to where messages can be delivered or from where they can be sent. Endpoint corresponds to a single message queue, with the same name (actually, the queue name is the endpoint). Usually you have one endpoint per process and in case of distributed applications each process has its own endpoint, and hence its own queue. If you have a single process you are using NGinn Message Bus in a ‘local’ mode, where messages are sent and received in the same queue. To extend it to distributed mode you have to set up several endpoints and provide message distribution rules by subscribing. One endpoint can subscribe at another endpoint for messages of particular type. After subscription is done all messages published at the publisher endpoint will be delivered to that endpoint locally and to all endpoints that have subscribed for particular message type.
NGinn Message Bus is built in a decentralized architecture – there is no central message router component, each pair of endpoints can communicate directly independent of any other endpoints.
From the application perspective you access NGinn MessageBus through the IMessageBus interface that provides functions for sending messages and also manages the process of receiving incoming messages and dispatching them to their handlers. NGinn Message Bus uses an IOC container to resolve message handlers so you have to register your message handling components in the container. Currently NGinn MessageBus project provides a configuration tool to set it up with the Castle Windsor container, but it’s possible to use other containers as well.
Some code examples
First of all, let’s see how to configure NGinn Message Bus with the Castle Windsor container. The source code contains a test project with very similar example:
Dictionary connStrings = new Dictionary();
connStrings["testdb1"] = "Data Source=(local);Initial Catalog=NGinn;User Id=nguser;Password=PASS";
connStrings["testdb2"] = "Data Source=(local);Initial Catalog=NGinn;User Id=nguser;Password=PASS";
IWindsorContainer wc1 = ConfigureMessageBus("sql://testdb1/MQueue1", connStrings);
IWindsorContainer wc2 = ConfigureMessageBus("sql://testdb2/MQueue2", connStrings);
IMessageBus mb1 = wc1.Resolve();
mb1.SubscribeAt("sql://testdb2/MQueue2", typeof(TestMessage1));
IMessageBus mb2 = wc2.Resolve();
The code above configures two instances of message bus component, for two different endpoints. Note that in order to configure a message bus endpoint we must specify the endpoint name in the form of sql://[connection alias]/[table name] and give the list of connection string aliases. Here we have two connection string aliases (testdb1 and testdb2), both pointing to the same database. So in the database we will have two message queue tables created, named MQueue1 and MQueue2 (NGinn MessageBus will create them automatically if the credentials provided in connection string allow for table creation).
There is also a call to mb1.SubscribeAt(“endpoint”, Message type) that tells message bus 1 (represented by sql://testdb1/MQueue1) to subscribe at endpoint sql://testdb2/MQueue2 to all messages of type TestMessage1.
And here is the ‘ConfigureMessageBus’ method code that performs actual configuration:
static IWindsorContainer ConfigureMessageBus(string endpointName, IDictionary dbConnectionStrings)
{
IWindsorContainer wc = Configure.Begin()
.SetConnectionStrings(dbConnectionStrings)
.SetEndpoint(endpointName)
.UseSqlSubscriptions()
.AddMessageHandler(typeof(TestHandler))
.BuildContainer();
return wc;
}
Now, after we have configured the message bus here is how to send some messages:
IMessageBus mb2 = wc2.Resolve();
using (System.Transactions.TransactionScope ts = new System.Transactions.TransactionScope())
{
for (int i = 0; i < 10; i++)
{
mb2.NewMessage(new TestMessage1("M" + i))
.SetDeliveryDate(DateTime.Now.AddSeconds(1 + i))
.SetLabel("Test XX" + i)
.Publish();
}
ts.Complete();
}
We open a transaction and then publish 10 messages. For each message we specify the delivery date (optional) and label (also optional). Messages are published to local endpoint, then they will be distributed according to currently configured subscriptions.
Finally, some code showing how to set up a message handler. Please take look above at the ConfigureMessageBus snippet. You’ll notice the line
.AddMessageHandler(typeof(TestHandler))
which registers a message handler component. This component looks like this:
public class TestHandler : IMessageConsumer {
private Logger log;
public TestHandler()
{
log = LogManager.GetLogger("TestHandler_" + this.GetHashCode());
}
#region IMessageConsumer Members
public void Handle(TestMessage1 message)
{
log.Info("TestMessage1 {0} ARRIVED", message.Id);
System.Threading.Thread.Sleep(1000);
}
#endregion
As you can see, message handler has to implement IMessageConsumerinterface which defines the ‘Handle’ method. Single component can implement the IMessageConsumer interface multiple times to be able to handle multiple types of messages.
And basically that’s it.