Author: jgomes
Date: Thu Oct  9 01:38:53 2014
New Revision: 1630267

URL: http://svn.apache.org/r1630267
Log:
Add support for serializing/deserializing BytesMessages.
Fixes [AMQNET-491]. (See https://issues.apache.org/jira/browse/AMQNET-491)

Modified:
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
    
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
    
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
    
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs
    
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs

Modified: 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs?rev=1630267&r1=1630266&r2=1630267&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs 
(original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs 
Thu Oct  9 01:38:53 2014
@@ -36,12 +36,19 @@ namespace Apache.NMS.ZMQ
                private string type;
                private event AcknowledgeHandler Acknowledger;
                private DateTime timestamp = new DateTime();
+               private bool readOnlyMsgProperties = false;
                private bool readOnlyMsgBody = false;
 
-               public bool ReadOnlyBody
+               public virtual bool ReadOnlyProperties
                {
-                       get { return readOnlyMsgBody; }
-                       set { readOnlyMsgBody = value; }
+                       get { return this.readOnlyMsgProperties; }
+                       set { this.readOnlyMsgProperties = value; }
+               }
+
+               public virtual bool ReadOnlyBody
+               {
+                       get { return this.readOnlyMsgBody; }
+                       set { this.readOnlyMsgBody = value; }
                }
 
                // IMessage interface
@@ -155,7 +162,6 @@ namespace Apache.NMS.ZMQ
                        set { }
                }
 
-
                /// <summary>
                /// The destination that the consumer of this message should 
send replies to
                /// </summary>
@@ -190,7 +196,6 @@ namespace Apache.NMS.ZMQ
                        set { type = value; }
                }
 
