Author: jgomes
Date: Wed Mar 19 19:09:34 2014
New Revision: 1579357

URL: http://svn.apache.org/r1579357
Log:
Implement disposable pattern for Connection.
Only dispose the producer endpoint after its final release.
Add overloaded send/receive API for destinations.
Initialize the sockets on the correct message handler thread.

Modified:
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
    activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs?rev=1579357&r1=1579356&r2=1579357&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/BaseMessage.cs Wed Mar 19 19:09:34 2014
@@ -29,7 +29,7 @@ namespace Apache.NMS.ZMQ
                private string correlationId;
                private TimeSpan timeToLive;
                private string messageId;
-               private MsgDeliveryMode deliveryMode;
+               private MsgDeliveryMode deliveryMode = 
MsgDeliveryMode.NonPersistent;
                private MsgPriority priority;
                private Destination replyTo;
                private byte[] content;

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs?rev=1579357&r1=1579356&r2=1579357&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Connection.cs Wed Mar 19 19:09:34 2014
@@ -19,6 +19,7 @@ using System;
 using ZeroMQ;
 using System.Collections.Generic;
 using System.Text;
+using System.Collections;
 
 namespace Apache.NMS.ZMQ
 {
@@ -46,24 +47,87 @@ namespace Apache.NMS.ZMQ
         /// <summary>
         /// ZMQ context
         /// </summary>
+               private static object contextLock = new object();
+               private static int instanceCount = 0;
                private static ZmqContext _context;
-               private static Dictionary<string, ProducerRef> producerCache;
-               private static object producerCacheLock;
+               private static Dictionary<string, ProducerRef> producerCache = 
new Dictionary<string, ProducerRef>();
+               private static object producerCacheLock = new object();
+               private TimeSpan zeroTimeout = new TimeSpan(0);
 
-               static Connection()
+               private bool disposed = false;
+
+               private static void InitContext()
+               {
+                       lock(contextLock)
+                       {
+                               if(0 == instanceCount++)
+                               {
+                                       Connection._context = 
ZmqContext.Create();
+                               }
+                       }
+               }
+
+               private static void DestroyContext()
                {
-                       Connection._context = ZmqContext.Create();
-                       Connection.producerCache = new Dictionary<string, 
ProducerRef>();
-                       Connection.producerCacheLock = new object();
+                       lock(contextLock)
+                       {
+                               if(0 == --instanceCount)
+                               {
+                                       Connection._context.Dispose();
+                               }
+                       }
                }
 
                public Connection(Uri connectionUri)
                {
+                       InitContext();
                        this.brokerUri = connectionUri;
                        this.producerContextBinding = 
string.Format("{0}://*:{1}", this.brokerUri.Scheme, this.brokerUri.Port);
                        this.consumerContextBinding = 
string.Format("{0}://{1}:{2}", brokerUri.Scheme, brokerUri.Host, 
this.brokerUri.Port);
                }
 
+               ~Connection()
+               {
+                       Dispose(false);
+               }
+
+               public void Dispose()
+               {
+                       Dispose(true);
+                       GC.SuppressFinalize(this);
+               }
+
+               private void Dispose(bool disposing)
+               {
+                       if(disposed)
+                       {
+                               return;
+                       }
+
+                       if(disposing)
+                       {
+                               try
+                               {
+                                       OnDispose();
+                               }
+                               catch(Exception ex)
+                               {
+                                       Tracer.ErrorFormat("Exception disposing 
Connection {0}: {1}", this.brokerUri.AbsoluteUri, ex.Message);
+                               }
+                       }
+
+                       disposed = true;
+               }
+
+               /// <summary>
+               /// Child classes can override this method to perform clean-up 
logic.
+               /// </summary>
+               protected virtual void OnDispose()
+               {
+                       Close();
+                       DestroyContext();
+               }
+
                /// <summary>
         /// Starts message delivery for this connection.
         /// </summary>
@@ -147,12 +211,13 @@ namespace Apache.NMS.ZMQ
                                        {
                                                
producerCache.Remove(contextBinding);
                                                
producerRef.producer.Unbind(contextBinding);
+                                               producerRef.producer.Dispose();
                                        }
                                }
                        }
                }
 
