Author: jgomes
Date: Fri Dec 12 10:02:26 2008
New Revision: 726078

URL: http://svn.apache.org/viewvc?rev=726078&view=rev
Log:
Modify MessageSelectorTest to delay creation of second consumer to show that 
first consumer is blocked until second consumer starts consuming.

Modified:
    
activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/MessageSelectorTest.cs

Modified: 
activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/MessageSelectorTest.cs
URL: 
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/MessageSelectorTest.cs?rev=726078&r1=726077&r2=726078&view=diff
==============================================================================
--- 
activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/MessageSelectorTest.cs
 (original)
+++ 
activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/MessageSelectorTest.cs
 Fri Dec 12 10:02:26 2008
@@ -31,8 +31,10 @@
                protected const string TOPIC_DESTINATION_NAME = 
"topic://MessageSelectorTopic";
                protected const string TEST_CLIENT_ID = 
"MessageSelectorClientId";
                protected const string TEST_CLIENT_ID2 = 
"MessageSelectorClientId2";
+               protected const string TEST_CLIENT_ID3 = 
"MessageSelectorClientId3";
 
                private int receivedNonIgnoredMsgCount = 0;
+               private int receivedIgnoredMsgCount = 0;
 
 #if !NET_1_1
                [RowTest]
@@ -45,32 +47,39 @@
                {
                        using(IConnection connection1 = 
CreateConnection(TEST_CLIENT_ID))
                        using(IConnection connection2 = 
CreateConnection(TEST_CLIENT_ID2))
+                       using(IConnection connection3 = 
CreateConnection(TEST_CLIENT_ID3))
                        {
                                connection1.Start();
                                connection2.Start();
+                               connection3.Start();
                                using(ISession session1 = 
connection1.CreateSession(AcknowledgementMode.AutoAcknowledge))
                                using(ISession session2 = 
connection2.CreateSession(AcknowledgementMode.AutoAcknowledge))
+                               using(ISession session3 = 
connection3.CreateSession(AcknowledgementMode.AutoAcknowledge))
                                {
                                        IDestination destination1 = 
CreateDestination(session1, destinationName);
-                                       IDestination destination2 = 
CreateDestination(session2, destinationName);
+                                       IDestination destination2 = 
SessionUtil.GetDestination(session2, destinationName);
+                                       IDestination destination3 = 
SessionUtil.GetDestination(session2, destinationName);
 
                                        using(IMessageProducer producer = 
session1.CreateProducer(destination1))
-                                       using(IMessageConsumer consumer = 
session2.CreateConsumer(destination2, "JMSType NOT LIKE '%IGNORE'"))
+                                       using(IMessageConsumer consumer1 = 
session2.CreateConsumer(destination2, "JMSType NOT LIKE '%IGNORE'"))
                                        {
                                                const int MaxNumRequests = 
100000;
                                                int numNonIgnoredMsgsSent = 0;
+                                               int numIgnoredMsgsSent = 0;
 
                                                producer.Persistent = 
persistent;
                                                // producer.RequestTimeout = 
receiveTimeout;
 
                                                receivedNonIgnoredMsgCount = 0;
-                                               consumer.Listener += new 
MessageListener(OnNonIgnoredMessage);
+                                               receivedIgnoredMsgCount = 0;
+                                               consumer1.Listener += new 
MessageListener(OnNonIgnoredMessage);
+                                               IMessageConsumer consumer2 = 
null;
 
                                                for(int index = 1; index <= 
MaxNumRequests; index++)
                                                {
                                                        IMessage request = 
session1.CreateTextMessage(String.Format("Hello World! [{0} of {1}]", index, 
MaxNumRequests));
 
-                                                       //request.NMSTimeToLive 
= TimeSpan.FromSeconds(10);
+                                                       // 
request.NMSTimeToLive = TimeSpan.FromSeconds(10);
                                                        if(0 == (index % 2))
                                                        {
                                                                request.NMSType 
= "ACTIVE";
@@ -79,16 +88,51 @@
                                                        else
                                                        {
                                                                request.NMSType 
= "ACTIVE.IGNORE";
+                                                               
numIgnoredMsgsSent++;
                                                        }
 
                                                        producer.Send(request);
+
+                                                       if(20000 == index)
+                                                       {
+                                                               // Start the 
second consumer
+                                                               consumer2 = 
session3.CreateConsumer(destination2, "JMSType LIKE '%IGNORE'");
+                                                               
consumer2.Listener += new MessageListener(OnIgnoredMessage);
+                                                       }
                                                }
 
-                                               
while(receivedNonIgnoredMsgCount < numNonIgnoredMsgsSent)
+                                               int waitCount = 0;
+                                               int 
lastReceivedINongnoredMsgCount = receivedNonIgnoredMsgCount;
+                                               int lastReceivedIgnoredMsgCount 
= receivedIgnoredMsgCount;
+
+                                               
while(receivedNonIgnoredMsgCount < numNonIgnoredMsgsSent
+                                                               || 
receivedIgnoredMsgCount < numIgnoredMsgsSent)
                                                {
+                                                       
if(lastReceivedINongnoredMsgCount != receivedNonIgnoredMsgCount
+                                                               || 
lastReceivedIgnoredMsgCount != receivedIgnoredMsgCount)
+                                                       {
+                                                               // Reset the 
wait count.
+                                                               waitCount = 0;
+                                                               
Console.WriteLine("Reset the wait count while we are still receiving msgs.");
+                                                               
Thread.Sleep(2000);
+                                                               continue;
+                                                       }
+
+                                                       
lastReceivedINongnoredMsgCount = receivedNonIgnoredMsgCount;
+                                                       
lastReceivedIgnoredMsgCount = receivedIgnoredMsgCount;
+
+                                                       if(waitCount > 60)
+                                                       {
+                                                               
Assert.Fail(String.Format("Timeout waiting for all messages to be delivered. 
Only {0} of {1} non-ignored messages delivered.  Only {2} of {3} ignored 
messages delivered.",
+                                                                       
receivedNonIgnoredMsgCount, numNonIgnoredMsgsSent, receivedIgnoredMsgCount, 
numIgnoredMsgsSent));
+                                                       }
+
                                                        
Console.WriteLine("Waiting to receive all non-ignored messages...");
                                                        Thread.Sleep(1000);
+                                                       waitCount++;
                                                }
+
+                                               consumer2.Dispose();
                                        }
                                }
                        }
@@ -99,5 +143,12 @@
                        receivedNonIgnoredMsgCount++;
                        Assert.AreEqual(message.NMSType, "ACTIVE");
                }
+
+               protected void OnIgnoredMessage(IMessage message)
+               {
+                       receivedIgnoredMsgCount++;
+                       Assert.AreEqual(message.NMSType, "ACTIVE.IGNORE");
+                       Thread.Sleep(100);
+               }
        }
 }


Reply via email to