Author: jgomes
Date: Thu Oct 9 01:38:53 2014
New Revision: 1630267

URL: http://svn.apache.org/r1630267
Log:
Add support for serializing/deserializing BytesMessages.
Fixes [AMQNET-491]. (See https://issues.apache.org/jira/browse/AMQNET-491)

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs?rev=1630267&r1=1630266&r2=1630267&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs Thu Oct 9 01:38:53 2014 @@ -36,12 +36,19 @@ namespace Apache.NMS.ZMQ private string type; private event AcknowledgeHandler Acknowledger; private DateTime timestamp = new DateTime(); + private bool readOnlyMsgProperties = false; private bool readOnlyMsgBody = false; - public bool ReadOnlyBody + public virtual bool ReadOnlyProperties { - get { return readOnlyMsgBody; } - set { readOnlyMsgBody = value; } + get { return this.readOnlyMsgProperties; } + set { this.readOnlyMsgProperties = value; } + } + + public virtual bool ReadOnlyBody + { + get { return this.readOnlyMsgBody; } + set { this.readOnlyMsgBody = value; } } // IMessage interface @@ -155,7 +162,6 @@ namespace Apache.NMS.ZMQ set { } } - /// <summary> /// The destination that the consumer of this message should send replies to /// </summary> @@ -190,7 +196,6 @@ namespace Apache.NMS.ZMQ set { type = value; } } - public object GetObjectProperty(string name) { return null; @@ -200,9 +205,15 @@ namespace Apache.NMS.ZMQ { } + public virtual void OnSend() 
+ { + this.ReadOnlyProperties = true; + this.ReadOnlyBody = true; + } + protected void FailIfReadOnlyBody() { - if(ReadOnlyBody == true) + if(ReadOnlyBody) { throw new MessageNotWriteableException("Message is in Read-Only mode."); } @@ -210,7 +221,7 @@ namespace Apache.NMS.ZMQ protected void FailIfWriteOnlyBody() { - if(ReadOnlyBody == false) + if(!ReadOnlyBody) { throw new MessageNotReadableException("Message is in Write-Only mode."); } Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs?rev=1630267&r1=1630266&r2=1630267&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs Thu Oct 9 01:38:53 2014 @@ -36,6 +36,7 @@ namespace Apache.NMS.ZMQ protected ZmqSocket producerEndpoint = null; protected ZmqSocket consumerEndpoint = null; protected string destinationName; + internal byte[] rawDestinationName; private bool disposed = false; @@ -47,6 +48,7 @@ namespace Apache.NMS.ZMQ { this.session = session; this.destinationName = destName; + this.rawDestinationName = Destination.encoding.GetBytes(this.destinationName); this.session.RegisterDestination(this); } @@ -88,23 +90,8 @@ namespace Apache.NMS.ZMQ /// </summary> protected virtual void OnDispose() { - if(null != this.producerEndpoint) - { - if(null != this.session - && null != this.session.Connection) - { - this.session.Connection.ReleaseProducer(this.producerEndpoint); - } - - this.producerEndpoint = null; - } - - if(null != this.consumerEndpoint) - { - this.session.Connection.ReleaseConsumer(this.consumerEndpoint); - this.consumerEndpoint = null; - } - + DeinitSender(); + DeinitReceiver(); this.session.UnregisterDestination(this); } @@ -190,6 +177,20 @@ namespace 
Apache.NMS.ZMQ } } + internal void DeinitSender() + { + if(null != this.producerEndpoint) + { + if(null != this.session + && null != this.session.Connection) + { + this.session.Connection.ReleaseProducer(this.producerEndpoint); + } + + this.producerEndpoint = null; + } + } + internal void InitReceiver() { if(null == this.consumerEndpoint) @@ -198,34 +199,27 @@ namespace Apache.NMS.ZMQ this.consumerEndpoint = connection.GetConsumer(); // Must subscribe first before connecting to the endpoint binding - this.consumerEndpoint.Subscribe(Destination.encoding.GetBytes(this.destinationName)); + this.consumerEndpoint.Subscribe(this.rawDestinationName); this.consumerEndpoint.Connect(connection.GetConsumerBindingPath()); } } - internal void Subscribe(string prefixName) - { - InitReceiver(); - this.consumerEndpoint.Subscribe(Destination.encoding.GetBytes(prefixName)); - } - - internal void Unsubscribe(string prefixName) + internal void DeinitReceiver() { if(null != this.consumerEndpoint) { - this.consumerEndpoint.Unsubscribe(Destination.encoding.GetBytes(prefixName)); + this.session.Connection.ReleaseConsumer(this.consumerEndpoint); + this.consumerEndpoint = null; } } internal SendStatus Send(string msg) { - Debug.Assert(null != this.producerEndpoint, "Call InitSender() before calling Send()."); return this.producerEndpoint.Send(msg, Destination.encoding); } internal SendStatus Send(byte[] buffer) { - Debug.Assert(null != this.producerEndpoint, "Call InitSender() before calling Send()."); return this.producerEndpoint.Send(buffer); } @@ -246,20 +240,6 @@ namespace Apache.NMS.ZMQ this.InitReceiver(); return this.consumerEndpoint.Receive(null, flags, out size); } - - internal Frame ReceiveFrame() - { - // TODO: Implement - this.InitReceiver(); - return null; - } - - internal ZmqMessage ReceiveMessage() - { - // TODO: Implement - this.InitReceiver(); - return null; - } } } Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1630267&r1=1630266&r2=1630267&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs Thu Oct 9 01:38:53 2014 @@ -40,7 +40,6 @@ namespace Apache.NMS.ZMQ private object asyncDeliveryLock = new object(); private bool asyncDelivery = false; private bool asyncInit = false; - private byte[] rawDestinationName; private ConsumerTransformerDelegate consumerTransformer; public ConsumerTransformerDelegate ConsumerTransformer @@ -82,7 +81,6 @@ namespace Apache.NMS.ZMQ this.session = sess; this.destination = theDest; - this.rawDestinationName = Destination.encoding.GetBytes(this.destination.Name); this.acknowledgementMode = ackMode; } @@ -145,7 +143,7 @@ namespace Apache.NMS.ZMQ if(size > 0) { // Strip off the subscribed destination name. 
- int receivedMsgIndex = this.rawDestinationName.Length; + int receivedMsgIndex = this.destination.rawDestinationName.Length; int msgLength = receivedMsg.Length - receivedMsgIndex; byte[] msgContent = new byte[msgLength]; @@ -406,6 +404,14 @@ namespace Apache.NMS.ZMQ } break; + case WireFormat.MT_BYTESMESSAGE: + nmsMessage = new BytesMessage(); + if(null != messageBody) + { + ((BytesMessage) nmsMessage).Content = messageBody; + } + break; + case WireFormat.MT_UNKNOWN: default: break; @@ -444,6 +450,9 @@ namespace Apache.NMS.ZMQ nmsMessage = transformedMessage as BaseMessage; } } + + nmsMessage.ReadOnlyBody = true; + nmsMessage.ReadOnlyProperties = true; } return nmsMessage; Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs?rev=1630267&r1=1630266&r2=1630267&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs Thu Oct 9 01:38:53 2014 @@ -19,8 +19,8 @@ using System; using System.Collections.Generic; -using System.Text; using System.Net; +using System.Text; using Apache.NMS.Util; namespace Apache.NMS.ZMQ @@ -48,13 +48,35 @@ namespace Apache.NMS.ZMQ public MessageProducer(Session sess, IDestination dest) { - if(null == sess.Connection.Context) + if(null == sess + || null == sess.Connection + || null == sess.Connection.Context) { throw new NMSConnectionException(); } + Destination theDest = dest as Destination; + + if(null == theDest) + { + throw new InvalidDestinationException("Consumer cannot receive on Null Destinations."); + } + else if(null == theDest.Name) + { + throw new InvalidDestinationException("The destination object was not given a physical name."); + } + else if(theDest.IsTemporary) + { + 
String physicalName = theDest.Name; + + if(String.IsNullOrEmpty(physicalName)) + { + throw new InvalidDestinationException("Physical name of Destination should be valid: " + theDest); + } + } + this.session = sess; - this.destination = (Destination) dest; + this.destination = theDest; this.destination.InitSender(); } @@ -150,6 +172,17 @@ namespace Apache.NMS.ZMQ EncodeField(msgDataBuilder, WireFormat.MFT_BODY, msgBody); } } + else if(message is IBytesMessage) + { + EncodeField(msgDataBuilder, WireFormat.MFT_MSGTYPE, WireFormat.MT_BYTESMESSAGE); + // Append the message text body to the msg. + byte[] msgBody = ((IBytesMessage) message).Content; + + if(null != msgBody) + { + EncodeField(msgDataBuilder, WireFormat.MFT_BODY, msgBody); + } + } else { // TODO: Add support for more message types @@ -158,6 +191,8 @@ namespace Apache.NMS.ZMQ // Put the sentinal field marker. EncodeField(msgDataBuilder, WireFormat.MFT_NONE, 0); + + ((BaseMessage) message).OnSend(); theDest.Send(msgDataBuilder.ToArray()); } Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs?rev=1630267&r1=1630266&r2=1630267&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs Thu Oct 9 01:38:53 2014 @@ -25,7 +25,7 @@ namespace Apache.NMS.ZMQ public class TemporaryQueue : Destination, ITemporaryQueue { public TemporaryQueue(Session session) - : base(session, Guid.NewGuid().ToString()) + : base(session, "TEMPQUEUE." 
+ Guid.NewGuid().ToString()) { } Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs?rev=1630267&r1=1630266&r2=1630267&view=diff ============================================================================== --- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs (original) +++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs Thu Oct 9 01:38:53 2014 @@ -25,7 +25,7 @@ namespace Apache.NMS.ZMQ public class TemporaryTopic : Destination, ITemporaryTopic { public TemporaryTopic(Session session) - : base(session, Guid.NewGuid().ToString()) + : base(session, "TEMPTOPIC." + Guid.NewGuid().ToString()) { }