-               internal ZmqSocket GetConsumer(Encoding encoding, string 
destinationName)
+               internal ZmqSocket GetConsumer()
                {
                        ZmqSocket endpoint = 
this.Context.CreateSocket(SocketType.SUB);
 
@@ -160,8 +225,6 @@ namespace Apache.NMS.ZMQ
                        {
                                throw new ResourceAllocationException();
                        }
-                       endpoint.Subscribe(encoding.GetBytes(destinationName));
-                       endpoint.Connect(GetConsumerBindingPath());
 
                        return endpoint;
                }
@@ -169,6 +232,7 @@ namespace Apache.NMS.ZMQ
                internal void ReleaseConsumer(ZmqSocket endpoint)
                {
                        endpoint.Disconnect(GetConsumerBindingPath());
+                       endpoint.Dispose();
                }
 
                internal string GetProducerContextBinding()
@@ -176,16 +240,11 @@ namespace Apache.NMS.ZMQ
                        return this.producerContextBinding;
                }
 
-               private string GetConsumerBindingPath()
+               internal string GetConsumerBindingPath()
                {
                        return this.consumerContextBinding;
                }
 
-               public void Dispose()
-        {
-            Close();
-        }
-
         public void Close()
         {
             Stop();

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs?rev=1579357&r1=1579356&r2=1579357&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/Destination.cs Wed Mar 19 19:09:34 2014
@@ -18,6 +18,7 @@
 using System;
 using System.Text;
 using ZeroMQ;
+using System.Diagnostics;
 
 namespace Apache.NMS.ZMQ
 {
@@ -26,6 +27,8 @@ namespace Apache.NMS.ZMQ
        /// </summary>
        public abstract class Destination : IDestination
        {
+               public static Encoding encoding = Encoding.UTF8;
+
                protected Session session;
                /// <summary>
                /// Socket object
@@ -92,14 +95,12 @@ namespace Apache.NMS.ZMQ
                                        
this.session.Connection.ReleaseProducer(this.producerEndpoint);
                                }
 
-                               this.producerEndpoint.Dispose();
                                this.producerEndpoint = null;
                        }
 
                        if(null != this.consumerEndpoint)
                        {
                                
this.session.Connection.ReleaseConsumer(this.consumerEndpoint);
-                               this.consumerEndpoint.Dispose();
                                this.consumerEndpoint = null;
                        }
                }
@@ -178,35 +179,82 @@ namespace Apache.NMS.ZMQ
                        get;
                }
 
-               internal int Send(byte[] buffer, TimeSpan timeout)
+               internal void InitSender()
                {
                        if(null == this.producerEndpoint)
                        {
                                this.producerEndpoint = 
this.session.Connection.GetProducer();
                        }
-
-                       return this.producerEndpoint.Send(buffer, 
buffer.Length, SocketFlags.None, timeout);
                }
 
-               internal string Receive(Encoding encoding, TimeSpan timeout)
+               internal void InitReceiver()
                {
                        if(null == this.consumerEndpoint)
                        {
-                               this.consumerEndpoint = 
this.session.Connection.GetConsumer(encoding, this.destinationName);
+                               Connection connection = this.session.Connection;
+
+                               this.consumerEndpoint = 
connection.GetConsumer();
+                               // Must subscribe first before connecting to 
the endpoint binding
+                               
this.consumerEndpoint.Subscribe(Destination.encoding.GetBytes(this.destinationName));
+                               
this.consumerEndpoint.Connect(connection.GetConsumerBindingPath());
                        }
+               }
+
+               internal void Subscribe(string prefixName)
+               {
+                       InitReceiver();
+                       
this.consumerEndpoint.Subscribe(Destination.encoding.GetBytes(prefixName));
+               }
+
+               internal void Unsubscribe(string prefixName)
+               {
+                       if(null != this.consumerEndpoint)
+                       {
+                               
this.consumerEndpoint.Unsubscribe(Destination.encoding.GetBytes(prefixName));
+                       }
+               }
 