-
                public object GetObjectProperty(string name)
                {
                        return null;
@@ -200,9 +205,15 @@ namespace Apache.NMS.ZMQ
                {
                }
 
+               public virtual void OnSend()
+               {
+                       this.ReadOnlyProperties = true;
+                       this.ReadOnlyBody = true;
+               }
+
                protected void FailIfReadOnlyBody()
                {
-                       if(ReadOnlyBody == true)
+                       if(ReadOnlyBody)
                        {
                                throw new MessageNotWriteableException("Message 
is in Read-Only mode.");
                        }
@@ -210,7 +221,7 @@ namespace Apache.NMS.ZMQ
 
                protected void FailIfWriteOnlyBody()
                {
-                       if(ReadOnlyBody == false)
+                       if(!ReadOnlyBody)
                        {
                                throw new MessageNotReadableException("Message 
is in Write-Only mode.");
                        }

Modified: 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs?rev=1630267&r1=1630266&r2=1630267&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs 
(original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs 
Thu Oct  9 01:38:53 2014
@@ -36,6 +36,7 @@ namespace Apache.NMS.ZMQ
                protected ZmqSocket producerEndpoint = null;
                protected ZmqSocket consumerEndpoint = null;
                protected string destinationName;
+               internal byte[] rawDestinationName;
 
                private bool disposed = false;
 
@@ -47,6 +48,7 @@ namespace Apache.NMS.ZMQ
                {
                        this.session = session;
                        this.destinationName = destName;
+                       this.rawDestinationName = 
Destination.encoding.GetBytes(this.destinationName);
                        this.session.RegisterDestination(this);
                }
 
@@ -88,23 +90,8 @@ namespace Apache.NMS.ZMQ
                /// </summary>
                protected virtual void OnDispose()
                {
-                       if(null != this.producerEndpoint)
-                       {
-                               if(null != this.session
-                                       && null != this.session.Connection)
-                               {
-                                       
this.session.Connection.ReleaseProducer(this.producerEndpoint);
-                               }
-
-                               this.producerEndpoint = null;
-                       }
-
-                       if(null != this.consumerEndpoint)
-                       {
-                               
this.session.Connection.ReleaseConsumer(this.consumerEndpoint);
-                               this.consumerEndpoint = null;
-                       }
-
+                       DeinitSender();
+                       DeinitReceiver();
                        this.session.UnregisterDestination(this);
                }
 
@@ -190,6 +177,20 @@ namespace Apache.NMS.ZMQ
                        }
                }
 
+               internal void DeinitSender()
+               {
+                       if(null != this.producerEndpoint)
+                       {
+                               if(null != this.session
+                                       && null != this.session.Connection)
+                               {
+                                       
this.session.Connection.ReleaseProducer(this.producerEndpoint);
+                               }
+
+                               this.producerEndpoint = null;
+                       }
+               }
+
                internal void InitReceiver()
                {
                        if(null == this.consumerEndpoint)
@@ -198,34 +199,27 @@ namespace Apache.NMS.ZMQ
 
                                this.consumerEndpoint = 
connection.GetConsumer();
                                // Must subscribe first before connecting to 
the endpoint binding
-                               
this.consumerEndpoint.Subscribe(Destination.encoding.GetBytes(this.destinationName));
+                               
this.consumerEndpoint.Subscribe(this.rawDestinationName);
                                
this.consumerEndpoint.Connect(connection.GetConsumerBindingPath());
                        }
                }
 
-               internal void Subscribe(string prefixName)
-               {
-                       InitReceiver();
-                       
this.consumerEndpoint.Subscribe(Destination.encoding.GetBytes(prefixName));
-               }
-
-               internal void Unsubscribe(string prefixName)
+               internal void DeinitReceiver()
                {
                        if(null != this.consumerEndpoint)
                        {
-                               
this.consumerEndpoint.Unsubscribe(Destination.encoding.GetBytes(prefixName));
+                               
this.session.Connection.ReleaseConsumer(this.consumerEndpoint);
+                               this.consumerEndpoint = null;
                        }
                }
 
                internal SendStatus Send(string msg)
                {
-                       Debug.Assert(null != this.producerEndpoint, "Call 
InitSender() before calling Send().");
                        return this.producerEndpoint.Send(msg, 
Destination.encoding);
                }
 
                internal SendStatus Send(byte[] buffer)
                {
-                       Debug.Assert(null != this.producerEndpoint, "Call 
InitSender() before calling Send().");
                        return this.producerEndpoint.Send(buffer);
                }
 
@@ -246,20 +240,6 @@ namespace Apache.NMS.ZMQ
                        this.InitReceiver();
                        return this.consumerEndpoint.Receive(null, flags, out 
size);
                }
-
-               internal Frame ReceiveFrame()
-               {
-                       // TODO: Implement
-                       this.InitReceiver();
-                       return null;
-               }
-
-               internal ZmqMessage ReceiveMessage()
-               {
-                       // TODO: Implement
-                       this.InitReceiver();
-                       return null;
-               }
        }
 }
 

Modified: 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1630267&r1=1630266&r2=1630267&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
 Thu Oct  9 01:38:53 2014
@@ -40,7 +40,6 @@ namespace Apache.NMS.ZMQ
                private object asyncDeliveryLock = new object();
                private bool asyncDelivery = false;
                private bool asyncInit = false;
-               private byte[] rawDestinationName;
 
                private ConsumerTransformerDelegate consumerTransformer;
                public ConsumerTransformerDelegate ConsumerTransformer
@@ -82,7 +81,6 @@ namespace Apache.NMS.ZMQ
 
                        this.session = sess;
                        this.destination = theDest;
-                       this.rawDestinationName = 
Destination.encoding.GetBytes(this.destination.Name);
                        this.acknowledgementMode = ackMode;
                }
 
@@ -145,7 +143,7 @@ namespace Apache.NMS.ZMQ
                        if(size > 0)
                        {
                                // Strip off the subscribed destination name.
-                               int receivedMsgIndex = 
this.rawDestinationName.Length;
+                               int receivedMsgIndex = 
this.destination.rawDestinationName.Length;
                                int msgLength = receivedMsg.Length - 
receivedMsgIndex;
                                byte[] msgContent = new byte[msgLength];
 
@@ -406,6 +404,14 @@ namespace Apache.NMS.ZMQ
                                }
                                break;
 
+                       case WireFormat.MT_BYTESMESSAGE:
+                               nmsMessage = new BytesMessage();
+                               if(null != messageBody)
+                               {
+                                       ((BytesMessage) nmsMessage).Content = 
messageBody;
+                               }
+                               break;
+
                        case WireFormat.MT_UNKNOWN:
                        default:
                                break;
@@ -444,6 +450,9 @@ namespace Apache.NMS.ZMQ
                                                nmsMessage = transformedMessage 
as BaseMessage;
                                        }
                                }
