Author: jgomes
Date: Wed Oct  8 00:59:46 2014
New Revision: 1629998

URL: http://svn.apache.org/r1629998
Log:
Support serializing/deserializing NMS properties on the wire.
Fix several resource leaks that were causing lock-ups when disposing of 
connection objects. Many more unit tests can now be run. Some unit tests will 
never succeed due to the nature of the ZMQ implementation not having a central 
broker.
Fixes [AMQNET-491]. (See https://issues.apache.org/jira/browse/AMQNET-491)

Added:
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/WireFormat.cs
Modified:
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/nmsprovider-test.config
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
    
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
    
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Session.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0-test.csproj
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0.csproj

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/nmsprovider-test.config
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/nmsprovider-test.config?rev=1629998&r1=1629997&r2=1629998&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/nmsprovider-test.config 
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/nmsprovider-test.config Wed 
Oct  8 00:59:46 2014
@@ -16,7 +16,7 @@
 * limitations under the License.
 -->
 <configuration>
-       <defaultURI value="zmq://localhost">
+       <defaultURI value="zmq:tcp://localhost:5556">
                <factoryParams>
                        <param type="string" value="NMSTestClient"/>
                </factoryParams>

Modified: 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs?rev=1629998&r1=1629997&r2=1629998&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs 
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs 
Wed Oct  8 00:59:46 2014
@@ -73,6 +73,16 @@ namespace Apache.NMS.ZMQ
                        {
                                if(0 == --instanceCount)
                                {
+                                       lock(producerCacheLock)
+                                       {
+                                               foreach(KeyValuePair<string, 
ProducerRef> cacheItem in producerCache)
+                                               {
+                                                       
cacheItem.Value.producer.Unbind(cacheItem.Key);
+                                               }
+
+                                               producerCache.Clear();
+                                       }
+
                                        Connection._context.Dispose();
                                }
                        }
@@ -248,16 +258,6 @@ namespace Apache.NMS.ZMQ
         public void Close()
         {
             Stop();
-
-                       lock(producerCacheLock)
-                       {
-                               foreach(KeyValuePair<string, ProducerRef> 
cacheItem in producerCache)
-                               {
-                                       
cacheItem.Value.producer.Unbind(cacheItem.Key);
-                               }
-
-                               producerCache.Clear();
-                       }
                }
 
         public void PurgeTempDestinations()

Modified: 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs?rev=1629998&r1=1629997&r2=1629998&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs 
(original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs 
Wed Oct  8 00:59:46 2014
@@ -16,9 +16,9 @@
  */
 
 using System;
+using System.Diagnostics;
 using System.Text;
 using ZeroMQ;
-using System.Diagnostics;
 
 namespace Apache.NMS.ZMQ
 {
@@ -47,6 +47,7 @@ namespace Apache.NMS.ZMQ
                {
                        this.session = session;
                        this.destinationName = destName;
+                       this.session.RegisterDestination(this);
                }
 
                ~Destination()
@@ -103,6 +104,8 @@ namespace Apache.NMS.ZMQ
                                
this.session.Connection.ReleaseConsumer(this.consumerEndpoint);
                                this.consumerEndpoint = null;
                        }
+
+                       this.session.UnregisterDestination(this);
                }
 
                public string Name

Modified: 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1629998&r1=1629997&r2=1629998&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
 Wed Oct  8 00:59:46 2014
@@ -19,6 +19,8 @@ using System;
 using System.Diagnostics;
 using System.Text;
 using System.Threading;
+using System.Net;
+using Apache.NMS.Util;
 
 namespace Apache.NMS.ZMQ
 {
@@ -121,10 +123,15 @@ namespace Apache.NMS.ZMQ
                        if(size > 0)
                        {
                                // Strip off the subscribed destination name.
-                               // TODO: Support decoding of all message types 
+ all meta data (e.g., headers and properties)
-                               int msgStart = this.rawDestinationName.Length;
-                               int msgLength = receivedMsg.Length - msgStart;
-                               string msgContent = 
Encoding.UTF8.GetString(receivedMsg, msgStart, msgLength);
+                               int receivedMsgIndex = 
this.rawDestinationName.Length;
+                               int msgLength = receivedMsg.Length - 
receivedMsgIndex;
+                               byte[] msgContent = new byte[msgLength];
+
+                               for(int index = 0; index < msgLength; index++, 
receivedMsgIndex++)
+                               {
+                                       msgContent[index] = 
receivedMsg[receivedMsgIndex];
+                               }
+
                                return ToNmsMessage(msgContent);
                        }
 
@@ -203,6 +210,7 @@ namespace Apache.NMS.ZMQ
                        Tracer.InfoFormat("Starting dispatcher thread consumer: 
{0}", this.asyncDeliveryThread.Name);
                        TimeSpan receiveWait = TimeSpan.FromSeconds(2);
 
+                       this.destination.InitReceiver();
                        // Signal that this thread has started.
                        asyncInit = true;
 
@@ -227,7 +235,7 @@ namespace Apache.NMS.ZMQ
                                                }
                                                else
                                                {
-                                                       Thread.Sleep(0);
+                                                       Thread.Sleep(1);
                                                }
                                        }
                                }