-                       return consumerEndpoint.Receive(encoding, timeout);
+               internal SendStatus Send(string msg)
+               {
+                       Debug.Assert(null != this.producerEndpoint, "Call 
InitSender() before calling Send().");
+                       return this.producerEndpoint.Send(msg, 
Destination.encoding);
+               }
+
+               internal SendStatus Send(byte[] buffer)
+               {
+                       Debug.Assert(null != this.producerEndpoint, "Call 
InitSender() before calling Send().");
+                       return this.producerEndpoint.Send(buffer);
+               }
+
+               internal string ReceiveString(TimeSpan timeout)
+               {
+                       this.InitReceiver();
+                       return 
this.consumerEndpoint.Receive(Destination.encoding, timeout);
+               }
+
+               internal byte[] ReceiveBytes(TimeSpan timeout, out int size)
+               {
+                       this.InitReceiver();
+                       return this.consumerEndpoint.Receive(null, timeout, out 
size);
+               }
+
+               internal byte[] ReceiveBytes(SocketFlags flags, out int size)
+               {
+                       this.InitReceiver();
+                       return this.consumerEndpoint.Receive(null, flags, out 
size);
                }
 
                internal Frame ReceiveFrame()
                {
                        // TODO: Implement
+                       this.InitReceiver();
                        return null;
                }
 
                internal ZmqMessage ReceiveMessage()
                {
                        // TODO: Implement
+                       this.InitReceiver();
                        return null;
                }
        }

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs?rev=1579357&r1=1579356&r2=1579357&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageConsumer.cs Wed Mar 19 19:09:34 2014
@@ -15,14 +15,10 @@
  * limitations under the License.
  */
 
-#define PUBSUB
-
 using System;
+using System.Diagnostics;
 using System.Text;
 using System.Threading;
-using Apache.NMS.Util;
-using ZeroMQ;
-using System.Diagnostics;
 
 namespace Apache.NMS.ZMQ
 {
@@ -31,7 +27,7 @@ namespace Apache.NMS.ZMQ
        /// </summary>
        public class MessageConsumer : IMessageConsumer
        {
-               protected TimeSpan zeroTimeout = new TimeSpan(0);
+               protected static readonly TimeSpan zeroTimeout = new 
TimeSpan(0);
 
                private readonly Session session;
                private readonly AcknowledgementMode acknowledgementMode;
@@ -41,6 +37,8 @@ namespace Apache.NMS.ZMQ
                private Thread asyncDeliveryThread = null;
                private object asyncDeliveryLock = new object();
                private bool asyncDelivery = false;
+               private bool asyncInit = false;
+               private byte[] rawDestinationName;
 
                private ConsumerTransformerDelegate consumerTransformer;
                public ConsumerTransformerDelegate ConsumerTransformer
@@ -60,29 +58,40 @@ namespace Apache.NMS.ZMQ
 
                        this.session = sess;
                        this.destination = (Destination) dest;
+                       this.rawDestinationName = 
Destination.encoding.GetBytes(this.destination.Name);
                        this.acknowledgementMode = ackMode;
                }
 
+               private object listenerLock = new object();
                public event MessageListener Listener
                {
                        add
                        {
-                               this.listener += value;
-                               this.listenerCount++;
-                               StartAsyncDelivery();
+                               lock(listenerLock)
+                               {
+                                       this.listener += value;
+                                       if(0 == this.listenerCount)
+                                       {
+                                               StartAsyncDelivery();
+                                       }
+
+                                       this.listenerCount++;
+                               }
                        }
 
                        remove
                        {
-                               if(this.listenerCount > 0)
+                               lock(listenerLock)
                                {
                                        this.listener -= value;
-                                       this.listenerCount--;
-                               }
-
-                               if(0 == listenerCount)
-                               {
-                                       StopAsyncDelivery();
+                                       if(this.listenerCount > 0)
+                                       {
+                                               this.listenerCount--;
+                                               if(0 == this.listenerCount)
+                                               {
+                                                       StopAsyncDelivery();
+                                               }
+                                       }
                                }
                        }
                }
@@ -106,15 +115,17 @@ namespace Apache.NMS.ZMQ
                /// </returns>
                public IMessage Receive(TimeSpan timeout)
                {
-                       // TODO: Support decoding of all message types + all 
meta data (e.g., headers and properties)
-                       string msgContent = 
this.destination.Receive(Encoding.UTF8, timeout);
+                       int size;
+                       byte[] receivedMsg = 
this.destination.ReceiveBytes(timeout, out size);
 
-                       if(null != msgContent)
+                       if(size > 0)
                        {
                                // Strip off the subscribed destination name.
-                               string destinationName = this.destination.Name;
-                               string messageText = 
msgContent.Substring(destinationName.Length, msgContent.Length - 
destinationName.Length);
-                               return ToNmsMessage(messageText);
+                               // TODO: Support decoding of all message types 
+ all meta data (e.g., headers and properties)
+                               int msgStart = this.rawDestinationName.Length;
+                               int msgLength = receivedMsg.Length - msgStart;
+                               string msgContent = 
Encoding.UTF8.GetString(receivedMsg, msgStart, msgLength);
+                               return ToNmsMessage(msgContent);
                        }
 
                        return null;
@@ -150,7 +161,7 @@ namespace Apache.NMS.ZMQ
 
                protected virtual void StopAsyncDelivery()
                {
-                       lock(asyncDeliveryLock)
+                       lock(this.asyncDeliveryLock)
                        {
                                this.asyncDelivery = false;
                                if(null != this.asyncDeliveryThread)
@@ -174,33 +185,49 @@ namespace Apache.NMS.ZMQ
                        Debug.Assert(null == this.asyncDeliveryThread);
                        lock(this.asyncDeliveryLock)
                        {
+                               this.asyncInit = false;
                                this.asyncDelivery = true;
-                               this.asyncDeliveryThread = new Thread(new 
ThreadStart(DispatchLoop));
+                               this.asyncDeliveryThread = new Thread(new 
ThreadStart(MsgDispatchLoop));
                                this.asyncDeliveryThread.Name = 
string.Format("MsgConsumerAsync: {0}", this.destination.Name);
                                this.asyncDeliveryThread.IsBackground = true;
                                this.asyncDeliveryThread.Start();
+                               while(!asyncInit)
+                               {
+                                       Thread.Sleep(1);
+                               }
                        }
                }
 
-               protected virtual void DispatchLoop()
+               protected virtual void MsgDispatchLoop()
                {
                        Tracer.InfoFormat("Starting dispatcher thread consumer: 
{0}", this.asyncDeliveryThread.Name);
-                       TimeSpan receiveWait = TimeSpan.FromSeconds(3);
+                       TimeSpan receiveWait = TimeSpan.FromSeconds(2);
+
+                       // Signal that this thread has started.
+                       asyncInit = true;
 
                        while(asyncDelivery)
                        {
                                try
                                {
                                        IMessage message = Receive(receiveWait);
-                                       if(asyncDelivery && message != null)
+
+                                       if(asyncDelivery)
                                        {
-                                               try
+                                               if(null != message)
                                                {
-                                                       listener(message);
+                                                       try
+                                                       {
+                                                               
listener(message);
+                                                       }
+                                                       catch(Exception ex)
+                                                       {
+                                                               
HandleAsyncException(ex);
+                                                       }
                                                }
-                                               catch(Exception ex)
+                                               else
                                                {
-                                                       
HandleAsyncException(ex);
+                                                       Thread.Sleep(0);
                                                }
                                        }
                                }

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs?rev=1579357&r1=1579356&r2=1579357&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/main/csharp/MessageProducer.cs Wed Mar 19 19:09:34 2014
@@ -30,9 +30,9 @@ namespace Apache.NMS.ZMQ
        public class MessageProducer : IMessageProducer
        {
                private readonly Session session;
-               private IDestination destination;
+               private Destination destination;
 
-               private MsgDeliveryMode deliveryMode;
+               private MsgDeliveryMode deliveryMode = 
MsgDeliveryMode.NonPersistent;
                private TimeSpan timeToLive;
                private MsgPriority priority;
                private bool disableMessageID;
@@ -53,17 +53,18 @@ namespace Apache.NMS.ZMQ
                        }
 
                        this.session = sess;
-                       this.destination = dest;
+                       this.destination = (Destination) dest;
+                       this.destination.InitSender();
                }
 
                public void Send(IMessage message)
                {
-                       Send(this.Destination, message);
+                       Send(this.destination, message);
                }
 
                public void Send(IMessage message, MsgDeliveryMode 
deliveryMode, MsgPriority priority, TimeSpan timeToLive)
                {
-                       Send(this.Destination, message, deliveryMode, priority, 
timeToLive);
+                       Send(this.destination, message, deliveryMode, priority, 
timeToLive);
                }
 
                public void Send(IDestination dest, IMessage message)
@@ -94,7 +95,7 @@ namespace Apache.NMS.ZMQ
                        Destination theDest = (Destination) dest;
 
                        string msg = theDest.Name + ((ITextMessage) 
message).Text;
-                       theDest.Send(Encoding.UTF8.GetBytes(msg), 
this.session.Connection.RequestTimeout);
+                       theDest.Send(msg);
                }
 
                public void Dispose()
@@ -168,12 +169,6 @@ namespace Apache.NMS.ZMQ
                        set { }
                }
 
-               public IDestination Destination
-               {
-                       get { return this.destination; }
-                       set { this.destination = value; }
-               }
-
                public MsgPriority Priority
                {
                        get { return this.priority; }

Modified: activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs
URL: http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs?rev=1579357&r1=1579356&r2=1579357&view=diff
==============================================================================
--- activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs (original)
+++ activemq/activemq-dotnet/Apache.NMS.ZMQ/trunk/src/test/csharp/ZMQTest.cs Wed Mar 19 19:09:34 2014
@@ -24,7 +24,7 @@ namespace Apache.NMS.ZMQ
        [TestFixture]
        public class ZMQTest : BaseTest
        {
-               private bool receivedTestMessage = false;
+               private int receivedMsgCount = 0;
 
                [Test]
                public void TestConnection()
@@ -132,46 +132,62 @@ namespace Apache.NMS.ZMQ
 
                [Test]
                public void TestSendReceive(
+                       // inproc, ipc, tcp, pgm, or epgm
+                       [Values("zmq:tcp://localhost:5556", 
"zmq:inproc://localhost:5557")]
+                       string connectionName,
                        [Values("queue://ZMQTestQueue", "topic://ZMQTestTopic", 
"temp-queue://ZMQTempQueue", "temp-topic://ZMQTempTopic")]
-                       string destination)
+                       string destinationName)
                {
-                       IConnectionFactory factory = 
NMSConnectionFactory.CreateConnectionFactory(new 
Uri("zmq:tcp://localhost:5556"));
+                       IConnectionFactory factory = 
NMSConnectionFactory.CreateConnectionFactory(new Uri(connectionName));
                        Assert.IsNotNull(factory, "Error creating connection 
factory.");
 
-                       this.receivedTestMessage = false;
+                       this.receivedMsgCount = 0;
                        using(IConnection connection = 
factory.CreateConnection())
                        {
                                Assert.IsNotNull(connection, "Problem creating 
connection class. Usually problem with libzmq and clrzmq ");
                                using(ISession session = 
connection.CreateSession())
                                {
                                        Assert.IsNotNull(session, "Error 
creating Session.");
-                                       using(IDestination testDestination = 
session.GetDestination(destination))
+                                       using(IDestination testDestination = 
session.GetDestination(destinationName))
                                        {
-                                               
Assert.IsNotNull(testDestination, "Error creating test destination: {0}", 
destination);
+                                               
Assert.IsNotNull(testDestination, "Error creating test destination: {0}", 
destinationName);
                                                using(IMessageConsumer consumer 
= session.CreateConsumer(testDestination))
                                                {
-                                                       
Assert.IsNotNull(consumer, "Error creating consumer on {0}", destination);
-                                                       consumer.Listener += 
OnMessage;
-                                                       using(IMessageProducer 
producer = session.CreateProducer(testDestination))
+                                                       
Assert.IsNotNull(consumer, "Error creating consumer on {0}", destinationName);
+                                                       int sendMsgCount = 0;
+                                                       try
                                                        {
-                                                               
Assert.IsNotNull(consumer, "Error creating producer on {0}", destination);
-                                                               ITextMessage 
testMsg = producer.CreateTextMessage("Zero Message.");
-                                                               
Assert.IsNotNull(testMsg, "Error creating test message.");
-                                                               
producer.Send(testMsg);
-                                                       }
-
-                                                       // Wait for the message
-                                                       DateTime startWaitTime 
= DateTime.Now;
-                                                       TimeSpan maxWaitTime = 
TimeSpan.FromSeconds(5);
-
-                                                       
while(!receivedTestMessage)
-                                                       {
-                                                               
if((DateTime.Now - startWaitTime) > maxWaitTime)
+                                                               
consumer.Listener += OnMessage;
+                                                               
using(IMessageProducer producer = session.CreateProducer(testDestination))
                                                                {
-                                                                       
Assert.Fail("Timeout waiting for message receive.");
-                                                               }
+                                                                       
Assert.IsNotNull(consumer, "Error creating producer on {0}", destinationName);
+                                                                       
ITextMessage testMsg = producer.CreateTextMessage("Zero Message.");
+                                                                       
Assert.IsNotNull(testMsg, "Error creating test message.");
+
+                                                                       // Wait 
for the message
+                                                                       
DateTime startWaitTime = DateTime.Now;
+                                                                       
TimeSpan maxWaitTime = TimeSpan.FromSeconds(5);
+
+                                                                       // 
Continually send the message to compensate for the
+                                                                       // slow 
joiner problem inherent to spinning up the
+                                                                       // 
internal dispatching threads in ZeroMQ.
+                                                                       
while(this.receivedMsgCount < 1)
+                                                                       {
+                                                                               
++sendMsgCount;
+                                                                               
producer.Send(testMsg);
+                                                                               
if((DateTime.Now - startWaitTime) > maxWaitTime)
+                                                                               
{
+                                                                               
        Assert.Fail("Timeout waiting for message receive.");
+                                                                               
}
 
-                                                               Thread.Sleep(5);
+                                                                               
Thread.Sleep(1);
+                                                                       }
+                                                               }
+                                                       }
+                                                       finally
+                                                       {
+                                                               
consumer.Listener -= OnMessage;
+                                                               
Console.WriteLine("Sent {0} msgs.\nReceived {1} msgs", sendMsgCount, 
this.receivedMsgCount);
                                                        }
                                                }
                                        }
@@ -188,7 +204,7 @@ namespace Apache.NMS.ZMQ
                        Assert.IsInstanceOf<TextMessage>(message, "Wrong 
message type received.");
                        ITextMessage textMsg = (ITextMessage) message;
                        Assert.AreEqual(textMsg.Text, "Zero Message.");
-                       receivedTestMessage = true;
+                       this.receivedMsgCount++;
                }
        }
 }


Reply via email to