+
+                               nmsMessage.ReadOnlyBody = true;
+                               nmsMessage.ReadOnlyProperties = true;
                        }
 
                        return nmsMessage;

Modified: 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs?rev=1630267&r1=1630266&r2=1630267&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
 Thu Oct  9 01:38:53 2014
@@ -19,8 +19,8 @@
 
 using System;
 using System.Collections.Generic;
-using System.Text;
 using System.Net;
+using System.Text;
 using Apache.NMS.Util;
 
 namespace Apache.NMS.ZMQ
@@ -48,13 +48,35 @@ namespace Apache.NMS.ZMQ
 
                public MessageProducer(Session sess, IDestination dest)
                {
-                       if(null == sess.Connection.Context)
+                       if(null == sess
+                               || null == sess.Connection
+                               || null == sess.Connection.Context)
                        {
                                throw new NMSConnectionException();
                        }
 
+                       Destination theDest = dest as Destination;
+
+                       if(null == theDest)
+                       {
+                               throw new InvalidDestinationException("Consumer 
cannot receive on Null Destinations.");
+                       }
+                       else if(null == theDest.Name)
+                       {
+                               throw new InvalidDestinationException("The 
destination object was not given a physical name.");
+                       }
+                       else if(theDest.IsTemporary)
+                       {
+                               String physicalName = theDest.Name;
+
+                               if(String.IsNullOrEmpty(physicalName))
+                               {
+                                       throw new 
InvalidDestinationException("Physical name of Destination should be valid: " + 
theDest);
+                               }
+                       }
+
                        this.session = sess;
-                       this.destination = (Destination) dest;
+                       this.destination = theDest;
                        this.destination.InitSender();
                }
 
@@ -150,6 +172,17 @@ namespace Apache.NMS.ZMQ
                                        EncodeField(msgDataBuilder, 
WireFormat.MFT_BODY, msgBody);
                                }
                        }
+                       else if(message is IBytesMessage)
+                       {
+                               EncodeField(msgDataBuilder, 
WireFormat.MFT_MSGTYPE, WireFormat.MT_BYTESMESSAGE);
+                               // Append the message text body to the msg.
+                               byte[] msgBody = ((IBytesMessage) 
message).Content;
+
+                               if(null != msgBody)
+                               {
+                                       EncodeField(msgDataBuilder, 
WireFormat.MFT_BODY, msgBody);
+                               }
+                       }
                        else
                        {
                                // TODO: Add support for more message types
@@ -158,6 +191,8 @@ namespace Apache.NMS.ZMQ
 
                        // Put the sentinal field marker.
                        EncodeField(msgDataBuilder, WireFormat.MFT_NONE, 0);
+
+                       ((BaseMessage) message).OnSend();
                        theDest.Send(msgDataBuilder.ToArray());
                }
 

Modified: 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs?rev=1630267&r1=1630266&r2=1630267&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs 
(original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryQueue.cs 
Thu Oct  9 01:38:53 2014
@@ -25,7 +25,7 @@ namespace Apache.NMS.ZMQ
        public class TemporaryQueue : Destination, ITemporaryQueue
        {
                public TemporaryQueue(Session session)
-                       : base(session, Guid.NewGuid().ToString())
+                       : base(session, "TEMPQUEUE." + 
Guid.NewGuid().ToString())
                {
                }
 

Modified: 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs?rev=1630267&r1=1630266&r2=1630267&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs 
(original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/TemporaryTopic.cs 
Thu Oct  9 01:38:53 2014
@@ -25,7 +25,7 @@ namespace Apache.NMS.ZMQ
        public class TemporaryTopic : Destination, ITemporaryTopic
        {
                public TemporaryTopic(Session session)
-                       : base(session, Guid.NewGuid().ToString())
+                       : base(session, "TEMPTOPIC." + 
Guid.NewGuid().ToString())
                {
                }
 


Reply via email to