@@ -258,37 +266,219 @@ namespace Apache.NMS.ZMQ
                /// <returns>
                /// nms message object
                /// </returns>
-               protected virtual IMessage ToNmsMessage(string messageText)
+               protected virtual IMessage ToNmsMessage(byte[] msgData)
                {
-                       // Strip off the destination name prefix.
-                       IMessage nmsMessage = new TextMessage(messageText);
+                       IMessage nmsMessage = null;
+                       int messageType = WireFormat.MT_UNKNOWN;
+                       int fieldType = WireFormat.MFT_NONE;
+                       DateTime messageTimestamp = DateTime.UtcNow;
+                       string messageNMSType = null;
+                       string messageCorrelationId = null;
+                       IDestination messageReplyTo = null;
+                       MsgDeliveryMode messageDeliveryMode = 
MsgDeliveryMode.NonPersistent;
+                       MsgPriority messagePriority = MsgPriority.Normal;
+                       TimeSpan messageTimeToLive = TimeSpan.FromTicks(0);
+                       IPrimitiveMap messageProperties = null;
+                       int fieldLen;
+                       int index = 0;
+                       string messageID = string.Empty;
+                       byte[] messageBody = null;
 
                        try
                        {
-                               nmsMessage.NMSMessageId = "";
-                               nmsMessage.NMSDestination = this.destination;
-                               nmsMessage.NMSDeliveryMode = 
MsgDeliveryMode.NonPersistent;
-                               nmsMessage.NMSPriority = MsgPriority.Normal;
-                               nmsMessage.NMSTimestamp = DateTime.Now;
-                               nmsMessage.NMSTimeToLive = new TimeSpan(0);
-                               nmsMessage.NMSType = "";
+                               // Parse the commond message fields
+                               do
+                               {
+                                       fieldType = ReadInt(msgData, ref index);
+                                       switch(fieldType)
+                                       {
+                                       case WireFormat.MFT_NONE:
+                                               break;
+
+                                       case WireFormat.MFT_MESSAGEID:
+                                               messageID = ReadString(msgData, 
ref index);
+                                               break;
+
+                                       case WireFormat.MFT_TIMESTAMP:
+                                               fieldLen = ReadInt(msgData, ref 
index);
+                                               Debug.Assert(sizeof(long) == 
fieldLen);
+                                               messageTimestamp = 
DateTime.FromBinary(ReadLong(msgData, ref index));
+                                               break;
+
+                                       case WireFormat.MFT_NMSTYPE:
+                                               messageNMSType = 
ReadString(msgData, ref index);
+                                               break;
+
+                                       case WireFormat.MFT_CORRELATIONID:
+                                               messageCorrelationId = 
ReadString(msgData, ref index);
+                                               break;
+
+                                       case WireFormat.MFT_REPLYTO:
+                                               string replyToDestName = 
ReadString(msgData, ref index);
+                                               messageReplyTo = 
this.session.GetDestination(replyToDestName);
+                                               break;
+
+                                       case WireFormat.MFT_DELIVERYMODE:
+                                               fieldLen = ReadInt(msgData, ref 
index);
+                                               Debug.Assert(sizeof(int) == 
fieldLen);
+                                               messageDeliveryMode = 
(MsgDeliveryMode) ReadInt(msgData, ref index);
+                                               break;
+
+                                       case WireFormat.MFT_PRIORITY:
+                                               fieldLen = ReadInt(msgData, ref 
index);
+                                               Debug.Assert(sizeof(int) == 
fieldLen);
+                                               messagePriority = (MsgPriority) 
ReadInt(msgData, ref index);
+                                               break;
+
+                                       case WireFormat.MFT_TIMETOLIVE:
+                                               fieldLen = ReadInt(msgData, ref 
index);
+                                               Debug.Assert(sizeof(long) == 
fieldLen);
+                                               messageTimeToLive = 
TimeSpan.FromTicks(ReadLong(msgData, ref index));
+                                               break;
+
+                                       case WireFormat.MFT_HEADERS:
+                                               fieldLen = ReadInt(msgData, ref 
index);
+                                               int numProperties = 
ReadInt(msgData, ref index);
+                                               if(numProperties > 0)
+                                               {
+                                                       messageProperties = new 
PrimitiveMap();
+                                                       while(numProperties-- > 
0)
+                                                       {
+                                                               string 
propertyKey = ReadString(msgData, ref index);
+                                                               byte[] 
propertyVal = ReadBytes(msgData, ref index);
+                                                               
messageProperties.SetBytes(propertyKey, propertyVal);
+                                                       }
+                                               }
+                                               break;
+
+                                       case WireFormat.MFT_MSGTYPE:
+                                               fieldLen = ReadInt(msgData, ref 
index);
+                                               Debug.Assert(sizeof(int) == 
fieldLen);
+                                               messageType = ReadInt(msgData, 
ref index);
+                                               break;
+
+                                       case WireFormat.MFT_BODY:
+                                               messageBody = 
ReadBytes(msgData, ref index);
+                                               break;
+
+                                       default:
+                                               // Skip past this field.
+                                               Tracer.WarnFormat("Unknown 
message field type: {0}", fieldType);
+                                               fieldLen = ReadInt(msgData, ref 
index);
+                                               index += fieldLen;
+                                               break;
+                                       }
+                               } while(WireFormat.MFT_NONE != fieldType && 
index < msgData.Length);
                        }
-                       catch(InvalidOperationException)
+                       catch(Exception ex)
                        {
-                               // Log error
+                               Tracer.ErrorFormat("Exception parsing message: 
{0}", ex.Message);
                        }
 
-                       if(null != this.ConsumerTransformer)
+                       // Instantiate the message type
+                       switch(messageType)
                        {
-                               IMessage transformedMessage = 
ConsumerTransformer(this.session, this, nmsMessage);
+                       case WireFormat.MT_MESSAGE:
+                               nmsMessage = new BaseMessage();
+                               break;
+
+                       case WireFormat.MT_TEXTMESSAGE:
+                               nmsMessage = new TextMessage();
+                               if(null != messageBody)
+                               {
+                                       ((TextMessage) nmsMessage).Text = 
Encoding.UTF8.GetString(messageBody);
+                               }
+                               break;
+
+                       case WireFormat.MT_UNKNOWN:
+                       default:
+                               break;
+                       }
 
-                               if(null != transformedMessage)
+                       // Set the common headers.
+                       if(null != nmsMessage)
+                       {
+                               try
                                {
-                                       nmsMessage = transformedMessage;
+                                       nmsMessage.NMSMessageId = messageID;
+                                       nmsMessage.NMSCorrelationID = 
messageCorrelationId;
+                                       nmsMessage.NMSDestination = 
this.destination;
+                                       nmsMessage.NMSReplyTo = messageReplyTo;
+                                       nmsMessage.NMSDeliveryMode = 
messageDeliveryMode;
+                                       nmsMessage.NMSPriority = 
messagePriority;
+                                       nmsMessage.NMSTimestamp = 
messageTimestamp;
+                                       nmsMessage.NMSTimeToLive = 
messageTimeToLive;
+                                       nmsMessage.NMSType = messageNMSType;
+                                       if(null != messageProperties)
+                                       {
+                                               foreach(string propertyKey in 
messageProperties.Keys)
+                                               {
+                                                       
nmsMessage.Properties.SetBytes(propertyKey, 
messageProperties.GetBytes(propertyKey));
+                                               }
+                                       }
+                               }
+                               catch(InvalidOperationException)
+                               {
+                                       // Log error
+                               }
+
+                               if(null != this.ConsumerTransformer)
+                               {
+                                       IMessage transformedMessage = 
ConsumerTransformer(this.session, this, nmsMessage);
+
+                                       if(null != transformedMessage)
+                                       {
+                                               nmsMessage = transformedMessage;
+                                       }
                                }
                        }
 
                        return nmsMessage;
                }
+
+               private long ReadLong(byte[] msgData, ref int index)
+               {
+                       long val = BitConverter.ToInt64(msgData, index);
+                       index += sizeof(long);
+                       return IPAddress.NetworkToHostOrder(val);
+               }
+
+               private int ReadInt(byte[] msgData, ref int index)
+               {
+                       int val = BitConverter.ToInt32(msgData, index);
+                       index += sizeof(int);
+                       return IPAddress.NetworkToHostOrder(val);
+               }
+
+               private string ReadString(byte[] msgData, ref int index)
+               {
+                       int stringLen = ReadInt(msgData, ref index);
+                       string stringVal = string.Empty;
+
+                       if(stringLen > 0)
+                       {
+                               stringVal = Encoding.UTF8.GetString(msgData, 
index, stringLen);
+                               index += stringLen;
+                       }
+
+                       return stringVal;
+               }
+
+               private byte[] ReadBytes(byte[] msgData, ref int index)
+               {
+                       int bytesLen = ReadInt(msgData, ref index);
+                       byte[] bytesVal = null;
+
+                       if(bytesLen >= 0)
+                       {
+                               bytesVal = new byte[bytesLen];
+                               for(int byteIndex = 0; byteIndex < bytesLen; 
byteIndex++, index++)
+                               {
+                                       bytesVal[byteIndex] = msgData[index];
+                               }
+                       }
+
+                       return bytesVal;
+               }
        }
 }

