In my previous blog entry I discussed the problem of using a single envelope-like data type on a data bus. I presented a solution that uses the topic-based subscriptions available in the Rebus.FM framework. I also promised to show how to do the same in MassTransit.
The problem with MassTransit is that you need a clever trick to set up all the bindings, as MT doesn't support topic-based subscriptions that easily. The trick is to have two layers of exchanges:
- the first layer consists of a single exchange that the publisher uses to put messages into RabbitMQ
- the second layer consists of multiple exchanges bound to the single exchange from the first layer by routing keys
To get this, we first need the models:
    using System;
    using Newtonsoft.Json;

    public class MessageEnvelope
    {
        // will help to build routing keys
        public string MessageTypeName { get; set; }

        // the payload serialized to JSON
        public string Content { get; set; }

        // parameterless constructor, needed for deserialization
        public MessageEnvelope() { }

        public MessageEnvelope(object payload)
        {
            if (payload == null) throw new ArgumentNullException(nameof(payload));

            this.Content = JsonConvert.SerializeObject(payload);
            this.MessageTypeName = payload.GetType().Name;
        }

        public T GetPayload<T>()
        {
            if (string.IsNullOrEmpty(this.Content)) throw new ArgumentException();

            return JsonConvert.DeserializeObject<T>(this.Content);
        }
    }

    public static class EnvelopedExtensions
    {
        public static string ToJson(this object payload)
        {
            if (payload == null) throw new ArgumentNullException(nameof(payload));

            return JsonConvert.SerializeObject(payload);
        }
    }

    public class Foo
    {
        public string Data { get; set; }
    }

    public class Bar
    {
        public string Something { get; set; }
    }

We also need the handler. Note that the handler is almost identical to the one from the previous blog entry about Rebus.FM. In fact, the only notable difference is the name of the interface you implement.
    using System;
    using System.Threading.Tasks;
    using MassTransit;

    public class Enveloped_Handler : IConsumer<MessageEnvelope>
    {
        // name printed in the output to tell the subscribers apart
        private string agent { get; set; }

        public Enveloped_Handler(string agent)
        {
            this.agent = agent;
        }

        public async Task Consume(ConsumeContext<MessageEnvelope> context)
        {
            var message = context.Message;

            await Console.Out.WriteLineAsync($"{message.MessageTypeName} received by {this.agent}\r\n{message.ToJson()}");
        }
    }

Then comes the actual code that sets up the exchanges, queues and bindings and publishes the messages.
    using System;
    using System.Threading.Tasks;
    using MassTransit;
    using RabbitMQ.Client; // ExchangeType

    class Program
    {
        static IBusControl ConfigureActivator(string queueName, int workers = 1)
        {
            var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(new Uri("rabbitmq://localhost"), h =>
                {
                    h.Username("username");
                    h.Password("password");
                });

                if (queueName == "publisher")
                {
                    // take routing keys from the property
                    cfg.Send<MessageEnvelope>(x => x.UseRoutingKeyFormatter(context => context.Message.MessageTypeName));
                    // publish to this exchange
                    cfg.Message<MessageEnvelope>(x => x.SetEntityName("MessageEnvelope"));
                    // use direct binding between the exchanges
                    cfg.Publish<MessageEnvelope>(x => { x.ExchangeType = ExchangeType.Direct; });
                }
                else
                {
                    cfg.ReceiveEndpoint(host, queueName, e =>
                    {
                        e.PrefetchCount = (ushort)workers;

                        switch (queueName)
                        {
                            case "subscriber1":
                                // no automatic bindings, only the explicit one below
                                e.BindMessageExchanges = false;
                                e.Bind("MessageEnvelope", x =>
                                {
                                    x.ExchangeType = ExchangeType.Direct;
                                    x.RoutingKey = "Foo";
                                });
                                e.Consumer(() => new Enveloped_Handler("agent1"));
                                break;

                            case "subscriber2":
                                e.BindMessageExchanges = false;
                                e.Bind("MessageEnvelope", x =>
                                {
                                    x.ExchangeType = ExchangeType.Direct;
                                    x.RoutingKey = "Foo";
                                });
                                e.Bind("MessageEnvelope", x =>
                                {
                                    x.ExchangeType = ExchangeType.Direct;
                                    x.RoutingKey = "Bar";
                                });
                                e.Consumer(() => new Enveloped_Handler("agent2"));
                                break;
                        }
                    });
                }
            });

            bus.Start();

            return bus;
        }

        static async Task Work()
        {
            var publisherBus = ConfigureActivator("publisher");
            var subscriber1Bus = ConfigureActivator("subscriber1", 1);
            var subscriber2Bus = ConfigureActivator("subscriber2", 1);

            Console.WriteLine("publishing");

            await publisherBus.Publish(new MessageEnvelope(CreateFoo()));
            await publisherBus.Publish(new MessageEnvelope(CreateBar()));

            Console.WriteLine("published");
            Console.WriteLine("Press enter to quit");
            Console.ReadLine();
        }

        static Foo CreateFoo()
        {
            return new Foo() { Data = "hello foo world" };
        }

        static Bar CreateBar()
        {
            return new Bar() { Something = "and this is bar" };
        }

        static void Main(string[] args)
        {
            Work().Wait();
        }
    }
The output of this code will look like this (the order in which the subscribers report may vary):
    publishing
    published
    Press enter to quit
    Foo received by agent2
    {"MessageTypeName":"Foo","Content":"{\"Data\":\"hello foo world\"}"}
    Foo received by agent1
    {"MessageTypeName":"Foo","Content":"{\"Data\":\"hello foo world\"}"}
    Bar received by agent2
    {"MessageTypeName":"Bar","Content":"{\"Something\":\"and this is bar\"}"}
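To see why agent1 gets only Foo while agent2 gets both messages, it may help to model the bindings in memory. The sketch below is not MassTransit or RabbitMQ code: the `DirectRoutingSketch` class and its `Route` method are hypothetical names made up for this illustration. It only assumes that a direct exchange forwards a message along every binding whose key matches the routing key exactly, and that the second layer consists of the exchanges behind the subscriber1 and subscriber2 endpoints.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory model of the bindings; it does not talk to RabbitMQ.
public static class DirectRoutingSketch
{
    // Bindings from the single "MessageEnvelope" exchange (first layer)
    // to the per-subscriber exchanges (second layer), as set up by e.Bind(...).
    private static readonly (string Target, string RoutingKey)[] Bindings =
    {
        ("subscriber1", "Foo"),
        ("subscriber2", "Foo"),
        ("subscriber2", "Bar"),
    };

    // A direct exchange forwards a message along every binding
    // whose key is exactly equal to the message's routing key.
    public static List<string> Route(string routingKey) =>
        Bindings.Where(b => b.RoutingKey == routingKey)
                .Select(b => b.Target)
                .ToList();

    public static void Main()
    {
        // The routing key is the payload type name, as in MessageEnvelope.MessageTypeName.
        foreach (var key in new[] { "Foo", "Bar" })
            Console.WriteLine($"{key} -> {string.Join(", ", Route(key))}");
    }
}
```

A message with a routing key that no binding matches (say, a payload type called Baz) would be routed to no subscriber at all in this model, which is why every payload type a subscriber cares about needs its own binding.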
If there's a simpler way to achieve this in MassTransit, in particular if it's possible to have topic-based subscriptions between exchanges and queues without this extra layer of exchanges, please let me know in the comments section.