fix for: https://issues.apache.org/activemq/browse/AMQNET-271
Project: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/commit/48f3e707 Tree: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/tree/48f3e707 Diff: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/diff/48f3e707 Branch: refs/heads/1.4.x Commit: 48f3e707949d41bb18b37a675575b2a823ba176f Parents: be02596 Author: Timothy A. Bish <[email protected]> Authored: Sun Aug 29 18:44:38 2010 +0000 Committer: Timothy A. Bish <[email protected]> Committed: Sun Aug 29 18:44:38 2010 +0000 ---------------------------------------------------------------------- trunk/src/main/csharp/Connection.cs | 402 +++++++++--------- trunk/src/main/csharp/ConnectionFactory.cs | 212 +++++----- trunk/src/main/csharp/MessageConsumer.cs | 433 ++++++++++---------- trunk/src/main/csharp/MessageProducer.cs | 524 ++++++++++++------------ trunk/src/main/csharp/Session.cs | 464 +++++++++++---------- 5 files changed, 1058 insertions(+), 977 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/48f3e707/trunk/src/main/csharp/Connection.cs ---------------------------------------------------------------------- diff --git a/trunk/src/main/csharp/Connection.cs b/trunk/src/main/csharp/Connection.cs index 096e41f..13dda20 100644 --- a/trunk/src/main/csharp/Connection.cs +++ b/trunk/src/main/csharp/Connection.cs @@ -19,198 +19,212 @@ using System; namespace Apache.NMS.MSMQ { - /// <summary> - /// Represents a NMS connection MSMQ. Since the underlying MSMQ APIs are actually - /// connectionless, NMS connection in the MSMQ case are not expensive operations. - /// </summary> - /// - public class Connection : IConnection - { - private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge; - private IMessageConverter messageConverter = new DefaultMessageConverter(); - - private IRedeliveryPolicy redeliveryPolicy; - private ConnectionMetaData metaData = null; - private bool connected; - private bool closed; - private string clientId; - - /// <summary> - /// Starts message delivery for this connection. - /// </summary> - public void Start() - { - CheckConnected(); - } - - /// <summary> - /// This property determines if the asynchronous message delivery of incoming - /// messages has been started for this connection. - /// </summary> - public bool IsStarted - { - get { return true; } - } - - /// <summary> - /// Stop message delivery for this connection. - /// </summary> - public void Stop() - { - CheckConnected(); - } - - /// <summary> - /// Creates a new session to work on this connection - /// </summary> - public ISession CreateSession() - { - return CreateSession(acknowledgementMode); - } - - /// <summary> - /// Creates a new session to work on this connection - /// </summary> - public ISession CreateSession(AcknowledgementMode mode) - { - CheckConnected(); - return new Session(this, mode); - } - - public void Dispose() - { - closed = true; - } - - /// <summary> - /// The default timeout for network requests. - /// </summary> - public TimeSpan RequestTimeout - { - get { return NMSConstants.defaultRequestTimeout; } - set { } - } - - public AcknowledgementMode AcknowledgementMode - { - get { return acknowledgementMode; } - set { acknowledgementMode = value; } - } - - public IMessageConverter MessageConverter - { - get { return messageConverter; } - set { messageConverter = value; } - } - - public string ClientId - { - get { return clientId; } - set - { - if(connected) - { - throw new NMSException("You cannot change the ClientId once the Connection is connected"); - } - clientId = value; - } - } - - /// <summary> - /// Get/or set the redelivery policy for this connection. - /// </summary> - public IRedeliveryPolicy RedeliveryPolicy - { - get { return this.redeliveryPolicy; } - set { this.redeliveryPolicy = value; } - } - - /// <summary> - /// Gets the Meta Data for the NMS Connection instance. - /// </summary> - public IConnectionMetaData MetaData - { - get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); } - } - - /// <summary> - /// A delegate that can receive transport level exceptions. - /// </summary> - public event ExceptionListener ExceptionListener; - - /// <summary> - /// An asynchronous listener that is notified when a Fault tolerant connection - /// has been interrupted. - /// </summary> - public event ConnectionInterruptedListener ConnectionInterruptedListener; - - /// <summary> - /// An asynchronous listener that is notified when a Fault tolerant connection - /// has been resumed. - /// </summary> - public event ConnectionResumedListener ConnectionResumedListener; - - protected void CheckConnected() - { - if(closed) - { - throw new NMSException("Connection Closed"); - } - if(!connected) - { - connected = true; - // now lets send the connection and see if we get an ack/nak - // TODO: establish a connection - } - } - - public void Close() - { - Dispose(); - } - - public void HandleException(Exception e) - { - if(ExceptionListener != null && !this.closed) - { - ExceptionListener(e); - } - else - { - Tracer.Error(e); - } - } - - public void HandleTransportInterrupted() - { - Tracer.Debug("Transport has been Interrupted."); - - if(this.ConnectionInterruptedListener != null && !this.closed) - { - try - { - this.ConnectionInterruptedListener(); - } - catch - { - } - } - } - - public void HandleTransportResumed() - { - Tracer.Debug("Transport has resumed normal operation."); - - if(this.ConnectionResumedListener != null && !this.closed) - { - try - { - this.ConnectionResumedListener(); - } - catch - { - } - } - } - } + /// <summary> + /// Represents a NMS connection MSMQ. Since the underlying MSMQ APIs are actually + /// connectionless, NMS connection in the MSMQ case are not expensive operations. + /// </summary> + /// + public class Connection : IConnection + { + private AcknowledgementMode acknowledgementMode = AcknowledgementMode.AutoAcknowledge; + private IMessageConverter messageConverter = new DefaultMessageConverter(); + + private IRedeliveryPolicy redeliveryPolicy; + private ConnectionMetaData metaData = null; + private bool connected; + private bool closed; + private string clientId; + + /// <summary> + /// Starts message delivery for this connection. + /// </summary> + public void Start() + { + CheckConnected(); + } + + /// <summary> + /// This property determines if the asynchronous message delivery of incoming + /// messages has been started for this connection. + /// </summary> + public bool IsStarted + { + get { return true; } + } + + /// <summary> + /// Stop message delivery for this connection. + /// </summary> + public void Stop() + { + CheckConnected(); + } + + /// <summary> + /// Creates a new session to work on this connection + /// </summary> + public ISession CreateSession() + { + return CreateSession(acknowledgementMode); + } + + /// <summary> + /// Creates a new session to work on this connection + /// </summary> + public ISession CreateSession(AcknowledgementMode mode) + { + CheckConnected(); + return new Session(this, mode); + } + + public void Dispose() + { + closed = true; + } + + /// <summary> + /// The default timeout for network requests. + /// </summary> + public TimeSpan RequestTimeout + { + get { return NMSConstants.defaultRequestTimeout; } + set { } + } + + public AcknowledgementMode AcknowledgementMode + { + get { return acknowledgementMode; } + set { acknowledgementMode = value; } + } + + public IMessageConverter MessageConverter + { + get { return messageConverter; } + set { messageConverter = value; } + } + + public string ClientId + { + get { return clientId; } + set + { + if(connected) + { + throw new NMSException("You cannot change the ClientId once the Connection is connected"); + } + clientId = value; + } + } + + /// <summary> + /// Get/or set the redelivery policy for this connection. + /// </summary> + public IRedeliveryPolicy RedeliveryPolicy + { + get { return this.redeliveryPolicy; } + set { this.redeliveryPolicy = value; } + } + + private ConsumerTransformerDelegate consumerTransformer; + public ConsumerTransformerDelegate ConsumerTransformer + { + get { return this.consumerTransformer; } + set { this.consumerTransformer = value; } + } + + private ProducerTransformerDelegate producerTransformer; + public ProducerTransformerDelegate ProducerTransformer + { + get { return this.producerTransformer; } + set { this.producerTransformer = value; } + } + + /// <summary> + /// Gets the Meta Data for the NMS Connection instance. + /// </summary> + public IConnectionMetaData MetaData + { + get { return this.metaData ?? (this.metaData = new ConnectionMetaData()); } + } + + /// <summary> + /// A delegate that can receive transport level exceptions. + /// </summary> + public event ExceptionListener ExceptionListener; + + /// <summary> + /// An asynchronous listener that is notified when a Fault tolerant connection + /// has been interrupted. + /// </summary> + public event ConnectionInterruptedListener ConnectionInterruptedListener; + + /// <summary> + /// An asynchronous listener that is notified when a Fault tolerant connection + /// has been resumed. + /// </summary> + public event ConnectionResumedListener ConnectionResumedListener; + + protected void CheckConnected() + { + if(closed) + { + throw new NMSException("Connection Closed"); + } + if(!connected) + { + connected = true; + // now lets send the connection and see if we get an ack/nak + // TODO: establish a connection + } + } + + public void Close() + { + Dispose(); + } + + public void HandleException(Exception e) + { + if(ExceptionListener != null && !this.closed) + { + ExceptionListener(e); + } + else + { + Tracer.Error(e); + } + } + + public void HandleTransportInterrupted() + { + Tracer.Debug("Transport has been Interrupted."); + + if(this.ConnectionInterruptedListener != null && !this.closed) + { + try + { + this.ConnectionInterruptedListener(); + } + catch + { + } + } + } + + public void HandleTransportResumed() + { + Tracer.Debug("Transport has resumed normal operation."); + + if(this.ConnectionResumedListener != null && !this.closed) + { + try + { + this.ConnectionResumedListener(); + } + catch + { + } + } + } + } } http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/48f3e707/trunk/src/main/csharp/ConnectionFactory.cs ---------------------------------------------------------------------- diff --git a/trunk/src/main/csharp/ConnectionFactory.cs b/trunk/src/main/csharp/ConnectionFactory.cs index 24e895d..fe9beee 100644 --- a/trunk/src/main/csharp/ConnectionFactory.cs +++ b/trunk/src/main/csharp/ConnectionFactory.cs @@ -19,101 +19,119 @@ using Apache.NMS.Policies; namespace Apache.NMS.MSMQ { - /// <summary> - /// A Factory that can estbalish NMS connections to MSMQ - /// </summary> - public class ConnectionFactory : IConnectionFactory - { - public const string DEFAULT_BROKER_URL = "msmq://localhost"; - public const string ENV_BROKER_URL = "MSMQ_BROKER_URL"; - private Uri brokerUri; - private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); - - public static string GetDefaultBrokerUrl() - { - string answer = Environment.GetEnvironmentVariable(ENV_BROKER_URL); - if(answer == null) - { - answer = DEFAULT_BROKER_URL; - } - return answer; - } - - public ConnectionFactory() - : this(GetDefaultBrokerUrl()) - { - } - - public ConnectionFactory(string brokerUri) - : this(brokerUri, null) - { - } - - public ConnectionFactory(string brokerUri, string clientID) - : this(new Uri(brokerUri), clientID) - { - } - - public ConnectionFactory(Uri brokerUri) - : this(brokerUri, null) - { - } - - public ConnectionFactory(Uri brokerUri, string clientID) - { - this.brokerUri = brokerUri; - } - - /// <summary> - /// Creates a new connection to MSMQ. - /// </summary> - public IConnection CreateConnection() - { - return CreateConnection(string.Empty, string.Empty, false); - } - - /// <summary> - /// Creates a new connection to MSMQ. - /// </summary> - public IConnection CreateConnection(string userName, string password) - { - return CreateConnection(userName, password, false); - } - - /// <summary> - /// Creates a new connection to MSMQ. - /// </summary> - public IConnection CreateConnection(string userName, string password, bool useLogging) - { - IConnection connection = new Connection(); - - connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy; - return connection; - } - - /// <summary> - /// Get/or set the broker Uri. - /// </summary> - public Uri BrokerUri - { - get { return brokerUri; } - set { brokerUri = value; } - } - - /// <summary> - /// Get/or set the redelivery policy that new IConnection objects are - /// assigned upon creation. - /// </summary> - public IRedeliveryPolicy RedeliveryPolicy - { - get { return this.redeliveryPolicy; } - set - { - if(value != null) - { - this.redeliveryPolicy = value; - } - } - } - } + /// <summary> + /// A Factory that can estbalish NMS connections to MSMQ + /// </summary> + public class ConnectionFactory : IConnectionFactory + { + public const string DEFAULT_BROKER_URL = "msmq://localhost"; + public const string ENV_BROKER_URL = "MSMQ_BROKER_URL"; + private Uri brokerUri; + private IRedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); + + public static string GetDefaultBrokerUrl() + { + string answer = Environment.GetEnvironmentVariable(ENV_BROKER_URL); + if(answer == null) + { + answer = DEFAULT_BROKER_URL; + } + return answer; + } + + public ConnectionFactory() + : this(GetDefaultBrokerUrl()) + { + } + + public ConnectionFactory(string brokerUri) + : this(brokerUri, null) + { + } + + public ConnectionFactory(string brokerUri, string clientID) + : this(new Uri(brokerUri), clientID) + { + } + + public ConnectionFactory(Uri brokerUri) + : this(brokerUri, null) + { + } + + public ConnectionFactory(Uri brokerUri, string clientID) + { + this.brokerUri = brokerUri; + } + + /// <summary> + /// Creates a new connection to MSMQ. + /// </summary> + public IConnection CreateConnection() + { + return CreateConnection(string.Empty, string.Empty, false); + } + + /// <summary> + /// Creates a new connection to MSMQ. + /// </summary> + public IConnection CreateConnection(string userName, string password) + { + return CreateConnection(userName, password, false); + } + + /// <summary> + /// Creates a new connection to MSMQ. + /// </summary> + public IConnection CreateConnection(string userName, string password, bool useLogging) + { + IConnection connection = new Connection(); + + connection.RedeliveryPolicy = this.redeliveryPolicy.Clone() as IRedeliveryPolicy; + connection.ConsumerTransformer = this.consumerTransformer; + connection.ProducerTransformer = this.producerTransformer; + + return connection; + } + + /// <summary> + /// Get/or set the broker Uri. + /// </summary> + public Uri BrokerUri + { + get { return brokerUri; } + set { brokerUri = value; } + } + + /// <summary> + /// Get/or set the redelivery policy that new IConnection objects are + /// assigned upon creation. + /// </summary> + public IRedeliveryPolicy RedeliveryPolicy + { + get { return this.redeliveryPolicy; } + set + { + if(value != null) + { + this.redeliveryPolicy = value; + } + } + } + + private ConsumerTransformerDelegate consumerTransformer; + public ConsumerTransformerDelegate ConsumerTransformer + { + get { return this.consumerTransformer; } + set { this.consumerTransformer = value; } + } + + private ProducerTransformerDelegate producerTransformer; + public ProducerTransformerDelegate ProducerTransformer + { + get { return this.producerTransformer; } + set { this.producerTransformer = value; } + } + + } } http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/48f3e707/trunk/src/main/csharp/MessageConsumer.cs ---------------------------------------------------------------------- diff --git a/trunk/src/main/csharp/MessageConsumer.cs b/trunk/src/main/csharp/MessageConsumer.cs index ee57b96..708a372 100644 --- a/trunk/src/main/csharp/MessageConsumer.cs +++ b/trunk/src/main/csharp/MessageConsumer.cs @@ -21,211 +21,230 @@ using Apache.NMS.Util; namespace Apache.NMS.MSMQ { - /// <summary> - /// An object capable of receiving messages from some destination - /// </summary> - public class MessageConsumer : IMessageConsumer - { - protected TimeSpan zeroTimeout = new TimeSpan(0); - - private readonly Session session; - private readonly AcknowledgementMode acknowledgementMode; - private MessageQueue messageQueue; - private event MessageListener listener; - private int listenerCount = 0; - private Thread asyncDeliveryThread = null; - private AutoResetEvent pause = new AutoResetEvent(false); - private Atomic<bool> asyncDelivery = new Atomic<bool>(false); - - public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, MessageQueue messageQueue) - { - this.session = session; - this.acknowledgementMode = acknowledgementMode; - this.messageQueue = messageQueue; - if(null != this.messageQueue) - { - this.messageQueue.MessageReadPropertyFilter.SetAll(); - } - } - - public event MessageListener Listener - { - add - { - listener += value; - listenerCount++; - StartAsyncDelivery(); - } - - remove - { - if(listenerCount > 0) - { - listener -= value; - listenerCount--; - } - - if(0 == listenerCount) - { - StopAsyncDelivery(); - } - } - } - - public IMessage Receive() - { - IMessage nmsMessage = null; - - if(messageQueue != null) - { - Message message; - - try - { - message = messageQueue.Receive(zeroTimeout); - } - catch - { - message = null; - } - - if(null == message) - { - ReceiveCompletedEventHandler receiveMsg = - delegate(Object source, ReceiveCompletedEventArgs asyncResult) { - message = messageQueue.EndReceive(asyncResult.AsyncResult); - pause.Set(); - }; - - messageQueue.ReceiveCompleted += receiveMsg; - messageQueue.BeginReceive(); - pause.WaitOne(); - messageQueue.ReceiveCompleted -= receiveMsg; - } - - nmsMessage = ToNmsMessage(message); - } - - return nmsMessage; - } - - public IMessage Receive(TimeSpan timeout) - { - IMessage nmsMessage = null; - - if(messageQueue != null) - { - Message message = messageQueue.Receive(timeout); - nmsMessage = ToNmsMessage(message); - } - - return nmsMessage; - } - - public IMessage ReceiveNoWait() - { - IMessage nmsMessage = null; - - if(messageQueue != null) - { - Message message = messageQueue.Receive(zeroTimeout); - nmsMessage = ToNmsMessage(message); - } - - return nmsMessage; - } - - public void Dispose() - { - Close(); - } - - public void Close() - { - StopAsyncDelivery(); - if(messageQueue != null) - { - messageQueue.Dispose(); - messageQueue = null; - } - } - - protected virtual void StopAsyncDelivery() - { - if(asyncDelivery.CompareAndSet(true, false)) - { - if(null != asyncDeliveryThread) - { - Tracer.Info("Stopping async delivery thread."); - pause.Set(); - if(!asyncDeliveryThread.Join(10000)) - { - Tracer.Info("Aborting async delivery thread."); - asyncDeliveryThread.Abort(); - } - - asyncDeliveryThread = null; - Tracer.Info("Async delivery thread stopped."); - } - } - } - - protected virtual void StartAsyncDelivery() - { - if(asyncDelivery.CompareAndSet(false, true)) - { - asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop)); - asyncDeliveryThread.Name = "Message Consumer Dispatch: " + messageQueue.QueueName; - asyncDeliveryThread.IsBackground = true; - asyncDeliveryThread.Start(); - } - } - - protected virtual void DispatchLoop() - { - Tracer.Info("Starting dispatcher thread consumer: " + this); - while(asyncDelivery.Value) - { - try - { - IMessage message = Receive(); - if(asyncDelivery.Value && message != null) - { - try - { - listener(message); - } - catch(Exception e) - { - HandleAsyncException(e); - } - } - } - catch(ThreadAbortException ex) - { - Tracer.InfoFormat("Thread abort received in thread: {0} : {1}", this, ex.Message); - break; - } - catch(Exception ex) - { - Tracer.ErrorFormat("Exception while receiving message in thread: {0} : {1}", this, ex.Message); - } - } - Tracer.Info("Stopping dispatcher thread consumer: " + this); - } - - protected virtual void HandleAsyncException(Exception e) - { - session.Connection.HandleException(e); - } - - protected virtual IMessage ToNmsMessage(Message message) - { - if(message == null) - { - return null; - } - return session.MessageConverter.ToNmsMessage(message); - } - } + /// <summary> + /// An object capable of receiving messages from some destination + /// </summary> + public class MessageConsumer : IMessageConsumer + { + protected TimeSpan zeroTimeout = new TimeSpan(0); + + private readonly Session session; + private readonly AcknowledgementMode acknowledgementMode; + private MessageQueue messageQueue; + private event MessageListener listener; + private int listenerCount = 0; + private Thread asyncDeliveryThread = null; + private AutoResetEvent pause = new AutoResetEvent(false); + private Atomic<bool> asyncDelivery = new Atomic<bool>(false); + + private ConsumerTransformerDelegate consumerTransformer; + public ConsumerTransformerDelegate ConsumerTransformer + { + get { return this.consumerTransformer; } + set { this.consumerTransformer = value; } + } + + public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, MessageQueue messageQueue) + { + this.session = session; + this.acknowledgementMode = acknowledgementMode; + this.messageQueue = messageQueue; + if(null != this.messageQueue) + { + this.messageQueue.MessageReadPropertyFilter.SetAll(); + } + } + + public event MessageListener Listener + { + add + { + listener += value; + listenerCount++; + StartAsyncDelivery(); + } + + remove + { + if(listenerCount > 0) + { + listener -= value; + listenerCount--; + } + + if(0 == listenerCount) + { + StopAsyncDelivery(); + } + } + } + + public IMessage Receive() + { + IMessage nmsMessage = null; + + if(messageQueue != null) + { + Message message; + + try + { + message = messageQueue.Receive(zeroTimeout); + } + catch + { + message = null; + } + + if(null == message) + { + ReceiveCompletedEventHandler receiveMsg = + delegate(Object source, ReceiveCompletedEventArgs asyncResult) { + message = messageQueue.EndReceive(asyncResult.AsyncResult); + pause.Set(); + }; + + messageQueue.ReceiveCompleted += receiveMsg; + messageQueue.BeginReceive(); + pause.WaitOne(); + messageQueue.ReceiveCompleted -= receiveMsg; + } + + nmsMessage = ToNmsMessage(message); + } + + return nmsMessage; + } + + public IMessage Receive(TimeSpan timeout) + { + IMessage nmsMessage = null; + + if(messageQueue != null) + { + Message message = messageQueue.Receive(timeout); + nmsMessage = ToNmsMessage(message); + } + + return nmsMessage; + } + + public IMessage ReceiveNoWait() + { + IMessage nmsMessage = null; + + if(messageQueue != null) + { + Message message = messageQueue.Receive(zeroTimeout); + nmsMessage = ToNmsMessage(message); + } + + return nmsMessage; + } + + public void Dispose() + { + Close(); + } + + public void Close() + { + StopAsyncDelivery(); + if(messageQueue != null) + { + messageQueue.Dispose(); + messageQueue = null; + } + } + + protected virtual void StopAsyncDelivery() + { + if(asyncDelivery.CompareAndSet(true, false)) + { + if(null != asyncDeliveryThread) + { + Tracer.Info("Stopping async delivery thread."); + pause.Set(); + if(!asyncDeliveryThread.Join(10000)) + { + Tracer.Info("Aborting async delivery thread."); + asyncDeliveryThread.Abort(); + } + + asyncDeliveryThread = null; + Tracer.Info("Async delivery thread stopped."); + } + } + } + + protected virtual void StartAsyncDelivery() + { + if(asyncDelivery.CompareAndSet(false, true)) + { + asyncDeliveryThread = new Thread(new ThreadStart(DispatchLoop)); + asyncDeliveryThread.Name = "Message Consumer Dispatch: " + messageQueue.QueueName; + asyncDeliveryThread.IsBackground = true; + asyncDeliveryThread.Start(); + } + } + + protected virtual void DispatchLoop() + { + Tracer.Info("Starting dispatcher thread consumer: " + this); + while(asyncDelivery.Value) + { + try + { + IMessage message = Receive(); + if(asyncDelivery.Value && message != null) + { + try + { + listener(message); + } + catch(Exception e) + { + HandleAsyncException(e); + } + } + } + catch(ThreadAbortException ex) + { + Tracer.InfoFormat("Thread abort received in thread: {0} : {1}", this, ex.Message); + break; + } + catch(Exception ex) + { + Tracer.ErrorFormat("Exception while receiving message in thread: {0} : {1}", this, ex.Message); + } + } + Tracer.Info("Stopping dispatcher thread consumer: " + this); + } + + protected virtual void HandleAsyncException(Exception e) + { + session.Connection.HandleException(e); + } + + protected virtual IMessage ToNmsMessage(Message message) + { + if(message == null) + { + return null; + } + + IMessage converted = session.MessageConverter.ToNmsMessage(message); + + if(this.ConsumerTransformer != null) + { + IMessage newMessage = ConsumerTransformer(this.session, this, message); + if(newMessage != null) + { + converted = newMessage; + } + } + + return converted; + } + } } http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/48f3e707/trunk/src/main/csharp/MessageProducer.cs ---------------------------------------------------------------------- diff --git a/trunk/src/main/csharp/MessageProducer.cs b/trunk/src/main/csharp/MessageProducer.cs index 26598d3..5912c07 100644 --- a/trunk/src/main/csharp/MessageProducer.cs +++ b/trunk/src/main/csharp/MessageProducer.cs @@ -19,258 +19,274 @@ using System.Messaging; namespace Apache.NMS.MSMQ { - /// <summary> - /// An object capable of sending messages to some destination - /// </summary> - public class MessageProducer : IMessageProducer - { - - private readonly Session session; - private Destination destination; - - //private long messageCounter; - private MsgDeliveryMode deliveryMode; - private TimeSpan timeToLive; - private MsgPriority priority; - private bool disableMessageID; - private bool disableMessageTimestamp; - - private MessageQueue messageQueue; - - public MessageProducer(Session session, Destination destination) - { - this.session = session; - this.destination = destination; - if(destination != null) - { - messageQueue = openMessageQueue(destination); - } - } - - private MessageQueue openMessageQueue(Destination dest) - { - MessageQueue rc = null; - try - { - if(!MessageQueue.Exists(dest.Path)) - { - // create the new message queue and make it transactional - rc = MessageQueue.Create(dest.Path, session.Transacted); - this.destination.Path = rc.Path; - } - else - { - rc = new MessageQueue(dest.Path); - this.destination.Path = rc.Path; - if(!rc.CanWrite) - { - throw new NMSSecurityException("Do not have write access to: " + dest); - } - } - } - catch(Exception e) - { - if(rc != null) - { - rc.Dispose(); - } - - throw new NMSException(e.Message + ": " + dest, e); - } - return rc; - } - - public void Send(IMessage message) - { - Send(Destination, message); - } - - public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive) - { - Send(Destination, message, deliveryMode, priority, timeToLive); - } - - public void Send(IDestination destination, IMessage message) - { - Send(destination, message, DeliveryMode, Priority, TimeToLive); - } - - public void Send(IDestination destination, IMessage imessage, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive) - { - BaseMessage message = (BaseMessage) imessage; - MessageQueue mq = null; - - try - { - // Locate the MSMQ Queue we will be sending to - if(messageQueue != null) - { - if(destination.Equals(this.destination)) - { - mq = messageQueue; - } - else - { - throw new NMSException("This producer can only be used to send to: " + destination); - } - } - else - { - mq = openMessageQueue((Destination) destination); - } - - message.NMSDeliveryMode = deliveryMode; - message.NMSTimeToLive = timeToLive; - message.NMSPriority = priority; - if(!DisableMessageTimestamp) - { - message.NMSTimestamp = DateTime.UtcNow; - } - - if(!DisableMessageID) - { - // TODO: message.NMSMessageId = - } - - // Convert the Mesasge into a MSMQ message - Message msg = session.MessageConverter.ToMsmqMessage(message); - - if(mq.Transactional) - { - if(session.Transacted) - { - mq.Send(msg, session.MessageQueueTransaction); - - } - else - { - // Start our own mini transaction here to send the message. - using(MessageQueueTransaction transaction = new MessageQueueTransaction()) - { - transaction.Begin(); - mq.Send(msg, transaction); - transaction.Commit(); - } - } - } - else - { - if(session.Transacted) - { - // We may want to raise an exception here since app requested - // a transeced NMS session, but is using a non transacted message queue - // For now silently ignore it. - } - mq.Send(msg); - } - - } - finally - { - if(mq != null && mq != messageQueue) - { - mq.Dispose(); - } - } - } - - public void Close() - { - if(messageQueue != null) - { - messageQueue.Dispose(); - messageQueue = null; - } - } - - public void Dispose() - { - Close(); - } - - public IMessage CreateMessage() - { - return session.CreateMessage(); - } - - public ITextMessage CreateTextMessage() - { - return session.CreateTextMessage(); - } - - public ITextMessage CreateTextMessage(String text) - { - return session.CreateTextMessage(text); - } - - public IMapMessage CreateMapMessage() - { - return session.CreateMapMessage(); - } - - public IObjectMessage CreateObjectMessage(Object body) - { - return session.CreateObjectMessage(body); - } - - public IBytesMessage CreateBytesMessage() - { - return session.CreateBytesMessage(); - } - - public IBytesMessage CreateBytesMessage(byte[] body) - { - return session.CreateBytesMessage(body); - } - - public IStreamMessage CreateStreamMessage() - { - return session.CreateStreamMessage(); - } - - public MsgDeliveryMode DeliveryMode - { - get { return deliveryMode; } - set { deliveryMode = value; } - } - - public TimeSpan TimeToLive - { - get { return timeToLive; } - set { timeToLive = value; } - } - - /// <summary> - /// The default timeout for network requests. - /// </summary> - public TimeSpan RequestTimeout - { - get { return NMSConstants.defaultRequestTimeout; } - set { } - } - - public IDestination Destination - { - get { return destination; } - set { destination = (Destination) value; } - } - - public MsgPriority Priority - { - get { return priority; } - set { priority = value; } - } - - public bool DisableMessageID - { - get { return disableMessageID; } - set { disableMessageID = value; } - } - - public bool DisableMessageTimestamp - { - get { return disableMessageTimestamp; } - set { disableMessageTimestamp = value; } - } - } + /// <summary> + /// An object capable of sending messages to some destination + /// </summary> + public class MessageProducer : IMessageProducer + { + + private readonly Session session; + private Destination destination; + + //private long messageCounter; + private MsgDeliveryMode deliveryMode; + private TimeSpan timeToLive; + private MsgPriority priority; + private bool disableMessageID; + private bool disableMessageTimestamp; + + private MessageQueue messageQueue; + + private ProducerTransformerDelegate producerTransformer; + public ProducerTransformerDelegate ProducerTransformer + { + get { return this.producerTransformer; } + set { this.producerTransformer = value; } + } + + public MessageProducer(Session session, Destination destination) + { + this.session = session; + this.destination = destination; + if(destination != null) + { + messageQueue = openMessageQueue(destination); + } + } + + private MessageQueue openMessageQueue(Destination dest) + { + MessageQueue rc = null; + try + { + if(!MessageQueue.Exists(dest.Path)) + { + // create the new message queue and make it transactional + rc = MessageQueue.Create(dest.Path, session.Transacted); + this.destination.Path = rc.Path; + } + else + { + rc = new MessageQueue(dest.Path); + this.destination.Path = rc.Path; + if(!rc.CanWrite) + { + throw new NMSSecurityException("Do not have write access to: " + dest); + } + } + } + catch(Exception e) + { + if(rc != null) + { + rc.Dispose(); + } + + throw new NMSException(e.Message + ": " + dest, e); + } + return rc; + } + + public void Send(IMessage message) + { + Send(Destination, message); + } + + public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive) + { + Send(Destination, message, deliveryMode, priority, timeToLive); + } + + public void Send(IDestination destination, IMessage message) + { + Send(destination, message, DeliveryMode, Priority, TimeToLive); + } + + public void Send(IDestination destination, IMessage imessage, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive) + { + BaseMessage message = (BaseMessage) imessage; + MessageQueue mq = null; + + try + { + // Locate the MSMQ Queue we will be sending to + if(messageQueue != null) + { + if(destination.Equals(this.destination)) + { + mq = messageQueue; + } + else + { + throw new NMSException("This producer can only be used to send to: " + destination); + } + } + else + { + mq = openMessageQueue((Destination) destination); + } + + if(this.ProducerTransformer != null) + { + IMessage transformed = this.ProducerTransformer(this.session, this, message); + if(transformed != null) + { + message = transformed; + } + } + + message.NMSDeliveryMode = deliveryMode; + message.NMSTimeToLive = timeToLive; + message.NMSPriority = priority; + if(!DisableMessageTimestamp) + { + message.NMSTimestamp = DateTime.UtcNow; + } + + if(!DisableMessageID) + { + // TODO: message.NMSMessageId = + } + + // Convert the Mesasge into a MSMQ message + Message msg = session.MessageConverter.ToMsmqMessage(message); + + if(mq.Transactional) + { + if(session.Transacted) + { + mq.Send(msg, session.MessageQueueTransaction); + + } + else + { + // Start our own mini transaction here to send the message. + using(MessageQueueTransaction transaction = new MessageQueueTransaction()) + { + transaction.Begin(); + mq.Send(msg, transaction); + transaction.Commit(); + } + } + } + else + { + if(session.Transacted) + { + // We may want to raise an exception here since app requested + // a transeced NMS session, but is using a non transacted message queue + // For now silently ignore it. + } + mq.Send(msg); + } + + } + finally + { + if(mq != null && mq != messageQueue) + { + mq.Dispose(); + } + } + } + + public void Close() + { + if(messageQueue != null) + { + messageQueue.Dispose(); + messageQueue = null; + } + } + + public void Dispose() + { + Close(); + } + + public IMessage CreateMessage() + { + return session.CreateMessage(); + } + + public ITextMessage CreateTextMessage() + { + return session.CreateTextMessage(); + } + + public ITextMessage CreateTextMessage(String text) + { + return session.CreateTextMessage(text); + } + + public IMapMessage CreateMapMessage() + { + return session.CreateMapMessage(); + } + + public IObjectMessage CreateObjectMessage(Object body) + { + return session.CreateObjectMessage(body); + } + + public IBytesMessage CreateBytesMessage() + { + return session.CreateBytesMessage(); + } + + public IBytesMessage CreateBytesMessage(byte[] body) + { + return session.CreateBytesMessage(body); + } + + public IStreamMessage CreateStreamMessage() + { + return session.CreateStreamMessage(); + } + + public MsgDeliveryMode DeliveryMode + { + get { return deliveryMode; } + set { deliveryMode = value; } + } + + public TimeSpan TimeToLive + { + get { return timeToLive; } + set { timeToLive = value; } + } + + /// <summary> + /// The default timeout for network requests. + /// </summary> + public TimeSpan RequestTimeout + { + get { return NMSConstants.defaultRequestTimeout; } + set { } + } + + public IDestination Destination + { + get { return destination; } + set { destination = (Destination) value; } + } + + public MsgPriority Priority + { + get { return priority; } + set { priority = value; } + } + + public bool DisableMessageID + { + get { return disableMessageID; } + set { disableMessageID = value; } + } + + public bool DisableMessageTimestamp + { + get { return disableMessageTimestamp; } + set { disableMessageTimestamp = value; } + } + } } http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/48f3e707/trunk/src/main/csharp/Session.cs ---------------------------------------------------------------------- diff --git a/trunk/src/main/csharp/Session.cs b/trunk/src/main/csharp/Session.cs index 6ee3d9e..71ee69c 100644 --- a/trunk/src/main/csharp/Session.cs +++ b/trunk/src/main/csharp/Session.cs @@ -19,229 +19,243 @@ using System.Messaging; namespace Apache.NMS.MSMQ { - /// <summary> - /// MSQM provider of ISession - /// </summary> - public class Session : ISession - { - private Connection connection; - private AcknowledgementMode acknowledgementMode; - private MessageQueueTransaction messageQueueTransaction; - private IMessageConverter messageConverter; - - public Session(Connection connection, AcknowledgementMode acknowledgementMode) - { - this.connection = connection; - this.acknowledgementMode = acknowledgementMode; - MessageConverter = connection.MessageConverter; - if(this.acknowledgementMode == AcknowledgementMode.Transactional) - { - MessageQueueTransaction = new MessageQueueTransaction(); - } - } - - public void Dispose() - { - if(MessageQueueTransaction != null) - { - MessageQueueTransaction.Dispose(); - } - } - - public IMessageProducer CreateProducer() - { - return CreateProducer(null); - } - - public IMessageProducer CreateProducer(IDestination destination) - { - return new MessageProducer(this, (Destination) destination); - } - - public IMessageConsumer CreateConsumer(IDestination destination) - { - return CreateConsumer(destination, null); - } - - public IMessageConsumer CreateConsumer(IDestination destination, string selector) - { - return CreateConsumer(destination, selector, false); - } - - public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal) - { - if(selector != null) - { - throw new NotSupportedException("Selectors are not supported by MSMQ"); - } - MessageQueue queue = MessageConverter.ToMsmqDestination(destination); - return new MessageConsumer(this, acknowledgementMode, queue); - } - - public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal) - { - throw new NotSupportedException("Durable Topic subscribers are not supported by MSMQ"); - } - - public void DeleteDurableConsumer(string name) - { - throw new NotSupportedException("Durable Topic subscribers are not supported by MSMQ"); - } - - public IQueueBrowser CreateBrowser(IQueue queue) - { - throw new NotImplementedException(); - } - - public IQueueBrowser CreateBrowser(IQueue queue, string selector) - { - throw new NotImplementedException(); - } - - public IQueue GetQueue(string name) - { - return new Queue(name); - } - - public ITopic GetTopic(string name) - { - throw new NotSupportedException("Topics are not supported by MSMQ"); - } - - public ITemporaryQueue CreateTemporaryQueue() - { - throw new NotSupportedException("Tempoary Queues are not supported by MSMQ"); - } - - public ITemporaryTopic CreateTemporaryTopic() - { - throw new NotSupportedException("Tempoary Topics are not supported by MSMQ"); - } - - /// <summary> - /// Delete a destination (Queue, Topic, Temp Queue, Temp Topic). - /// </summary> - public void DeleteDestination(IDestination destination) - { - // TODO: Implement if possible. If not possible, then change exception to NotSupportedException(). - throw new NotImplementedException(); - } - - public IMessage CreateMessage() - { - BaseMessage answer = new BaseMessage(); - return answer; - } - - - public ITextMessage CreateTextMessage() - { - TextMessage answer = new TextMessage(); - return answer; - } - - public ITextMessage CreateTextMessage(string text) - { - TextMessage answer = new TextMessage(text); - return answer; - } - - public IMapMessage CreateMapMessage() - { - return new MapMessage(); - } - - public IBytesMessage CreateBytesMessage() - { - return new BytesMessage(); - } - - public IBytesMessage CreateBytesMessage(byte[] body) - { - BytesMessage answer = new BytesMessage(); - answer.Content = body; - return answer; - } - - public IStreamMessage CreateStreamMessage() - { - return new StreamMessage(); - } - - public IObjectMessage CreateObjectMessage(Object body) - { - ObjectMessage answer = new ObjectMessage(); - answer.Body = body; - return answer; - } - - public void Commit() - { - if(!Transacted) - { - throw new InvalidOperationException("You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + acknowledgementMode); - } - messageQueueTransaction.Commit(); - } - - public void Rollback() - { - if(!Transacted) - { - throw new InvalidOperationException("You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + acknowledgementMode); - } - messageQueueTransaction.Abort(); - } - - // Properties - public Connection Connection - { - get { return connection; } - } - - /// <summary> - /// The default timeout for network requests. - /// </summary> - public TimeSpan RequestTimeout - { - get { return NMSConstants.defaultRequestTimeout; } - set { } - } - - public bool Transacted - { - get { return acknowledgementMode == AcknowledgementMode.Transactional; } - } - - public AcknowledgementMode AcknowledgementMode - { - get { throw new NotImplementedException(); } - } - - public MessageQueueTransaction MessageQueueTransaction - { - get - { - if(null != messageQueueTransaction - && messageQueueTransaction.Status != MessageQueueTransactionStatus.Pending) - { - messageQueueTransaction.Begin(); - } - - return messageQueueTransaction; - } - set { messageQueueTransaction = value; } - } - - public IMessageConverter MessageConverter - { - get { return messageConverter; } - set { messageConverter = value; } - } - - public void Close() - { - Dispose(); - } - } + /// <summary> + /// MSQM provider of ISession + /// </summary> + public class Session : ISession + { + private Connection connection; + private AcknowledgementMode acknowledgementMode; + private MessageQueueTransaction messageQueueTransaction; + private IMessageConverter messageConverter; + + public Session(Connection connection, AcknowledgementMode acknowledgementMode) + { + this.connection = connection; + this.acknowledgementMode = acknowledgementMode; + MessageConverter = connection.MessageConverter; + if(this.acknowledgementMode == AcknowledgementMode.Transactional) + { + MessageQueueTransaction = new MessageQueueTransaction(); + } + } + + public void Dispose() + { + if(MessageQueueTransaction != null) + { + MessageQueueTransaction.Dispose(); + } + } + + public IMessageProducer CreateProducer() + { + return CreateProducer(null); + } + + public IMessageProducer CreateProducer(IDestination destination) + { + return new MessageProducer(this, (Destination) destination); + } + + public IMessageConsumer CreateConsumer(IDestination destination) + { + return CreateConsumer(destination, null); + } + + public IMessageConsumer CreateConsumer(IDestination destination, string selector) + { + return CreateConsumer(destination, selector, false); + } + + public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal) + { + if(selector != null) + { + throw new NotSupportedException("Selectors are not supported by MSMQ"); + } + MessageQueue queue = MessageConverter.ToMsmqDestination(destination); + return new MessageConsumer(this, acknowledgementMode, queue); + } + + public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal) + { + throw new NotSupportedException("Durable Topic subscribers are not supported by MSMQ"); + } + + public void DeleteDurableConsumer(string name) + { + throw new NotSupportedException("Durable Topic subscribers are not supported by MSMQ"); + } + + public IQueueBrowser CreateBrowser(IQueue queue) + { + throw new NotImplementedException(); + } + + public IQueueBrowser CreateBrowser(IQueue queue, string selector) + { + throw new NotImplementedException(); + } + + public IQueue GetQueue(string name) + { + return new Queue(name); + } + + public ITopic GetTopic(string name) + { + throw new NotSupportedException("Topics are not supported by MSMQ"); + } + + public ITemporaryQueue CreateTemporaryQueue() + { + throw new NotSupportedException("Tempoary Queues are not supported by MSMQ"); + } + + public ITemporaryTopic CreateTemporaryTopic() + { + throw new NotSupportedException("Tempoary Topics are not supported by MSMQ"); + } + + /// <summary> + /// Delete a destination (Queue, Topic, Temp Queue, Temp Topic). + /// </summary> + public void DeleteDestination(IDestination destination) + { + // TODO: Implement if possible. If not possible, then change exception to NotSupportedException(). + throw new NotImplementedException(); + } + + public IMessage CreateMessage() + { + BaseMessage answer = new BaseMessage(); + return answer; + } + + + public ITextMessage CreateTextMessage() + { + TextMessage answer = new TextMessage(); + return answer; + } + + public ITextMessage CreateTextMessage(string text) + { + TextMessage answer = new TextMessage(text); + return answer; + } + + public IMapMessage CreateMapMessage() + { + return new MapMessage(); + } + + public IBytesMessage CreateBytesMessage() + { + return new BytesMessage(); + } + + public IBytesMessage CreateBytesMessage(byte[] body) + { + BytesMessage answer = new BytesMessage(); + answer.Content = body; + return answer; + } + + public IStreamMessage CreateStreamMessage() + { + return new StreamMessage(); + } + + public IObjectMessage CreateObjectMessage(Object body) + { + ObjectMessage answer = new ObjectMessage(); + answer.Body = body; + return answer; + } + + public void Commit() + { + if(!Transacted) + { + throw new InvalidOperationException("You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + acknowledgementMode); + } + messageQueueTransaction.Commit(); + } + + public void Rollback() + { + if(!Transacted) + { + throw new InvalidOperationException("You cannot perform a Commit() on a non-transacted session. Acknowlegement mode is: " + acknowledgementMode); + } + messageQueueTransaction.Abort(); + } + + // Properties + public Connection Connection + { + get { return connection; } + } + + /// <summary> + /// The default timeout for network requests. + /// </summary> + public TimeSpan RequestTimeout + { + get { return NMSConstants.defaultRequestTimeout; } + set { } + } + + public bool Transacted + { + get { return acknowledgementMode == AcknowledgementMode.Transactional; } + } + + public AcknowledgementMode AcknowledgementMode + { + get { throw new NotImplementedException(); } + } + + public MessageQueueTransaction MessageQueueTransaction + { + get + { + if(null != messageQueueTransaction + && messageQueueTransaction.Status != MessageQueueTransactionStatus.Pending) + { + messageQueueTransaction.Begin(); + } + + return messageQueueTransaction; + } + set { messageQueueTransaction = value; } + } + + public IMessageConverter MessageConverter + { + get { return messageConverter; } + set { messageConverter = value; } + } + + private ConsumerTransformerDelegate consumerTransformer; + public ConsumerTransformerDelegate ConsumerTransformer + { + get { return this.consumerTransformer; } + set { this.consumerTransformer = value; } + } + + private ProducerTransformerDelegate producerTransformer; + public ProducerTransformerDelegate ProducerTransformer + { + get { return this.producerTransformer; } + set { this.producerTransformer = value; } + } + + public void Close() + { + Dispose(); + } + } }