Modified: 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs?rev=1629998&r1=1629997&r2=1629998&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
 Wed Oct  8 00:59:46 2014
@@ -18,9 +18,9 @@
 #define PUBSUB
 
 using System;
+using System.Collections.Generic;
 using System.Text;
-using ZeroMQ;
-
+using System.Net;
 
 namespace Apache.NMS.ZMQ
 {
@@ -59,24 +59,30 @@ namespace Apache.NMS.ZMQ
 
                public void Send(IMessage message)
                {
-                       Send(this.destination, message);
+                       Send(this.destination, message, this.deliveryMode, 
this.priority, this.timeToLive, false);
                }
 
-               public void Send(IMessage message, MsgDeliveryMode 
deliveryMode, MsgPriority priority, TimeSpan timeToLive)
+               public void Send(IDestination dest, IMessage message)
                {
-                       Send(this.destination, message, deliveryMode, priority, 
timeToLive);
+                       Send(dest, message, this.deliveryMode, this.priority, 
this.timeToLive, false);
                }
 
-               public void Send(IDestination dest, IMessage message)
+               public void Send(IMessage message, MsgDeliveryMode 
deliveryMode, MsgPriority priority, TimeSpan timeToLive)
                {
-                       Send(dest, message, this.DeliveryMode, this.Priority, 
this.TimeToLive);
+                       Send(this.destination, message, deliveryMode, priority, 
timeToLive, true);
                }
 
                public void Send(IDestination dest, IMessage message, 
MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
                {
-                       // UNUSED_PARAM(deliveryMode);  // No concept of 
different delivery modes in ZMQ
-                       // UNUSED_PARAM(priority);              // No concept 
of priority messages in ZMQ
-                       // UNUSED_PARAM(timeToLive);    // No concept of 
time-to-live in ZMQ
+                       Send(destination, message, deliveryMode, priority, 
timeToLive, true);
+               }
+
+               public void Send(IDestination dest, IMessage message, 
MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive, bool 
specifiedTimeToLive)
+               {
+                       if(null == dest)
+                       {
+                               return;
+                       }
 
                        if(null != this.ProducerTransformer)
                        {
@@ -88,14 +94,145 @@ namespace Apache.NMS.ZMQ
                                }
                        }
 
-                       // TODO: Support encoding of all message types + all 
meta data (e.g., headers and properties)
+                       // Serialize the message data
+                       Destination theDest = (Destination) dest;
+                       List<byte> msgDataBuilder = new List<byte>();
+
+                       // Always set the message Id.
+                       message.NMSMessageId = Guid.NewGuid().ToString();
+                       message.NMSTimestamp = DateTime.UtcNow;
+                       if(specifiedTimeToLive)
+                       {
+                               message.NMSTimeToLive = timeToLive;
+                       }
 
                        // Prefix the message with the destination name. The 
client will subscribe to this destination name
                        // in order to receive messages.
-                       Destination theDest = (Destination) dest;
+                       
msgDataBuilder.AddRange(Encoding.UTF8.GetBytes(theDest.Name));
+
+                       // Encode all meta data (e.g., headers and properties)
+                       EncodeField(msgDataBuilder, WireFormat.MFT_MESSAGEID, 
message.NMSMessageId);
+                       EncodeField(msgDataBuilder, WireFormat.MFT_TIMESTAMP, 
message.NMSTimestamp.ToBinary());
+                       if(null != message.NMSType)
+                       {
+                               EncodeField(msgDataBuilder, 
WireFormat.MFT_NMSTYPE, message.NMSType);
+                       }
+
+                       if(null != message.NMSCorrelationID)
+                       {
+                               EncodeField(msgDataBuilder, 
WireFormat.MFT_CORRELATIONID, message.NMSCorrelationID);
+                       }
+
+                       if(null != message.NMSReplyTo)
+                       {
+                               EncodeField(msgDataBuilder, 
WireFormat.MFT_REPLYTO, ((Destination) message.NMSReplyTo).Name);
+                       }
+
+                       EncodeField(msgDataBuilder, 
WireFormat.MFT_DELIVERYMODE, message.NMSDeliveryMode);
+                       EncodeField(msgDataBuilder, WireFormat.MFT_PRIORITY, 
message.NMSPriority);
+                       EncodeField(msgDataBuilder, WireFormat.MFT_TIMETOLIVE, 
message.NMSTimeToLive.Ticks);
+
+                       IPrimitiveMap properties = message.Properties;
+                       if(null != properties && properties.Count > 0)
+                       {
+                               // Encode into a temporary buffer, and then 
place a single buffer into the msgDataBuilder.
+                               List<byte> propertiesBuilder = new List<byte>();
+
+                               EncodeFieldData(propertiesBuilder, 
propertiesBuilder.Count);
+                               foreach(string propertyKey in properties.Keys)
+                               {
+                                       EncodeFieldData(propertiesBuilder, 
propertyKey);
+                                       EncodeFieldData(propertiesBuilder, 
properties.GetBytes(propertyKey));
+                               }
+
+                               EncodeField(msgDataBuilder, 
WireFormat.MFT_HEADERS, propertiesBuilder.ToArray());
+                       }
+
+                       if(message is ITextMessage)
+                       {
+                               EncodeField(msgDataBuilder, 
WireFormat.MFT_MSGTYPE, WireFormat.MT_TEXTMESSAGE);
+                               // Append the message text body to the msg.
+                               string msgBody = ((ITextMessage) message).Text;
+
+                               if(null != msgBody)
+                               {
+                                       EncodeField(msgDataBuilder, 
WireFormat.MFT_BODY, msgBody);
+                               }
+                       }
+                       else
+                       {
+                               // TODO: Add support for more message types
+                               EncodeField(msgDataBuilder, 
WireFormat.MFT_MSGTYPE, WireFormat.MT_MESSAGE);
+                       }
+
+                       // Put the sentinal field marker.
+                       EncodeField(msgDataBuilder, WireFormat.MFT_NONE, 0);
+                       theDest.Send(msgDataBuilder.ToArray());
+               }
+
+               private void EncodeField(List<byte> msgDataBuilder, int 
msgFieldType, string fieldData)
+               {
+                       if(null == fieldData)
+                       {
+                               fieldData = string.Empty;
+                       }
+
+                       EncodeField(msgDataBuilder, msgFieldType, 
Encoding.UTF8.GetBytes(fieldData));
+               }
+
+               private void EncodeField(List<byte> msgDataBuilder, int 
msgFieldType, Enum fieldData)
+               {
+                       EncodeField(msgDataBuilder, msgFieldType, 
Convert.ToInt32(fieldData));
+               }
+
+               private void EncodeField(List<byte> msgDataBuilder, int 
msgFieldType, int fieldData)
+               {
+                       EncodeField(msgDataBuilder, msgFieldType, 
BitConverter.GetBytes(IPAddress.HostToNetworkOrder(fieldData)));
+               }
+
+               private void EncodeField(List<byte> msgDataBuilder, int 
msgFieldType, long fieldData)
+               {
+                       EncodeField(msgDataBuilder, msgFieldType, 
BitConverter.GetBytes(IPAddress.HostToNetworkOrder(fieldData)));
+               }
 
-                       string msg = theDest.Name + ((ITextMessage) 
message).Text;
-                       theDest.Send(msg);
+               private void EncodeField(List<byte> msgDataBuilder, int 
msgFieldType, byte[] fieldData)
+               {
+                       // Encode the field type
+                       
msgDataBuilder.AddRange(BitConverter.GetBytes(IPAddress.HostToNetworkOrder(msgFieldType)));
+                       EncodeFieldData(msgDataBuilder, fieldData);
+               }
+
+               private void EncodeFieldData(List<byte> msgDataBuilder, int 
fieldData)
+               {
+                       
msgDataBuilder.AddRange(BitConverter.GetBytes(IPAddress.HostToNetworkOrder(fieldData)));
+               }
+
+               private void EncodeFieldData(List<byte> msgDataBuilder, string 
fieldData)
+               {
+                       if(null == fieldData)
+                       {
+                               fieldData = string.Empty;
+                       }
+
+                       EncodeFieldData(msgDataBuilder, 
Encoding.UTF8.GetBytes(fieldData));
+               }
+
+               private void EncodeFieldData(List<byte> msgDataBuilder, byte[] 
fieldData)
+               {
+                       // Encode the field length
+                       int fieldLength = 0;
+
+                       if(null != fieldData)
+                       {
+                               fieldLength = fieldData.Length;
+                       }
+
+                       EncodeFieldData(msgDataBuilder, fieldLength);
+                       if(0 != fieldLength)
+                       {
+                               // Encode the field data
+                               msgDataBuilder.AddRange(fieldData);
+                       }
                }
 
                public void Dispose()

Modified: 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Session.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Session.cs?rev=1629998&r1=1629997&r2=1629998&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Session.cs 
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Session.cs 
Wed Oct  8 00:59:46 2014
@@ -16,6 +16,7 @@
  */
 
 using System;
+using System.Collections.Generic;
 using System.Messaging;
 
 namespace Apache.NMS.ZMQ
@@ -28,6 +29,8 @@ namespace Apache.NMS.ZMQ
         private Connection connection;
         private AcknowledgementMode acknowledgementMode;
         private MessageQueueTransaction messageQueueTransaction;
+               private List<Destination> destinations = new 
List<Destination>();
+               private object destinationLock = new object();
 
         public Session(Connection connection, AcknowledgementMode 
acknowledgementMode)
         {
@@ -46,13 +49,49 @@ namespace Apache.NMS.ZMQ
 
         public void Close()
         {
-            if(MessageQueueTransaction != null)
+                       List<Destination> closingDestinations = null;
+
+                       lock(destinationLock)
+                       {
+                               if(destinations.Count > 0)
+                               {
+                                       closingDestinations = new 
List<Destination>(destinations);
+                               }
+
+                               destinations.Clear();
+                       }
+
+                       if(null != closingDestinations)
+                       {
+                               foreach(Destination dest in closingDestinations)
+                               {
+                                       dest.Dispose();
+                               }
+                       }
+                       
+                       if(MessageQueueTransaction != null)
             {
                 MessageQueueTransaction.Dispose();
                 MessageQueueTransaction = null;
             }
         }
 
+               internal void RegisterDestination(Destination dest)
+               {
+                       lock(destinationLock)
+                       {
+                               destinations.Add(dest);
+                       }
+               }
+
+               internal void UnregisterDestination(Destination dest)
+               {
+                       lock(destinationLock)
+                       {
+                               destinations.Remove(dest);
+                       }
+               }
+
         #region Producer methods
         public IMessageProducer CreateProducer()
         {

Added: 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/WireFormat.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/WireFormat.cs?rev=1629998&view=auto
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/WireFormat.cs 
(added)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/WireFormat.cs 
Wed Oct  8 00:59:46 2014
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.NMS.ZMQ
+{
+       public class WireFormat
+       {
+               /// <summary>
+               /// Message Field Types
+               /// IMPORTANT: These assigned numbers cannot change. If new 
field types
+               /// are added, then they must be assigned new numbers. Do not 
re-use numbers.
+               /// </summary>
+               public const int MFT_NONE = 0;
+               public const int MFT_MESSAGEID = 1;
+               public const int MFT_TIMESTAMP = 2;
+               public const int MFT_NMSTYPE = 3;
+               public const int MFT_CORRELATIONID = 4;
+               public const int MFT_REPLYTO = 5;
+               public const int MFT_DELIVERYMODE = 6;
+               public const int MFT_PRIORITY = 7;
+               public const int MFT_TIMETOLIVE = 8;
+               public const int MFT_HEADERS = 9;
+               public const int MFT_MSGTYPE = 10;
+               public const int MFT_BODY = 11;
+
+               // Message Types
+               /// <summary>
+               /// Message Types
+               /// These are the base message types. This is a sub-field of 
the MFT_MSGTYPE message field type.
+               /// IMPORTANT: These numbers cannot be changed. It will break 
wireformat compatibility.
+               /// </summary>
+               public const int MT_UNKNOWN = 0;
+               public const int MT_MESSAGE = 1;
+               public const int MT_TEXTMESSAGE = 2;
+               public const int MT_BYTESMESSAGE = 3;
+               public const int MT_MAPMESSAGE = 4;
+               public const int MT_OBJECTMESSAGE = 5;
+               public const int MT_STREAMMESSAGE = 6;
+
+       }
+}

Modified: 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0-test.csproj
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0-test.csproj?rev=1629998&r1=1629997&r2=1629998&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0-test.csproj 
(original)
+++ 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0-test.csproj 
Wed Oct  8 00:59:46 2014
@@ -57,6 +57,9 @@
     <None Include="Apache.NMS.ZMQ.Test.nunit">
       <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
     </None>
+    <Content Include="nmsprovider-test.config">
+      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
+    </Content>
   </ItemGroup>
   <ItemGroup>
     <ProjectReference Include="vs2010-zmq-net-4.0.csproj">

Modified: 
activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0.csproj
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0.csproj?rev=1629998&r1=1629997&r2=1629998&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0.csproj 
(original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/vs2010-zmq-net-4.0.csproj Wed 
Oct  8 00:59:46 2014
@@ -74,6 +74,7 @@
     </Compile>
     <Compile Include="src\main\csharp\TextMessage.cs" />
     <Compile Include="src\main\csharp\Utils.cs" />
+    <Compile Include="src\main\csharp\WireFormat.cs" />
   </ItemGroup>
   <ItemGroup>
     <Content Include="lib\clrzmq\net-4.0\libzmq.dll">


Reply via email to