On Tue, Sep 25, 2007 at 12:32:31PM +0000, David Holroyd wrote:
> So, I'm starting to learn C#, and am attempting to prototype some
> generic NMS bridging code that I can use to connect ActiveMQ and MSMQ
> implementations.
So, attached are some vaguely bridge-flavoured bits.
Any comments?
I suspect it should be doing its own threading, for instance?
--
http://david.holroyd.me.uk/
namespace Test
{
using NMS;
using System;
public class Bridge
{
private IConnectionFactory srcFactory;
private IConnectionFactory dstFactory;
private IConnection srcConn;
private IConnection dstConn;
private IMessageProducer producer;
private ISession dstSession ;
private MessageMapper mapper;
public Bridge(IConnectionFactory srcFactory,
IConnectionFactory dstFactory,
MessageMapper mapper)
{
this.srcFactory = srcFactory;
this.dstFactory = dstFactory;
this.mapper = mapper;
}
public void Connect()
{
srcConn = srcFactory.CreateConnection();
dstConn = dstFactory.CreateConnection();
}
public void Close()
{
srcConn.Close();
dstConn.Close();
}
public void Start(string srcQueueName, string dstQueueName)
{
ISession srcSession = srcConn.CreateSession();
dstSession = dstConn.CreateSession();
IQueue srcQueue = srcSession.GetQueue(srcQueueName);
IQueue dstQueue = dstSession.GetQueue(dstQueueName);
IMessageConsumer consumer = srcSession.CreateConsumer(srcQueue);
producer = dstSession.CreateProducer(dstQueue);
consumer.Listener += new MessageListener(OnMessage);
}
private void OnMessage(IMessage msg)
{
System.Console.WriteLine("Bridge processing one message");
try
{
producer.Send(mapper.map(msg, dstSession));
}
catch (Exception e)
{
System.Console.WriteLine(e.ToString());
}
System.Console.WriteLine("Bridge processing complete");
}
}
}
// vim:sw=4:sts=4
namespace Test
{
using NMS;
public interface MessageMapper
{
IMessage map(IMessage src, ISession dstSession);
}
}
namespace Test
{
using NMS;
using System;
public class SimpleMessageMapper : MessageMapper
{
public IMessage map(IMessage src, ISession dstSession)
{
IMessage dst;
if (src is ITextMessage) {
dst = mapText(src as ITextMessage, dstSession);
} else {
throw new Exception("Unhandled message type");
}
mapCommon(src, dst);
return dst;
}
private ITextMessage mapText(ITextMessage src, ISession dstSession)
{
ITextMessage dst = dstSession.CreateTextMessage(src.Text);
return dst;
}
private void mapCommon(IMessage src, IMessage dst)
{
dst.NMSCorrelationID = src.NMSCorrelationID;
dst.NMSExpiration = src.NMSExpiration;
dst.NMSPersistent = src.NMSPersistent;
dst.NMSPriority = src.NMSPriority;
dst.NMSReplyTo = src.NMSReplyTo;
if (src.NMSType == null)
{
System.Console.WriteLine("src.NMSType is null");
dst.NMSType = "Test";
}
else
{
dst.NMSType = src.NMSType;
}
mapHeaders(src.Properties, dst.Properties);
}
private void mapHeaders(IPrimitiveMap src, IPrimitiveMap dst)
{
foreach (string key in src.Keys)
{
dst[key] = src[key];
}
}
}
}
namespace Test {
using System;
using NMS;
//using ActiveMQ.Commands;
public class Test {
public static void Main(string[] args) {
IConnectionFactory amqFactory = new ActiveMQ.ConnectionFactory(new
Uri("tcp://192.168.9.162:61616"));
Console.WriteLine("Created AMQ fact");
IConnectionFactory msmqFactory = new MSMQ.ConnectionFactory();
Console.WriteLine("Created MSMQ fact");
MessageMapper mapper = new SimpleMessageMapper();
Bridge bridge = new Bridge(amqFactory, msmqFactory, mapper);
bridge.Connect();
Console.WriteLine("Connected bridge");
bridge.Start("FOO.BAR", @".\blater");
Console.WriteLine("Started bridge");
System.Threading.Thread.Sleep(10000);
}
}
}