Author: jgomes
Date: Fri Dec 12 10:02:26 2008
New Revision: 726078
URL: http://svn.apache.org/viewvc?rev=726078&view=rev
Log:
Modify MessageSelectorTest to delay creation of the second consumer to show that
the first consumer is blocked until the second consumer starts consuming.
Modified:
activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/MessageSelectorTest.cs
Modified:
activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/MessageSelectorTest.cs
URL:
http://svn.apache.org/viewvc/activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/MessageSelectorTest.cs?rev=726078&r1=726077&r2=726078&view=diff
==============================================================================
---
activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/MessageSelectorTest.cs
(original)
+++
activemq/activemq-dotnet/Apache.NMS/trunk/src/test/csharp/MessageSelectorTest.cs
Fri Dec 12 10:02:26 2008
@@ -31,8 +31,10 @@
protected const string TOPIC_DESTINATION_NAME =
"topic://MessageSelectorTopic";
protected const string TEST_CLIENT_ID =
"MessageSelectorClientId";
protected const string TEST_CLIENT_ID2 =
"MessageSelectorClientId2";
+ protected const string TEST_CLIENT_ID3 =
"MessageSelectorClientId3";
private int receivedNonIgnoredMsgCount = 0;
+ private int receivedIgnoredMsgCount = 0;
#if !NET_1_1
[RowTest]
@@ -45,32 +47,39 @@
{
using(IConnection connection1 =
CreateConnection(TEST_CLIENT_ID))
using(IConnection connection2 =
CreateConnection(TEST_CLIENT_ID2))
+ using(IConnection connection3 =
CreateConnection(TEST_CLIENT_ID3))
{
connection1.Start();
connection2.Start();
+ connection3.Start();
using(ISession session1 =
connection1.CreateSession(AcknowledgementMode.AutoAcknowledge))
using(ISession session2 =
connection2.CreateSession(AcknowledgementMode.AutoAcknowledge))
+ using(ISession session3 =
connection3.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
IDestination destination1 =
CreateDestination(session1, destinationName);
- IDestination destination2 =
CreateDestination(session2, destinationName);
+ IDestination destination2 =
SessionUtil.GetDestination(session2, destinationName);
+ IDestination destination3 =
SessionUtil.GetDestination(session2, destinationName);
using(IMessageProducer producer =
session1.CreateProducer(destination1))
- using(IMessageConsumer consumer =
session2.CreateConsumer(destination2, "JMSType NOT LIKE '%IGNORE'"))
+ using(IMessageConsumer consumer1 =
session2.CreateConsumer(destination2, "JMSType NOT LIKE '%IGNORE'"))
{
const int MaxNumRequests =
100000;
int numNonIgnoredMsgsSent = 0;
+ int numIgnoredMsgsSent = 0;
producer.Persistent =
persistent;
// producer.RequestTimeout =
receiveTimeout;
receivedNonIgnoredMsgCount = 0;
- consumer.Listener += new
MessageListener(OnNonIgnoredMessage);
+ receivedIgnoredMsgCount = 0;
+ consumer1.Listener += new
MessageListener(OnNonIgnoredMessage);
+ IMessageConsumer consumer2 =
null;
for(int index = 1; index <=
MaxNumRequests; index++)
{
IMessage request =
session1.CreateTextMessage(String.Format("Hello World! [{0} of {1}]", index,
MaxNumRequests));
- //request.NMSTimeToLive
= TimeSpan.FromSeconds(10);
+ //
request.NMSTimeToLive = TimeSpan.FromSeconds(10);
if(0 == (index % 2))
{
request.NMSType
= "ACTIVE";
@@ -79,16 +88,51 @@
else
{
request.NMSType
= "ACTIVE.IGNORE";
+
numIgnoredMsgsSent++;
}
producer.Send(request);
+
+ if(20000 == index)
+ {
+ // Start the
second consumer
+ consumer2 =
session3.CreateConsumer(destination2, "JMSType LIKE '%IGNORE'");
+
consumer2.Listener += new MessageListener(OnIgnoredMessage);
+ }
}
-
while(receivedNonIgnoredMsgCount < numNonIgnoredMsgsSent)
+ int waitCount = 0;
+ int
lastReceivedINongnoredMsgCount = receivedNonIgnoredMsgCount;
+ int lastReceivedIgnoredMsgCount
= receivedIgnoredMsgCount;
+
+
while(receivedNonIgnoredMsgCount < numNonIgnoredMsgsSent
+ ||
receivedIgnoredMsgCount < numIgnoredMsgsSent)
{
+
if(lastReceivedINongnoredMsgCount != receivedNonIgnoredMsgCount
+ ||
lastReceivedIgnoredMsgCount != receivedIgnoredMsgCount)
+ {
+ // Reset the
wait count.
+ waitCount = 0;
+
Console.WriteLine("Reset the wait count while we are still receiving msgs.");
+
Thread.Sleep(2000);
+ continue;
+ }
+
+
lastReceivedINongnoredMsgCount = receivedNonIgnoredMsgCount;
+
lastReceivedIgnoredMsgCount = receivedIgnoredMsgCount;
+
+ if(waitCount > 60)
+ {
+
Assert.Fail(String.Format("Timeout waiting for all messages to be delivered.
Only {0} of {1} non-ignored messages delivered. Only {2} of {3} ignored
messages delivered.",
+
receivedNonIgnoredMsgCount, numNonIgnoredMsgsSent, receivedIgnoredMsgCount,
numIgnoredMsgsSent));
+ }
+
Console.WriteLine("Waiting to receive all non-ignored messages...");
Thread.Sleep(1000);
+ waitCount++;
}
+
+ consumer2.Dispose();
}
}
}
@@ -99,5 +143,12 @@
receivedNonIgnoredMsgCount++;
Assert.AreEqual(message.NMSType, "ACTIVE");
}
+
+ protected void OnIgnoredMessage(IMessage message)
+ {
+ receivedIgnoredMsgCount++;
+ Assert.AreEqual(message.NMSType, "ACTIVE.IGNORE");
+ Thread.Sleep(100);
+ }
}
}