Apply patch for AMQNET-554. Support for message properties and selectors. Thanks Stephane Ramet!
Project: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/commit/16d8f06d Tree: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/tree/16d8f06d Diff: http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/diff/16d8f06d Branch: refs/heads/master Commit: 16d8f06dd0b178c5030c0da7345aa16e2d51a761 Parents: 51ec9a2 Author: Jim Gomes <[email protected]> Authored: Thu Jul 7 20:47:10 2016 +0000 Committer: Jim Gomes <[email protected]> Committed: Thu Jul 7 20:47:10 2016 +0000 ---------------------------------------------------------------------- src/main/csharp/BaseMessage.cs | 154 ++- src/main/csharp/DefaultMessageConverter.cs | 500 +++++++- src/main/csharp/IMessageConverter.cs | 28 +- src/main/csharp/IMessageConverterEx.cs | 44 + src/main/csharp/MessageConsumer.cs | 93 +- src/main/csharp/QueueBrowser.cs | 31 +- .../csharp/Readers/AbstractMessageReader.cs | 126 ++ .../Readers/ByCorrelationIdMessageReader.cs | 139 +++ src/main/csharp/Readers/ByIdMessageReader.cs | 136 ++ .../csharp/Readers/ByLookupIdMessageReader.cs | 145 +++ .../csharp/Readers/BySelectorMessageReader.cs | 290 +++++ src/main/csharp/Readers/IMessageReader.cs | 93 ++ src/main/csharp/Readers/MessageReaderUtil.cs | 91 ++ .../csharp/Readers/NonFilteringMessageReader.cs | 128 ++ src/main/csharp/Selector/ANDExpression.cs | 47 + .../csharp/Selector/AlignedNumericValues.cs | 175 +++ .../csharp/Selector/ArithmeticExpression.cs | 57 + src/main/csharp/Selector/BinaryExpression.cs | 59 + .../csharp/Selector/BooleanCastExpression.cs | 45 + .../Selector/BooleanConstantExpression.cs | 38 + .../csharp/Selector/BooleanUnaryExpression.cs | 39 + .../csharp/Selector/ComparisonExpression.cs | 162 +++ src/main/csharp/Selector/ConstantExpression.cs | 157 +++ src/main/csharp/Selector/DivideExpression.cs | 67 + src/main/csharp/Selector/EqualExpression.cs | 47 + src/main/csharp/Selector/GreaterExpression.cs | 42 + 
.../csharp/Selector/GreaterOrEqualExpression.cs | 43 + src/main/csharp/Selector/IBooleanExpression.cs | 35 + src/main/csharp/Selector/IExpression.cs | 35 + src/main/csharp/Selector/InExpression.cs | 98 ++ src/main/csharp/Selector/IsNullExpression.cs | 59 + src/main/csharp/Selector/LesserExpression.cs | 42 + .../csharp/Selector/LesserOrEqualExpression.cs | 43 + src/main/csharp/Selector/LikeExpression.cs | 124 ++ src/main/csharp/Selector/LogicExpression.cs | 48 + .../csharp/Selector/MessageEvaluationContext.cs | 78 ++ src/main/csharp/Selector/MinusExpression.cs | 67 + src/main/csharp/Selector/ModExpression.cs | 67 + src/main/csharp/Selector/MultiplyExpression.cs | 67 + src/main/csharp/Selector/NOTExpression.cs | 45 + src/main/csharp/Selector/NegateExpression.cs | 51 + src/main/csharp/Selector/ORExpression.cs | 46 + src/main/csharp/Selector/ParseException.cs | 197 +++ src/main/csharp/Selector/PlusExpression.cs | 68 + src/main/csharp/Selector/PropertyExpression.cs | 53 + src/main/csharp/Selector/SelectorParser.cs | 1172 ++++++++++++++++++ src/main/csharp/Selector/SelectorParser.csc | 589 +++++++++ .../csharp/Selector/SelectorParserConstants.cs | 75 ++ .../Selector/SelectorParserTokenManager.cs | 1042 ++++++++++++++++ src/main/csharp/Selector/SimpleCharStream.cs | 366 ++++++ src/main/csharp/Selector/Token.cs | 78 ++ src/main/csharp/Selector/TokenMgrError.cs | 130 ++ src/main/csharp/Selector/UnaryExpression.cs | 66 + src/main/csharp/Session.cs | 12 +- vs2008-msmq.csproj | 47 + 55 files changed, 7585 insertions(+), 191 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/BaseMessage.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/BaseMessage.cs b/src/main/csharp/BaseMessage.cs index db73f4c..8ea7e31 100644 --- a/src/main/csharp/BaseMessage.cs +++ b/src/main/csharp/BaseMessage.cs @@ -1,4 +1,4 @@ -/* + 
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -24,28 +24,9 @@ namespace Apache.NMS.MSMQ public class BaseMessage : IMessage { - private PrimitiveMap propertiesMap = new PrimitiveMap(); - private IDestination destination; - private string correlationId; - private TimeSpan timeToLive; - private string messageId; - private MsgDeliveryMode deliveryMode; - private MsgPriority priority; - private Destination replyTo; - private byte[] content; - private string type; - private event AcknowledgeHandler Acknowledger; - private DateTime timestamp = new DateTime(); - private bool readOnlyMsgBody = false; - - public bool ReadOnlyBody - { - get { return readOnlyMsgBody; } - set { readOnlyMsgBody = value; } - } - - // IMessage interface + #region Acknowledgement + private event AcknowledgeHandler Acknowledger; public void Acknowledge() { if(null != Acknowledger) @@ -54,6 +35,27 @@ namespace Apache.NMS.MSMQ } } + #endregion + + #region Message body + + private byte[] content; + public byte[] Content + { + get { return content; } + set { this.content = value; } + } + + private bool readOnlyMsgBody = false; + /// <summary> + /// Whether the message body is read-only. + /// </summary> + public bool ReadOnlyBody + { + get { return readOnlyMsgBody; } + set { readOnlyMsgBody = value; } + } + /// <summary> /// Clears out the message body. Clearing a message's body does not clear its header /// values or property entries. 
@@ -67,26 +69,82 @@ namespace Apache.NMS.MSMQ this.readOnlyMsgBody = false; } + #endregion + + #region Message properties + + private PrimitiveMap propertiesMap = new PrimitiveMap(); + private MessagePropertyIntercepter propertyHelper; + /// <summary> + /// Provides access to the message properties (headers) + /// </summary> + public Apache.NMS.IPrimitiveMap Properties + { + get + { + if(propertyHelper == null) + { + propertyHelper = new Apache.NMS.Util.MessagePropertyIntercepter( + this, propertiesMap, this.ReadOnlyProperties); + } + + return propertyHelper; + } + } + + private bool readOnlyMsgProperties = false; + /// <summary> + /// Whether the message properties is read-only. + /// </summary> + public virtual bool ReadOnlyProperties + { + get { return this.readOnlyMsgProperties; } + + set + { + if(this.propertyHelper != null) + { + this.propertyHelper.ReadOnly = value; + } + this.readOnlyMsgProperties = value; + } + } + /// <summary> /// Clears a message's properties. - /// /// The message's header fields and body are not cleared. 
/// </summary> - public virtual void ClearProperties() + public void ClearProperties() { - propertiesMap.Clear(); + this.ReadOnlyProperties = false; + this.propertiesMap.Clear(); } - // Properties + public object GetObjectProperty(string name) + { + return Properties[name]; + } - public IPrimitiveMap Properties + public void SetObjectProperty(string name, object value) { - get { return propertiesMap; } + Properties[name] = value; } + #endregion - // NMS headers + #region Message header fields + private string messageId; + /// <summary> + /// The message ID which is set by the provider + /// </summary> + public string NMSMessageId + { + get { return messageId; } + set { messageId = value; } + } + + private string correlationId; /// <summary> /// The correlation ID used to correlate messages with conversations or long running business processes /// </summary> @@ -96,6 +154,7 @@ namespace Apache.NMS.MSMQ set { correlationId = value; } } + private IDestination destination; /// <summary> /// The destination of the message /// </summary> @@ -105,6 +164,7 @@ namespace Apache.NMS.MSMQ set { destination = value; } } + private TimeSpan timeToLive; /// <summary> /// The time in milliseconds that this message should expire in /// </summary> @@ -114,15 +174,7 @@ namespace Apache.NMS.MSMQ set { timeToLive = value; } } - /// <summary> - /// The message ID which is set by the provider - /// </summary> - public string NMSMessageId - { - get { return messageId; } - set { messageId = value; } - } - + private MsgDeliveryMode deliveryMode; /// <summary> /// Whether or not this message is persistent /// </summary> @@ -132,6 +184,7 @@ namespace Apache.NMS.MSMQ set { deliveryMode = value; } } + private MsgPriority priority; /// <summary> /// The Priority on this message /// </summary> @@ -150,7 +203,7 @@ namespace Apache.NMS.MSMQ set { } } - + private Destination replyTo; /// <summary> /// The destination that the consumer of this message should send replies to /// </summary> @@ -160,7 
+213,7 @@ namespace Apache.NMS.MSMQ set { replyTo = (Destination) value; } } - + private DateTime timestamp = new DateTime(); /// <summary> /// The timestamp the broker added to the message /// </summary> @@ -170,12 +223,7 @@ namespace Apache.NMS.MSMQ set { timestamp = value; } } - public byte[] Content - { - get { return content; } - set { this.content = value; } - } - + private string type; /// <summary> /// The type name of this message /// </summary> @@ -185,15 +233,9 @@ namespace Apache.NMS.MSMQ set { type = value; } } + #endregion - public object GetObjectProperty(string name) - { - return null; - } - - public void SetObjectProperty(string name, object value) - { - } + #region Check access mode protected void FailIfReadOnlyBody() { @@ -205,11 +247,13 @@ namespace Apache.NMS.MSMQ protected void FailIfWriteOnlyBody() { - if( ReadOnlyBody == false ) + if(ReadOnlyBody == false) { throw new MessageNotReadableException("Message is in Write-Only mode."); } } + + #endregion } } http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/DefaultMessageConverter.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/DefaultMessageConverter.cs b/src/main/csharp/DefaultMessageConverter.cs index 2aa3438..83097fc 100644 --- a/src/main/csharp/DefaultMessageConverter.cs +++ b/src/main/csharp/DefaultMessageConverter.cs @@ -32,12 +32,61 @@ namespace Apache.NMS.MSMQ StreamMessage } - public class DefaultMessageConverter : IMessageConverter + /// <summary> + /// This class provides default rules for converting MSMQ to and from + /// NMS messages, when the peer system expects or produces compatible + /// mappings, typically when the peer system is also implemented on + /// Apache.NMS. + /// Default mappings are as follows : + /// <ul> + /// <li> + /// the MSMQ Message.AppSetting field is used for specifying the NMS + /// message type, as specified by the <c>NMSMessageType</c> enumeration. 
+ /// </li> + /// <li> + /// the MSMQ Message.Extension field is populated with a map + /// (a marshalled <c>PrimitiveMap</c>) of message properties. + /// </li> + /// <li> + /// in earlier versions of Apache.NMS.MSMQ, the MSMQ Message.Label + /// field was populated with the value of the NMSType field. Setting + /// <c>SetLabelAsNMSType</c> to true (the default value) applies that + /// same rule, which makes it compatible with existing NMS peers. If + /// set to false, the Message.Label field is populated with the value + /// of a "Label" property, if it exists, thus making it readable by + /// standard management or monitoring tools. The NMSType value is then + /// transmitted as a field in the Message.Extension map. + /// </li> + /// </ul> + /// Please note that in earlier versions of Apache.NMS, only one property + /// was set in the Message.Extension field : the NMSCorrelationID. + /// The native Message.CorrelationId field is not settable, except for + /// reply messages explicitely created as such through the MSMQ API. + /// Transmission of the correlation id. through a mapped property called + /// NMSCorrelationID is therefore maintained. + /// When exchanging messages with a non compatible peer, a specific + /// message converter must be provided, which should at least be able to + /// map message types and define the encoding used for text messages. + /// </summary> + public class DefaultMessageConverter : IMessageConverterEx { + private bool setLabelAsNMSType = true; + public bool SetLabelAsNMSType + { + get { return setLabelAsNMSType; } + set { setLabelAsNMSType = value; } + } + + #region Messages + /// <summary> + /// Converts the specified NMS message to an equivalent MSMQ message. 
+ /// </summary> + /// <param name="message">NMS message to be converted.</param> + /// <result>Converted MSMQ message.</result> public virtual Message ToMsmqMessage(IMessage message) { Message msmqMessage = new Message(); - PrimitiveMap metaData = new PrimitiveMap(); + PrimitiveMap propertyData = new PrimitiveMap(); ConvertMessageBodyToMSMQ(message, msmqMessage); @@ -48,32 +97,72 @@ namespace Apache.NMS.MSMQ if(message.NMSCorrelationID != null) { - metaData.SetString("NMSCorrelationID", message.NMSCorrelationID); + propertyData.SetString("NMSCorrelationID", message.NMSCorrelationID); } msmqMessage.Recoverable = (message.NMSDeliveryMode == MsgDeliveryMode.Persistent); - msmqMessage.Priority = ToMessagePriority(message.NMSPriority); + msmqMessage.Priority = ToMsmqMessagePriority(message.NMSPriority); msmqMessage.ResponseQueue = ToMsmqDestination(message.NMSReplyTo); if(message.NMSType != null) { - msmqMessage.Label = message.NMSType; + if(SetLabelAsNMSType) + { + propertyData.SetString("NMSType", message.NMSType); + } + else + { + msmqMessage.Label = message.NMSType; + } } - // Store the NMS meta data in the extension area - msmqMessage.Extension = metaData.Marshal(); + // Populate property data + foreach(object keyObject in message.Properties.Keys) + { + string key = (keyObject as string); + object val = message.Properties.GetString(key); + if(!SetLabelAsNMSType && string.Compare(key, "Label", true) == 0 && val != null) + { + msmqMessage.Label = val.ToString(); + } + else + { + propertyData[key] = val; + } + } + + // Store the NMS property data in the extension area + msmqMessage.Extension = propertyData.Marshal(); return msmqMessage; } + /// <summary> + /// Converts the specified MSMQ message to an equivalent NMS message + /// (including its message body). 
+ /// </summary> + /// <param name="message">MSMQ message to be converted.</param> + /// <result>Converted NMS message.</result> public virtual IMessage ToNmsMessage(Message message) { - BaseMessage answer = CreateNmsMessage(message); - // Get the NMS meta data from the extension area - PrimitiveMap metaData = PrimitiveMap.Unmarshal(message.Extension); + return ToNmsMessage(message, true); + } + + /// <summary> + /// Converts the specified MSMQ message to an equivalent NMS message. + /// </summary> + /// <param name="message">MSMQ message to be converted.</param> + /// <param name="convertBody">true if message body should be converted.</param> + /// <result>Converted NMS message.</result> + public virtual IMessage ToNmsMessage(Message message, bool convertBody) + { + BaseMessage answer = CreateNmsMessage(message, convertBody); + + // Get the NMS property data from the extension area + PrimitiveMap propertyData = PrimitiveMap.Unmarshal(message.Extension); try { answer.NMSMessageId = message.Id; - answer.NMSCorrelationID = metaData.GetString("NMSCorrelationID"); + answer.NMSCorrelationID = propertyData.GetString("NMSCorrelationID"); answer.NMSDeliveryMode = (message.Recoverable ? 
MsgDeliveryMode.Persistent : MsgDeliveryMode.NonPersistent); answer.NMSDestination = ToNmsDestination(message.DestinationQueue); } @@ -83,18 +172,85 @@ namespace Apache.NMS.MSMQ try { - answer.NMSType = message.Label; answer.NMSReplyTo = ToNmsDestination(message.ResponseQueue); answer.NMSTimeToLive = message.TimeToBeReceived; + answer.NMSPriority = ToNmsMsgPriority(message.Priority); + } + catch(InvalidOperationException) + { + } + + try + { + if(message.Label != null) + { + if(SetLabelAsNMSType) + { + answer.NMSType = message.Label; + } + else + { + answer.Properties["Label"] = message.Label; + } + } + answer.Properties["LookupId"] = message.LookupId; } catch(InvalidOperationException) { } + foreach(object keyObject in propertyData.Keys) + { + try + { + string key = (keyObject as string); + if(string.Compare(key, "NMSType", true) == 0) + { + answer.NMSType = propertyData.GetString(key); + } + else if(string.Compare(key, "NMSCorrelationID", true) == 0) + { + answer.NMSCorrelationID = propertyData.GetString("NMSCorrelationID"); + } + else + { + answer.Properties[key] = propertyData[key]; + } + } + catch(InvalidOperationException) + { + } + } return answer; } - private static MessagePriority ToMessagePriority(MsgPriority msgPriority) + #endregion + + #region Message priority + + // Message priorities are defined as follows : + // | MSMQ | NMS | + // | MessagePriority | MsgPriority | + // +--------------------+--------------------+ + // | Lowest | Lowest | + // | VeryLow | VeryLow | + // | Low | Low | + // | \-> | AboveLow | + // | /-> | BelowNormal | + // | Normal | Normal | + // | AboveNormal | AboveNormal | + // | High | High | + // | VeryHigh | VeryHigh | + // | Highest | Highest | + // +--------------------+--------------------+ + + /// <summary> + /// Converts the specified NMS message priority to an equivalent MSMQ + /// message priority. 
+ /// </summary> + /// <param name="msgPriority">NMS message priority to be converted.</param> + /// <result>Converted MSMQ message priority.</result> + private static MessagePriority ToMsmqMessagePriority(MsgPriority msgPriority) { switch(msgPriority) { @@ -127,6 +283,153 @@ namespace Apache.NMS.MSMQ } } + /// <summary> + /// Converts the specified MSMQ message priority to an equivalent NMS + /// message priority. + /// </summary> + /// <param name="messagePriority">MSMQ message priority to be converted.</param> + /// <result>Converted NMS message priority.</result> + private static MsgPriority ToNmsMsgPriority(MessagePriority messagePriority) + { + switch(messagePriority) + { + case MessagePriority.Lowest: + return MsgPriority.Lowest; + + case MessagePriority.VeryLow: + return MsgPriority.VeryLow; + + case MessagePriority.Low: + return MsgPriority.Low; + + default: + case MessagePriority.Normal: + return MsgPriority.Normal; + + case MessagePriority.AboveNormal: + return MsgPriority.AboveNormal; + + case MessagePriority.High: + return MsgPriority.High; + + case MessagePriority.VeryHigh: + return MsgPriority.VeryHigh; + + case MessagePriority.Highest: + return MsgPriority.Highest; + } + } + + #endregion + + #region Message creation + + // Conversion of the message body has been separated from the creation + // of the NMS message object for performance reasons when using + // selectors (selectors handle only message attributes, not message + // bodies). + // CreateNmsMessage(Message) is maintained for compatibility reasons + // with existing clients that may have implemented derived classes, + // instead of completely removing the body conversion part from the + // method. + + /// <summary> + /// Creates an NMS message of appropriate type for the specified MSMQ + /// message, and convert the message body. 
+ /// </summary> + /// <param name="message">MSMQ message.</param> + /// <result>NMS message created for retrieving the MSMQ message.</result> + protected virtual BaseMessage CreateNmsMessage(Message message) + { + return CreateNmsMessage(message, true); + } + + /// <summary> + /// Creates an NMS message of appropriate type for the specified MSMQ + /// message, and convert the message body if specified. + /// </summary> + /// <param name="message">MSMQ message.</param> + /// <param name="convertBody">true if the message body must be + /// converted.</param> + /// <result>NMS message created for retrieving the MSMQ message.</result> + protected virtual BaseMessage CreateNmsMessage(Message message, + bool convertBody) + { + BaseMessage result = null; + + if((int) NMSMessageType.TextMessage == message.AppSpecific) + { + TextMessage textMessage = new TextMessage(); + + if(convertBody) + { + ConvertTextMessageBodyToNMS(message, textMessage); + } + + result = textMessage; + } + else if((int) NMSMessageType.BytesMessage == message.AppSpecific) + { + BytesMessage bytesMessage = new BytesMessage(); + + if(convertBody) + { + ConvertBytesMessageBodyToNMS(message, bytesMessage); + } + + result = bytesMessage; + } + else if((int) NMSMessageType.ObjectMessage == message.AppSpecific) + { + ObjectMessage objectMessage = new ObjectMessage(); + + if(convertBody) + { + ConvertObjectMessageBodyToNMS(message, objectMessage); + } + + result = objectMessage; + } + else if((int) NMSMessageType.MapMessage == message.AppSpecific) + { + MapMessage mapMessage = new MapMessage(); + + if(convertBody) + { + ConvertMapMessageBodyToNMS(message, mapMessage); + } + + result = mapMessage; + } + else if((int) NMSMessageType.StreamMessage == message.AppSpecific) + { + StreamMessage streamMessage = new StreamMessage(); + + if(convertBody) + { + ConvertStreamMessageBodyToNMS(message, streamMessage); + } + + result = streamMessage; + } + else + { + BaseMessage baseMessage = new BaseMessage(); + result = 
baseMessage; + } + + return result; + } + + #endregion + + #region Message body + + /// <summary> + /// Converts an NMS message body to the equivalent MSMQ message body. + /// </summary> + /// <param name="message">Source NMS message.</param> + /// <param name="answer">Target MSMQ message.</param> protected virtual void ConvertMessageBodyToMSMQ(IMessage message, Message answer) { if(message is TextMessage) @@ -172,78 +475,133 @@ namespace Apache.NMS.MSMQ } } - protected virtual BaseMessage CreateNmsMessage(Message message) + /// <summary> + /// Converts an MSMQ message body to the equivalent NMS message body. + /// </summary> + /// <param name="message">Source MSMQ message.</param> + /// <param name="answer">Target NMS message.</param> + public virtual void ConvertMessageBodyToNMS(Message message, IMessage answer) { - BaseMessage result = null; - - if((int) NMSMessageType.TextMessage == message.AppSpecific) + if(answer is TextMessage) { - TextMessage textMessage = new TextMessage(); - string content = String.Empty; - - if(message.BodyStream != null && message.BodyStream.Length > 0) - { - byte[] buf = null; - buf = new byte[message.BodyStream.Length]; - message.BodyStream.Read(buf, 0, buf.Length); - content = Encoding.UTF32.GetString(buf); - } - - textMessage.Text = content; - result = textMessage; + ConvertTextMessageBodyToNMS(message, (TextMessage)answer); } - else if((int) NMSMessageType.BytesMessage == message.AppSpecific) + else if(answer is BytesMessage) { - byte[] buf = null; - - if(message.BodyStream != null && message.BodyStream.Length > 0) - { - buf = new byte[message.BodyStream.Length]; - message.BodyStream.Read(buf, 0, buf.Length); - } - - BytesMessage bytesMessage = new BytesMessage(); - bytesMessage.Content = buf; - result = bytesMessage; + ConvertBytesMessageBodyToNMS(message, (BytesMessage)answer); } - else if((int) NMSMessageType.ObjectMessage == message.AppSpecific) + else if(answer is ObjectMessage) { - ObjectMessage objectMessage = new 
ObjectMessage(); - - objectMessage.Body = message.Body; - result = objectMessage; + ConvertObjectMessageBodyToNMS(message, (ObjectMessage)answer); } - else if((int) NMSMessageType.MapMessage == message.AppSpecific) + else if(answer is MapMessage) { - byte[] buf = null; - - if(message.BodyStream != null && message.BodyStream.Length > 0) - { - buf = new byte[message.BodyStream.Length]; - message.BodyStream.Read(buf, 0, buf.Length); - } - - MapMessage mapMessage = new MapMessage(); - mapMessage.Body = PrimitiveMap.Unmarshal(buf); - result = mapMessage; + ConvertMapMessageBodyToNMS(message, (MapMessage)answer); } - else if((int) NMSMessageType.StreamMessage == message.AppSpecific) + else if(answer is StreamMessage) { - StreamMessage streamMessage = new StreamMessage(); + ConvertStreamMessageBodyToNMS(message, (StreamMessage)answer); + } - // TODO: Implement - result = streamMessage; + return; + } + + /// <summary> + /// Converts an MSMQ message body to the equivalent NMS text message + /// body. + /// </summary> + /// <param name="message">Source MSMQ message.</param> + /// <param name="answer">Target NMS text message.</param> + public virtual void ConvertTextMessageBodyToNMS(Message message, + TextMessage answer) + { + string content = String.Empty; + + if(message.BodyStream != null && message.BodyStream.Length > 0) + { + byte[] buf = new byte[message.BodyStream.Length]; + message.BodyStream.Read(buf, 0, buf.Length); + content = Encoding.UTF32.GetString(buf); } - else + + answer.Text = content; + } + + /// <summary> + /// Converts an MSMQ message body to the equivalent NMS bytes message + /// body. 
+ /// </summary> + /// <param name="message">Source MSMQ message.</param> + /// <param name="answer">Target NMS bytes message.</param> + public virtual void ConvertBytesMessageBodyToNMS(Message message, + BytesMessage answer) + { + byte[] buf = null; + + if(message.BodyStream != null && message.BodyStream.Length > 0) { - BaseMessage baseMessage = new BaseMessage(); + buf = new byte[message.BodyStream.Length]; + message.BodyStream.Read(buf, 0, buf.Length); + } - result = baseMessage; + answer.Content = buf; + } + + /// <summary> + /// Converts an MSMQ message body to the equivalent NMS object message + /// body. + /// </summary> + /// <param name="message">Source MSMQ message.</param> + /// <param name="answer">Target NMS object message.</param> + public virtual void ConvertObjectMessageBodyToNMS(Message message, + ObjectMessage answer) + { + answer.Body = message.Body; + } + + /// <summary> + /// Converts an MSMQ message body to the equivalent NMS map message + /// body. + /// </summary> + /// <param name="message">Source MSMQ message.</param> + /// <param name="answer">Target NMS map message.</param> + public virtual void ConvertMapMessageBodyToNMS(Message message, + MapMessage answer) + { + byte[] buf = null; + + if(message.BodyStream != null && message.BodyStream.Length > 0) + { + buf = new byte[message.BodyStream.Length]; + message.BodyStream.Read(buf, 0, buf.Length); } - return result; + answer.Body = PrimitiveMap.Unmarshal(buf); + } + + /// <summary> + /// Converts an MSMQ message body to the equivalent NMS stream message + /// body. + /// </summary> + /// <param name="message">Source MSMQ message.</param> + /// <param name="answer">Target NMS stream message.</param> + public virtual void ConvertStreamMessageBodyToNMS(Message message, + StreamMessage answer) + { + // TODO: Implement + throw new NotImplementedException(); } + #endregion + + #region Destination + + /// <summary> + /// Converts an NMS destination to the equivalent MSMQ destination + /// (ie. 
queue). + /// </summary> + /// <param name="destination">NMS destination.</param> + /// <result>MSMQ queue.</result> public MessageQueue ToMsmqDestination(IDestination destination) { if(null == destination) @@ -254,6 +612,12 @@ namespace Apache.NMS.MSMQ return new MessageQueue((destination as Destination).Path); } + /// <summary> + /// Converts an MSMQ destination (ie. queue) to the equivalent NMS + /// destination. + /// </summary> + /// <param name="destinationQueue">MSMQ destination queue.</param> + /// <result>NMS destination.</result> protected virtual IDestination ToNmsDestination(MessageQueue destinationQueue) { if(null == destinationQueue) @@ -263,5 +627,7 @@ namespace Apache.NMS.MSMQ return new Queue(destinationQueue.Path); } + + #endregion } } http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/IMessageConverter.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/IMessageConverter.cs b/src/main/csharp/IMessageConverter.cs index 152377b..14c6669 100644 --- a/src/main/csharp/IMessageConverter.cs +++ b/src/main/csharp/IMessageConverter.cs @@ -20,15 +20,27 @@ namespace Apache.NMS.MSMQ { public interface IMessageConverter { - - /// <summary> - /// Method ToMSMQMessageQueue - /// </summary> - /// <param name="destination">An IDestination</param> - /// <returns>A MessageQueue</returns> - MessageQueue ToMsmqDestination(IDestination destination); - + /// <summary> + /// Converts the specified NMS message to an equivalent MSMQ message. + /// </summary> + /// <param name="message">NMS message to be converted.</param> + /// <result>Converted MSMQ message.</result> Message ToMsmqMessage(IMessage message); + + /// <summary> + /// Converts the specified MSMQ message to an equivalent NMS message + /// (including its message body). 
+ /// </summary> + /// <param name="message">MSMQ message to be converted.</param> + /// <result>Converted NMS message.</result> IMessage ToNmsMessage(Message message); + + /// <summary> + /// Converts an NMS destination to the equivalent MSMQ destination + /// (ie. queue). + /// </summary> + /// <param name="destination">NMS destination.</param> + /// <result>MSMQ queue.</result> + MessageQueue ToMsmqDestination(IDestination destination); } } http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/IMessageConverterEx.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/IMessageConverterEx.cs b/src/main/csharp/IMessageConverterEx.cs new file mode 100644 index 0000000..92be928 --- /dev/null +++ b/src/main/csharp/IMessageConverterEx.cs @@ -0,0 +1,44 @@ +using System.Messaging; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.NMS.MSMQ +{ + /// <summary> + /// Extended IMessageConverter interface supporting new methods for + /// optimizing message selection through "selectors". + /// The original IMessageConverter is maintained for compatibility + /// reasons with existing clients implementing it. 
+ /// </summary> + public interface IMessageConverterEx : IMessageConverter + { + /// <summary> + /// Converts the specified MSMQ message to an equivalent NMS message. + /// </summary> + /// <param name="message">MSMQ message to be converted.</param> + /// <param name="convertBody">true if message body should be converted.</param> + /// <result>Converted NMS message.</result> + IMessage ToNmsMessage(Message message, bool convertBody); + + /// <summary> + /// Converts an MSMQ message body to the equivalent NMS message body. + /// </summary> + /// <param name="message">Source MSMQ message.</param> + /// <param name="answer">Target NMS message.</param> + void ConvertMessageBodyToNMS(Message message, IMessage answer); + } +} http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/MessageConsumer.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/MessageConsumer.cs b/src/main/csharp/MessageConsumer.cs index 6961298..eaaed5c 100644 --- a/src/main/csharp/MessageConsumer.cs +++ b/src/main/csharp/MessageConsumer.cs @@ -1,6 +1,8 @@ using System; using System.Messaging; using System.Threading; +using Apache.NMS.Util; +using Apache.NMS.MSMQ.Readers; /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -17,12 +19,11 @@ using System.Threading; * See the License for the specific language governing permissions and * limitations under the License. */ -using Apache.NMS.Util; namespace Apache.NMS.MSMQ { /// <summary> - /// An object capable of receiving messages from some destination + /// An object capable of receiving messages from some destination. 
/// </summary> public class MessageConsumer : IMessageConsumer { @@ -31,8 +32,6 @@ namespace Apache.NMS.MSMQ private readonly Session session; private readonly AcknowledgementMode acknowledgementMode; private MessageQueue messageQueue; - private event MessageListener listener; - private int listenerCount = 0; private Thread asyncDeliveryThread = null; private AutoResetEvent pause = new AutoResetEvent(false); private Atomic<bool> asyncDelivery = new Atomic<bool>(false); @@ -44,17 +43,46 @@ namespace Apache.NMS.MSMQ set { this.consumerTransformer = value; } } - public MessageConsumer(Session session, AcknowledgementMode acknowledgementMode, MessageQueue messageQueue) + private IMessageReader reader; + + /// <summary> + /// Constructs a message consumer on the specified queue. + /// </summary> + /// <param name="session">The messaging session.</param> + /// <param name="acknowledgementMode">The message acknowledgement mode.</param> + /// <param name="messageQueue">The message queue to consume messages from.</param> + public MessageConsumer(Session session, + AcknowledgementMode acknowledgementMode, MessageQueue messageQueue) + : this(session, acknowledgementMode, messageQueue, null) + { + } + + /// <summary> + /// Constructs a message consumer on the specified queue, using a + /// selector for filtering incoming messages. 
+ /// </summary> + /// <param name="session">The messaging session.</param> + /// <param name="acknowledgementMode">The message acknowledgement mode.</param> + /// <param name="messageQueue">The message queue to consume messages from.</param> + /// <param name="selector">The selection criteria.</param> + public MessageConsumer(Session session, + AcknowledgementMode acknowledgementMode, MessageQueue messageQueue, + string selector) { this.session = session; this.acknowledgementMode = acknowledgementMode; this.messageQueue = messageQueue; - if(null != this.messageQueue) + if(this.messageQueue != null) { this.messageQueue.MessageReadPropertyFilter.SetAll(); } + + reader = MessageReaderUtil.CreateMessageReader( + messageQueue, session.MessageConverter, selector); } + private int listenerCount = 0; + private event MessageListener listener; public event MessageListener Listener { add @@ -85,32 +113,8 @@ namespace Apache.NMS.MSMQ if(messageQueue != null) { - Message message; - - try - { - message = messageQueue.Receive(zeroTimeout); - } - catch - { - message = null; - } - - if(null == message) - { - ReceiveCompletedEventHandler receiveMsg = - delegate(Object source, ReceiveCompletedEventArgs asyncResult) { - message = messageQueue.EndReceive(asyncResult.AsyncResult); - pause.Set(); - }; - - messageQueue.ReceiveCompleted += receiveMsg; - messageQueue.BeginReceive(); - pause.WaitOne(); - messageQueue.ReceiveCompleted -= receiveMsg; - } - - nmsMessage = ToNmsMessage(message); + nmsMessage = reader.Receive(); + nmsMessage = TransformMessage(nmsMessage); } return nmsMessage; @@ -122,8 +126,8 @@ namespace Apache.NMS.MSMQ if(messageQueue != null) { - Message message = messageQueue.Receive(timeout); - nmsMessage = ToNmsMessage(message); + nmsMessage = reader.Receive(timeout); + nmsMessage = TransformMessage(nmsMessage); } return nmsMessage; @@ -135,8 +139,8 @@ namespace Apache.NMS.MSMQ if(messageQueue != null) { - Message message = messageQueue.Receive(zeroTimeout); - nmsMessage 
= ToNmsMessage(message); + nmsMessage = reader.Receive(zeroTimeout); + nmsMessage = TransformMessage(nmsMessage); } return nmsMessage; @@ -226,25 +230,20 @@ namespace Apache.NMS.MSMQ session.Connection.HandleException(e); } - protected virtual IMessage ToNmsMessage(Message message) + protected virtual IMessage TransformMessage(IMessage message) { - if(message == null) - { - return null; - } - - IMessage converted = session.MessageConverter.ToNmsMessage(message); + IMessage transformed = message; - if(this.ConsumerTransformer != null) + if(message != null && this.ConsumerTransformer != null) { - IMessage newMessage = ConsumerTransformer(this.session, this, converted); + IMessage newMessage = ConsumerTransformer(this.session, this, message); if(newMessage != null) { - converted = newMessage; + transformed = newMessage; } } - return converted; + return transformed; } } } http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/QueueBrowser.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/QueueBrowser.cs b/src/main/csharp/QueueBrowser.cs index 32752c5..3ff795d 100644 --- a/src/main/csharp/QueueBrowser.cs +++ b/src/main/csharp/QueueBrowser.cs @@ -19,6 +19,7 @@ using System.Collections; using System.Messaging; using Apache.NMS; using Apache.NMS.Util; +using Apache.NMS.MSMQ.Readers; namespace Apache.NMS.MSMQ { @@ -30,7 +31,17 @@ namespace Apache.NMS.MSMQ private readonly Session session; private MessageQueue messageQueue; + private string selector; + + private IMessageReader reader; + public QueueBrowser(Session session, MessageQueue messageQueue) + : this(session, messageQueue, null) + { + } + + public QueueBrowser(Session session, MessageQueue messageQueue, + string selector) { this.session = session; this.messageQueue = messageQueue; @@ -39,6 +50,8 @@ namespace Apache.NMS.MSMQ this.messageQueue.MessageReadPropertyFilter.SetAll(); } + reader = MessageReaderUtil.CreateMessageReader( + 
messageQueue, session.MessageConverter, selector); } ~QueueBrowser() @@ -95,7 +108,7 @@ namespace Apache.NMS.MSMQ public string MessageSelector { - get { throw new NotSupportedException(); } + get { return selector; } } public IQueue Queue @@ -107,11 +120,14 @@ namespace Apache.NMS.MSMQ { private readonly Session session; private readonly MessageEnumerator innerEnumerator; + private readonly IMessageReader reader; - public Enumerator(Session session, MessageQueue messageQueue) + public Enumerator(Session session, MessageQueue messageQueue, + IMessageReader reader) { this.session = session; this.innerEnumerator = messageQueue.GetMessageEnumerator2(); + this.reader = reader; } public object Current @@ -124,7 +140,14 @@ namespace Apache.NMS.MSMQ public bool MoveNext() { - return this.innerEnumerator.MoveNext(); + while(this.innerEnumerator.MoveNext()) + { + if(reader.Matches(this.innerEnumerator.Current)) + { + return true; + } + } + return false; } public void Reset() @@ -135,7 +158,7 @@ namespace Apache.NMS.MSMQ public IEnumerator GetEnumerator() { - return new Enumerator(this.session, this.messageQueue); + return new Enumerator(this.session, this.messageQueue, this.reader); } } } http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/Readers/AbstractMessageReader.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/Readers/AbstractMessageReader.cs b/src/main/csharp/Readers/AbstractMessageReader.cs new file mode 100644 index 0000000..7874696 --- /dev/null +++ b/src/main/csharp/Readers/AbstractMessageReader.cs @@ -0,0 +1,126 @@ +using System; +using System.Messaging; +using Apache.NMS.MSMQ; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.NMS.MSMQ.Readers +{ + /// <summary> + /// Abstract MSMQ message reader. Derived classes support various + /// message filtering methods. + /// </summary> + public abstract class AbstractMessageReader : IMessageReader + { + protected MessageQueue messageQueue; + protected IMessageConverter messageConverter; + protected IMessageConverterEx messageConverterEx; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="messageQueue">The MSMQ message queue from which + /// messages will be read.</param> + /// <param name="messageConverter">A message converter for mapping + /// MSMQ messages to NMS messages.</param> + public AbstractMessageReader(MessageQueue messageQueue, + IMessageConverter messageConverter) + { + this.messageQueue = messageQueue; + + this.messageConverter = messageConverter; + this.messageConverterEx = (messageConverter as IMessageConverterEx); + } + + /// <summary> + /// Returns without removing (peeks) the first message in the queue + /// referenced by this MessageQueue matching the selection criteria. + /// The Peek method is synchronous, so it blocks the current thread + /// until a message becomes available. 
+ /// </summary> + /// <returns>Peeked message.</returns> + public abstract IMessage Peek(); + + /// <summary> + /// Returns without removing (peeks) the first message in the queue + /// referenced by this MessageQueue matching the selection criteria. + /// The Peek method is synchronous, so it blocks the current thread + /// until a message becomes available or the specified time-out occurs. + /// </summary> + /// <param name="timeSpan">Reception time-out.</param> + /// <returns>Peeked message.</returns> + public abstract IMessage Peek(TimeSpan timeSpan); + + /// <summary> + /// Receives the first message available in the queue referenced by + /// the MessageQueue matching the selection criteria. + /// This call is synchronous, and blocks the current thread of execution + /// until a message is available. + /// </summary> + /// <returns>Received message.</returns> + public abstract IMessage Receive(); + + /// <summary> + /// Receives the first message available in the queue referenced by the + /// MessageQueue matching the selection criteria, and waits until either + /// a message is available in the queue, or the time-out expires. + /// </summary> + /// <param name="timeSpan">Reception time-out.</param> + /// <returns>Received message.</returns> + public abstract IMessage Receive(TimeSpan timeSpan); + + /// <summary> + /// Receives the first message available in the transactional queue + /// referenced by the MessageQueue matching the selection criteria. + /// This call is synchronous, and blocks the current thread of execution + /// until a message is available. 
+ /// </summary> + /// <param name="transaction">Transaction.</param> + /// <returns>Received message.</returns> + public abstract IMessage Receive(MessageQueueTransaction transaction); + + /// <summary> + /// Receives the first message available in the transactional queue + /// referenced by the MessageQueue matching the selection criteria, + /// and waits until either a message is available in the queue, or the + /// time-out expires. + /// </summary> + /// <param name="timeSpan">Reception time-out.</param> + /// <param name="transaction">Transaction.</param> + /// <returns>Received message.</returns> + public abstract IMessage Receive(TimeSpan timeSpan, + MessageQueueTransaction transaction); + + /// <summary> + /// Checks if an MSMQ message matches the selection criteria. + /// </summary> + /// <param name="message">MSMQ message.</param> + /// <returns>true if the message matches the selection criteria.</returns> + public abstract bool Matches(Message message); + + /// <summary> + /// Converts an MSMQ message to an NMS message, using the converter + /// specified at construction time. + /// </summary> + /// <param name="message">MSMQ message.</param> + /// <returns>NMS message.</returns> + protected IMessage Convert(Message message) + { + return message == null ? null : messageConverter.ToNmsMessage(message); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/Readers/ByCorrelationIdMessageReader.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/Readers/ByCorrelationIdMessageReader.cs b/src/main/csharp/Readers/ByCorrelationIdMessageReader.cs new file mode 100644 index 0000000..fad3d1a --- /dev/null +++ b/src/main/csharp/Readers/ByCorrelationIdMessageReader.cs @@ -0,0 +1,139 @@ +using System; +using System.Messaging; +using Apache.NMS.MSMQ; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.NMS.MSMQ.Readers +{ + /// <summary> + /// MSMQ message reader, returning messages matching the specified + /// correlation identifier. + /// </summary> + public class ByCorrelationIdMessageReader : AbstractMessageReader + { + private string correlationId; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="messageQueue">The MSMQ message queue from which + /// messages will be read.</param> + /// <param name="messageConverter">A message converter for mapping + /// MSMQ messages to NMS messages.</param> + /// <param name="correlationId">The correlation identifier of messages + /// to be read.</param> + public ByCorrelationIdMessageReader(MessageQueue messageQueue, + IMessageConverter messageConverter, string correlationId) + : base(messageQueue, messageConverter) + { + this.correlationId = correlationId; + } + + /// <summary> + /// Returns without removing (peeks) the first message in the queue + /// referenced by this MessageQueue matching the selection criteria. + /// The Peek method is synchronous, so it blocks the current thread + /// until a message becomes available.
+ /// </summary> + /// <returns>Peeked message.</returns> + public override IMessage Peek() + { + return Convert(messageQueue.PeekByCorrelationId(correlationId)); + } + + /// <summary> + /// Returns without removing (peeks) the first message in the queue + /// referenced by this MessageQueue matching the selection criteria. + /// The Peek method is synchronous, so it blocks the current thread + /// until a message becomes available or the specified time-out occurs. + /// </summary> + /// <param name="timeSpan">Reception time-out.</param> + /// <returns>Peeked message.</returns> + public override IMessage Peek(TimeSpan timeSpan) + { + return Convert(messageQueue.PeekByCorrelationId(correlationId, + timeSpan)); + } + + /// <summary> + /// Receives the first message available in the queue referenced by + /// the MessageQueue matching the selection criteria. + /// This call is synchronous, and blocks the current thread of execution + /// until a message is available. + /// </summary> + /// <returns>Received message.</returns> + public override IMessage Receive() + { + return Convert(messageQueue.ReceiveByCorrelationId(correlationId)); + } + + /// <summary> + /// Receives the first message available in the queue referenced by the + /// MessageQueue matching the selection criteria, and waits until either + /// a message is available in the queue, or the time-out expires. + /// </summary> + /// <param name="timeSpan">Reception time-out.</param> + /// <returns>Received message.</returns> + public override IMessage Receive(TimeSpan timeSpan) + { + return Convert(messageQueue.ReceiveByCorrelationId(correlationId, + timeSpan)); + } + + /// <summary> + /// Receives the first message available in the transactional queue + /// referenced by the MessageQueue matching the selection criteria. + /// This call is synchronous, and blocks the current thread of execution + /// until a message is available. 
+ /// </summary> + /// <param name="transaction">Transaction.</param> + /// <returns>Received message.</returns> + public override IMessage Receive(MessageQueueTransaction transaction) + { + return Convert(messageQueue.ReceiveByCorrelationId(correlationId, + transaction)); + } + + /// <summary> + /// Receives the first message available in the transactional queue + /// referenced by the MessageQueue matching the selection criteria, + /// and waits until either a message is available in the queue, or the + /// time-out expires. + /// </summary> + /// <param name="timeSpan">Reception time-out.</param> + /// <param name="transaction">Transaction.</param> + /// <returns>Received message.</returns> + public override IMessage Receive(TimeSpan timeSpan, + MessageQueueTransaction transaction) + { + return Convert(messageQueue.ReceiveByCorrelationId(correlationId, + timeSpan, transaction)); + } + + /// <summary> + /// Checks if an MSMQ message matches the selection criteria. + /// </summary> + /// <param name="message">MSMQ message.</param> + /// <return>true if the message matches the selection criteria.</return> + public override bool Matches(Message message) + { + // NB: case-sensitive match + return message.CorrelationId == correlationId; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/Readers/ByIdMessageReader.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/Readers/ByIdMessageReader.cs b/src/main/csharp/Readers/ByIdMessageReader.cs new file mode 100644 index 0000000..f981ca8 --- /dev/null +++ b/src/main/csharp/Readers/ByIdMessageReader.cs @@ -0,0 +1,136 @@ +using System; +using System.Messaging; +using Apache.NMS.MSMQ; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.NMS.MSMQ.Readers +{ + /// <summary> + /// MSMQ message reader, returning messages matching the specified + /// message identifier. + /// </summary> + public class ByIdMessageReader : AbstractMessageReader + { + private string messageId; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="messageQueue">The MSMQ message queue from which + /// messages will be read.</param> + /// <param name="messageConverter">A message converter for mapping + /// MSMQ messages to NMS messages.</param> + /// <param name="messageId">The message identifier of messages to + /// be read.</param> + public ByIdMessageReader(MessageQueue messageQueue, + IMessageConverter messageConverter, string messageId) + : base(messageQueue, messageConverter) + { + this.messageId = messageId; + } + + /// <summary> + /// Returns without removing (peeks) the first message in the queue + /// referenced by this MessageQueue matching the selection criteria. + /// The Peek method is synchronous, so it blocks the current thread + /// until a message becomes available. + /// </summary> + /// <returns>Peeked message.</returns> + public override IMessage Peek() + { + return Convert(messageQueue.PeekById(messageId)); + } + + /// <summary> + /// Returns without removing (peeks) the first message in the queue + /// referenced by this MessageQueue matching the selection criteria. 
+ /// The Peek method is synchronous, so it blocks the current thread + /// until a message becomes available or the specified time-out occurs. + /// </summary> + /// <param name="timeSpan">Reception time-out.</param> + /// <returns>Peeked message.</returns> + public override IMessage Peek(TimeSpan timeSpan) + { + return Convert(messageQueue.PeekById(messageId, timeSpan)); + } + + /// <summary> + /// Receives the first message available in the queue referenced by + /// the MessageQueue matching the selection criteria. + /// This call is synchronous, and blocks the current thread of execution + /// until a message is available. + /// </summary> + /// <returns>Received message.</returns> + public override IMessage Receive() + { + return Convert(messageQueue.ReceiveById(messageId)); + } + + /// <summary> + /// Receives the first message available in the queue referenced by the + /// MessageQueue matching the selection criteria, and waits until either + /// a message is available in the queue, or the time-out expires. + /// </summary> + /// <param name="timeSpan">Reception time-out.</param> + /// <returns>Received message.</returns> + public override IMessage Receive(TimeSpan timeSpan) + { + return Convert(messageQueue.ReceiveById(messageId, timeSpan)); + } + + /// <summary> + /// Receives the first message available in the transactional queue + /// referenced by the MessageQueue matching the selection criteria. + /// This call is synchronous, and blocks the current thread of execution + /// until a message is available. 
+ /// </summary> + /// <param name="transaction">Transaction.</param> + /// <returns>Received message.</returns> + public override IMessage Receive(MessageQueueTransaction transaction) + { + return Convert(messageQueue.ReceiveById(messageId, transaction)); + } + + /// <summary> + /// Receives the first message available in the transactional queue + /// referenced by the MessageQueue matching the selection criteria, + /// and waits until either a message is available in the queue, or the + /// time-out expires. + /// </summary> + /// <param name="timeSpan">Reception time-out.</param> + /// <param name="transaction">Transaction.</param> + /// <returns>Received message.</returns> + public override IMessage Receive(TimeSpan timeSpan, + MessageQueueTransaction transaction) + { + return Convert(messageQueue.ReceiveById(messageId, timeSpan, + transaction)); + } + + /// <summary> + /// Checks if an MSMQ message matches the selection criteria. + /// </summary> + /// <param name="message">MSMQ message.</param> + /// <return>true if the message matches the selection criteria.</return> + public override bool Matches(Message message) + { + // NB: case-sensitive match + return message.Id == messageId; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/Readers/ByLookupIdMessageReader.cs ---------------------------------------------------------------------- diff --git a/src/main/csharp/Readers/ByLookupIdMessageReader.cs b/src/main/csharp/Readers/ByLookupIdMessageReader.cs new file mode 100644 index 0000000..421c52b --- /dev/null +++ b/src/main/csharp/Readers/ByLookupIdMessageReader.cs @@ -0,0 +1,145 @@ +using System; +using System.Messaging; +using Apache.NMS.MSMQ; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.NMS.MSMQ.Readers +{ + /// <summary> + /// MSMQ message reader, returning messages matching the specified + /// lookup identifier. + /// </summary> + public class ByLookupIdMessageReader : AbstractMessageReader + { + private Int64 lookupId; + + /// <summary> + /// Constructor. + /// </summary> + /// <param name="messageQueue">The MSMQ message queue from which + /// messages will be read.</param> + /// <param name="messageConverter">A message converter for mapping + /// MSMQ messages to NMS messages.</param> + /// <param name="lookupId">The lookup identifier of the message + /// to be read.</param> + public ByLookupIdMessageReader(MessageQueue messageQueue, + IMessageConverter messageConverter, Int64 lookupId) + : base(messageQueue, messageConverter) + { + this.lookupId = lookupId; + } + + /// <summary> + /// Returns without removing (peeks) the first message in the queue + /// referenced by this MessageQueue matching the selection criteria. + /// The Peek method is synchronous, so it blocks the current thread + /// until a message becomes available. + /// </summary> + /// <returns>Peeked message.</returns> + public override IMessage Peek() + { + return Convert(messageQueue.PeekByLookupId(lookupId)); + } + + /// <summary> + /// Returns without removing (peeks) the first message in the queue + /// referenced by this MessageQueue matching the selection criteria. 
+ /// The Peek method is synchronous, so it blocks the current thread + /// until a message becomes available or the specified time-out occurs. + /// </summary> + /// <param name="timeSpan">Reception time-out.</param> + /// <returns>Peeked message.</returns> + public override IMessage Peek(TimeSpan timeSpan) + { + // No time-out option for receiving messages by lookup identifiers: + // either the message is present in the queue, or the method throws + // an exception immediately if the message is not in the queue. + return Convert(messageQueue.PeekByLookupId(lookupId)); + } + + /// <summary> + /// Receives the first message available in the queue referenced by + /// the MessageQueue matching the selection criteria. + /// This call is synchronous, and blocks the current thread of execution + /// until a message is available. + /// </summary> + /// <returns>Received message.</returns> + public override IMessage Receive() + { + return Convert(messageQueue.ReceiveByLookupId(lookupId)); + } + + /// <summary> + /// Receives the first message available in the queue referenced by the + /// MessageQueue matching the selection criteria, and waits until either + /// a message is available in the queue, or the time-out expires. + /// </summary> + /// <param name="timeSpan">Reception time-out.</param> + /// <returns>Received message.</returns> + public override IMessage Receive(TimeSpan timeSpan) + { + // No time-out option for receiving messages by lookup identifiers: + // either the message is present in the queue, or the method throws + // an exception immediately if the message is not in the queue. + return Convert(messageQueue.ReceiveByLookupId(lookupId)); + } + + /// <summary> + /// Receives the first message available in the transactional queue + /// referenced by the MessageQueue matching the selection criteria. + /// This call is synchronous, and blocks the current thread of execution + /// until a message is available. 
+ /// </summary> + /// <param name="transaction">Transaction.</param> + /// <returns>Received message.</returns> + public override IMessage Receive(MessageQueueTransaction transaction) + { + return Convert(messageQueue.ReceiveByLookupId( + MessageLookupAction.Current, lookupId, transaction)); + } + + /// <summary> + /// Receives the first message available in the transactional queue + /// referenced by the MessageQueue matching the selection criteria, + /// and waits until either a message is available in the queue, or the + /// time-out expires. + /// </summary> + /// <param name="timeSpan">Reception time-out.</param> + /// <param name="transaction">Transaction.</param> + /// <returns>Received message.</returns> + public override IMessage Receive(TimeSpan timeSpan, + MessageQueueTransaction transaction) + { + // No time-out option for receiving messages by lookup identifiers: + // either the message is present in the queue, or the method throws + // an exception immediately if the message is not in the queue. + return Convert(messageQueue.ReceiveByLookupId( + MessageLookupAction.Current, lookupId, transaction)); + } + + /// <summary> + /// Checks if an MSMQ message matches the selection criteria. 
+        /// </summary>
+        /// <param name="message">MSMQ message.</param>
+        /// <return>true if the message matches the selection criteria.</return>
+        public override bool Matches(Message message)
+        {
+            return message.LookupId == lookupId;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/Readers/BySelectorMessageReader.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Readers/BySelectorMessageReader.cs b/src/main/csharp/Readers/BySelectorMessageReader.cs
new file mode 100644
index 0000000..e7cd5c3
--- /dev/null
+++ b/src/main/csharp/Readers/BySelectorMessageReader.cs
@@ -0,0 +1,290 @@
+using System;
+using System.Messaging;
+using Apache.NMS.MSMQ;
+using Apache.NMS;
+using Apache.NMS.Selector;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ.Readers
+{
+    /// <summary>
+    /// MSMQ message reader, returning messages matching the specified
+    /// selector.
+    /// </summary>
+    /// <remarks>
+    /// Candidate messages are walked with a queue cursor; each one is
+    /// converted to an NMS message and evaluated against the parsed
+    /// selector expression. A single evaluation context instance is reused
+    /// for every evaluation.
+    /// NOTE(review): because of that shared context, an instance is
+    /// presumably not safe for concurrent use by several threads - confirm.
+    /// </remarks>
+    public class BySelectorMessageReader : AbstractMessageReader
+    {
+        // Selector text as supplied to the constructor.
+        private readonly string selector;
+
+        // Context handed to the selector expression; its Message property
+        // is overwritten before each evaluation.
+        private readonly MessageEvaluationContext evaluationContext;
+
+        // Parsed form of the selector, evaluated against each candidate.
+        private readonly IBooleanExpression selectionExpression;
+
+        /// <summary>
+        /// Constructor. Parses the selector once, so that it can be
+        /// evaluated against every candidate message.
+        /// </summary>
+        /// <param name="messageQueue">The MSMQ message queue from which
+        /// messages will be read.</param>
+        /// <param name="messageConverter">A message converter for mapping
+        /// MSMQ messages to NMS messages.</param>
+        /// <param name="selector">The selector string.</param>
+        public BySelectorMessageReader(MessageQueue messageQueue,
+            IMessageConverter messageConverter, string selector)
+            : base(messageQueue, messageConverter)
+        {
+            this.selector = selector;
+
+            SelectorParser selectorParser = new SelectorParser();
+            selectionExpression = selectorParser.Parse(selector);
+
+            evaluationContext = new MessageEvaluationContext(null);
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available.
+        /// </summary>
+        /// <returns>Peeked message.</returns>
+        public override IMessage Peek()
+        {
+            return InternalPeek(DateTime.MaxValue, true);
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available or the specified time-out occurs.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Peeked message, or null if the time-out expired.</returns>
+        public override IMessage Peek(TimeSpan timeSpan)
+        {
+            DateTime maxTime = DateTime.Now + timeSpan;
+            return InternalPeek(maxTime, true);
+        }
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by
+        /// the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of execution
+        /// until a message is available.
+        /// </summary>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive()
+        {
+            return InternalReceive(DateTime.MaxValue, null);
+        }
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by the
+        /// MessageQueue matching the selection criteria, and waits until either
+        /// a message is available in the queue, or the time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Received message, or null if the time-out expired.</returns>
+        public override IMessage Receive(TimeSpan timeSpan)
+        {
+            return InternalReceive(DateTime.Now + timeSpan, null);
+        }
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of execution
+        /// until a message is available.
+        /// </summary>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        public override IMessage Receive(MessageQueueTransaction transaction)
+        {
+            return InternalReceive(DateTime.MaxValue, transaction);
+        }
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria,
+        /// and waits until either a message is available in the queue, or the
+        /// time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message, or null if the time-out expired.</returns>
+        public override IMessage Receive(TimeSpan timeSpan,
+            MessageQueueTransaction transaction)
+        {
+            return InternalReceive(DateTime.Now + timeSpan, transaction);
+        }
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by
+        /// the MessageQueue matching the selection criteria, and waits until
+        /// either a message is available in the queue, or the deadline is
+        /// reached. The matching message is first located by peeking, then
+        /// removed from the queue through its lookup identifier.
+        /// </summary>
+        /// <param name="maxTime">Point in time (local time) after which the
+        /// method stops waiting for a matching message.</param>
+        /// <param name="transaction">Transaction, or null for a
+        /// non-transactional receive.</param>
+        /// <returns>Received message, or null if no matching message was
+        /// found before the deadline.</returns>
+        public IMessage InternalReceive(DateTime maxTime,
+            MessageQueueTransaction transaction)
+        {
+            // In a shared connection / multi-consumer context, the message may
+            // have been consumed by another client, after it was peeked but
+            // before it was received by this client. Hence the loop.
+            // (not sure it can be shared AND transactional, though).
+            while(true)
+            {
+                // The body is not needed to locate the message; it is
+                // converted once the message has actually been received.
+                IMessage peekedMessage = InternalPeek(maxTime, false);
+
+                if(peekedMessage == null)
+                {
+                    return null;
+                }
+
+                long lookupId = peekedMessage.Properties.GetLong("LookupId");
+
+                Message message;
+
+                // Only the receive call itself is guarded: a failure while
+                // reading the lookup id or while converting a message that
+                // has already been removed from the queue must reach the
+                // caller instead of being silently retried.
+                try
+                {
+                    message = (transaction == null ?
+                        messageQueue.ReceiveByLookupId(lookupId) :
+                        messageQueue.ReceiveByLookupId(
+                            MessageLookupAction.Current, lookupId, transaction));
+                }
+                catch(InvalidOperationException)
+                {
+                    // The peeked message is gone, presumably taken by
+                    // another consumer: look for the next match.
+                    // TODO: filter exceptions, catch only exceptions due to
+                    // unknown lookup id.
+                    continue;
+                }
+
+                return Convert(message);
+            }
+        }
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue, matching the selection criteria.
+        /// </summary>
+        /// <param name="maxTime">Point in time (local time) after which the
+        /// method stops waiting for a matching message.</param>
+        /// <param name="convertBody">true if message body should be converted.</param>
+        /// <returns>Peeked message, or null if no matching message was
+        /// found before the deadline.</returns>
+        private IMessage InternalPeek(DateTime maxTime, bool convertBody)
+        {
+            // Cursor is IDisposable: release it as soon as the scan ends,
+            // whatever the way out of the loop.
+            using(Cursor cursor = messageQueue.CreateCursor())
+            {
+                PeekAction action = PeekAction.Current;
+
+                while(true)
+                {
+                    // Recompute the remaining time at every step, so that a
+                    // long run of non-matching messages cannot stretch the
+                    // overall wait beyond the deadline.
+                    TimeSpan timeout = maxTime - DateTime.Now;
+
+                    if(timeout < TimeSpan.Zero)
+                    {
+                        timeout = TimeSpan.Zero;
+                    }
+                    else if(timeout > MessageQueue.InfiniteTimeout)
+                    {
+                        // MessageQueue.Peek rejects time-outs greater than
+                        // InfiniteTimeout, which is what an unbounded
+                        // deadline (DateTime.MaxValue) would produce.
+                        timeout = MessageQueue.InfiniteTimeout;
+                    }
+
+                    Message msmqMessage = null;
+
+                    try
+                    {
+                        msmqMessage = messageQueue.Peek(timeout, cursor, action);
+                    }
+                    catch(MessageQueueException exc)
+                    {
+                        // A time-out simply means "no message"; anything
+                        // else is rethrown with its original stack trace.
+                        if(exc.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
+                        {
+                            throw;
+                        }
+                    }
+
+                    if(msmqMessage == null)
+                    {
+                        return null;
+                    }
+
+                    IMessage nmsMessage = InternalMatch(msmqMessage, convertBody);
+
+                    if(nmsMessage != null)
+                    {
+                        return nmsMessage;
+                    }
+
+                    // No match: move the cursor to the next message.
+                    action = PeekAction.Next;
+                }
+            }
+        }
+
+        /// <summary>
+        /// Checks if an MSMQ message matches the selection criteria. If matched
+        /// the method returns the converted NMS message. Else it returns null.
+        /// </summary>
+        /// <param name="message">The MSMQ message to check.</param>
+        /// <param name="convertBody">true if the message body should be
+        /// converted. Only honoured when an extended message converter is
+        /// available; otherwise the message is always fully converted.</param>
+        /// <returns>The matching message converted to NMS, or null.</returns>
+        private IMessage InternalMatch(Message message, bool convertBody)
+        {
+            // With an extended converter, the message body is left out of
+            // the first conversion and only converted for those messages
+            // matching the selection criteria.
+            bool partialConversion = (messageConverterEx != null);
+
+            IMessage nmsMessage = partialConversion ?
+                messageConverterEx.ToNmsMessage(message, false) :
+                messageConverter.ToNmsMessage(message);
+
+            evaluationContext.Message = nmsMessage;
+
+            if(!selectionExpression.Matches(evaluationContext))
+            {
+                return null;
+            }
+
+            if(partialConversion && convertBody)
+            {
+                messageConverterEx.ConvertMessageBodyToNMS(message, nmsMessage);
+            }
+
+            return nmsMessage;
+        }
+
+        /// <summary>
+        /// Checks if an MSMQ message matches the selection criteria.
+        /// </summary>
+        /// <param name="message">MSMQ message.</param>
+        /// <returns>true if the message matches the selection criteria.</returns>
+        public override bool Matches(Message message)
+        {
+            // The body is skipped whenever the converter allows it.
+            IMessage nmsMessage = messageConverterEx == null ?
+                messageConverter.ToNmsMessage(message) :
+                messageConverterEx.ToNmsMessage(message, false);
+
+            evaluationContext.Message = nmsMessage;
+
+            return selectionExpression.Matches(evaluationContext);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq-nms-msmq/blob/16d8f06d/src/main/csharp/Readers/IMessageReader.cs
----------------------------------------------------------------------
diff --git a/src/main/csharp/Readers/IMessageReader.cs b/src/main/csharp/Readers/IMessageReader.cs
new file mode 100644
index 0000000..8168664
--- /dev/null
+++ b/src/main/csharp/Readers/IMessageReader.cs
@@ -0,0 +1,93 @@
+using System;
+using System.Messaging;
+using Apache.NMS.MSMQ;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.MSMQ.Readers
+{
+    /// <summary>
+    /// MSMQ message reader: peeks or receives messages matching the
+    /// reader's selection criteria, returning them as NMS messages.
+    /// </summary>
+    public interface IMessageReader
+    {
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available.
+        /// </summary>
+        /// <returns>Peeked message.</returns>
+        IMessage Peek();
+
+        /// <summary>
+        /// Returns without removing (peeks) the first message in the queue
+        /// referenced by this MessageQueue matching the selection criteria.
+        /// The Peek method is synchronous, so it blocks the current thread
+        /// until a message becomes available or the specified time-out occurs.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Peeked message.</returns>
+        IMessage Peek(TimeSpan timeSpan);
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by
+        /// the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of execution
+        /// until a message is available.
+        /// </summary>
+        /// <returns>Received message.</returns>
+        IMessage Receive();
+
+        /// <summary>
+        /// Receives the first message available in the queue referenced by the
+        /// MessageQueue matching the selection criteria, and waits until either
+        /// a message is available in the queue, or the time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <returns>Received message.</returns>
+        IMessage Receive(TimeSpan timeSpan);
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria.
+        /// This call is synchronous, and blocks the current thread of execution
+        /// until a message is available.
+        /// </summary>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        IMessage Receive(MessageQueueTransaction transaction);
+
+        /// <summary>
+        /// Receives the first message available in the transactional queue
+        /// referenced by the MessageQueue matching the selection criteria,
+        /// and waits until either a message is available in the queue, or the
+        /// time-out expires.
+        /// </summary>
+        /// <param name="timeSpan">Reception time-out.</param>
+        /// <param name="transaction">Transaction.</param>
+        /// <returns>Received message.</returns>
+        IMessage Receive(TimeSpan timeSpan, MessageQueueTransaction transaction);
+
+        /// <summary>
+        /// Checks if an MSMQ message matches the selection criteria.
+        /// </summary>
+        /// <param name="message">MSMQ message.</param>
+        /// <returns>true if the message matches the selection criteria.</returns>
+        bool Matches(Message message);
+    }
+}
