Author: robbie
Date: Tue Mar 30 11:50:18 2010
New Revision: 929095
URL: http://svn.apache.org/viewvc?rev=929095&view=rev
Log:
QPID-2417 , QPID-2418 , QPID-2449 : expand topic testing, specifically around
the change and unsubscription of durable subscriptions
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
qpid/trunk/qpid/java/test-profiles/Excludes
qpid/trunk/qpid/java/test-profiles/JavaExcludes
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java?rev=929095&r1=929094&r2=929095&view=diff
==============================================================================
---
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
(original)
+++
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
Tue Mar 30 11:50:18 2010
@@ -19,6 +19,10 @@ package org.apache.qpid.test.unit.ct;
import javax.jms.*;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.test.utils.QpidTestCase;
/**
@@ -163,5 +167,301 @@ public class DurableSubscriberTest exten
durConnection2.close();
}
}
+
+ /**
+ * create and register a durable subscriber without a message selector and
then unsubscribe it
+ * create and register a durable subscriber with a message selector and
then close it
+ * restart the broker
+ * send matching and non matching messages
+ * recreate and register the durable subscriber with a message selector
+ * verify only the matching messages are received
+ */
+ public void testDurSubChangedToHaveSelectorThenRestart() throws Exception
+ {
+ if (! isBrokerStorePersistent())
+ {
+ _logger.warn("Test skipped due to requirement of a persistent
store");
+ return;
+ }
+
+ final String SUB_NAME=getTestQueueName();
+
+ TopicConnectionFactory factory = getConnectionFactory();
+ Topic topic = (Topic) getInitialContext().lookup(_topicName);
+
+ //create and register a durable subscriber then unsubscribe it
+ TopicConnection durConnection = factory.createTopicConnection("guest",
"guest");
+ TopicSession durSession = durConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic,
SUB_NAME);
+ durConnection.start();
+ durSub1.close();
+ durSession.unsubscribe(SUB_NAME);
+ durSession.close();
+ durConnection.close();
+
+ //create and register a durable subscriber with a message selector and
then close it
+ TopicConnection durConnection2 =
factory.createTopicConnection("guest", "guest");
+ TopicSession durSession2 = durConnection2.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic,
SUB_NAME, "testprop='true'", false);
+ durConnection2.start();
+ durSub2.close();
+ durSession2.close();
+ durConnection2.close();
+
+ //now restart the server
+ try
+ {
+ restartBroker();
+ }
+ catch (Exception e)
+ {
+ _logger.error("problems restarting broker: " + e);
+ throw e;
+ }
+
+ //send messages matching and not matching the selector
+ TopicConnection pubConnection = factory.createTopicConnection("guest",
"guest");
+ TopicSession pubSession = pubConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
+ TopicPublisher publisher = pubSession.createPublisher(topic);
+ for (int i = 0; i < 5; i++)
+ {
+ Message message = pubSession.createMessage();
+ message.setStringProperty("testprop", "true");
+ publisher.publish(message);
+ message = pubSession.createMessage();
+ message.setStringProperty("testprop", "false");
+ publisher.publish(message);
+ }
+ publisher.close();
+ pubSession.close();
+
+ //now recreate the durable subscriber with selector to check there are
no exceptions generated
+ //and then verify the messages are received correctly
+ TopicConnection durConnection3 = (TopicConnection)
factory.createConnection("guest", "guest");
+ TopicSession durSession3 = (TopicSession)
durConnection3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber durSub3 = durSession3.createDurableSubscriber(topic,
SUB_NAME, "testprop='true'", false);
+ durConnection3.start();
+
+ for (int i = 0; i < 5; i++)
+ {
+ Message message = durSub3.receive(2000);
+ if (message == null)
+ {
+ fail("testDurSubChangedToHaveSelectorThenRestart test failed.
Expected message " + i + " was not returned");
+ }
+ else
+ {
+ assertTrue("testDurSubChangedToHaveSelectorThenRestart test
failed. Got message not matching selector",
+
message.getStringProperty("testprop").equals("true"));
+ }
+ }
+
+ durSub3.close();
+ durSession3.unsubscribe(SUB_NAME);
+ durSession3.close();
+ durConnection3.close();
+ }
+
+
+ /**
+ * create and register a durable subscriber with a message selector and
then unsubscribe it
+ * create and register a durable subscriber without a message selector and
then close it
+ * restart the broker
+ * send matching and non matching messages
+ * recreate and register the durable subscriber without a message selector
+ * verify ALL the sent messages are received
+ */
+ public void testDurSubChangedToNotHaveSelectorThenRestart() throws
Exception
+ {
+ if (! isBrokerStorePersistent())
+ {
+ _logger.warn("Test skipped due to requirement of a persistent
store");
+ return;
+ }
+
+ final String SUB_NAME=getTestQueueName();
+
+ TopicConnectionFactory factory = getConnectionFactory();
+ Topic topic = (Topic) getInitialContext().lookup(_topicName);
+
+ //create and register a durable subscriber with selector then
unsubscribe it
+ TopicConnection durConnection = factory.createTopicConnection("guest",
"guest");
+ TopicSession durSession = durConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic,
SUB_NAME, "testprop='true'", false);
+ durConnection.start();
+ durSub1.close();
+ durSession.unsubscribe(SUB_NAME);
+ durSession.close();
+ durConnection.close();
+
+ //create and register a durable subscriber without the message
selector and then close it
+ TopicConnection durConnection2 =
factory.createTopicConnection("guest", "guest");
+ TopicSession durSession2 = durConnection2.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic,
SUB_NAME);
+ durConnection2.start();
+ durSub2.close();
+ durSession2.close();
+ durConnection2.close();
+
+ //now restart the server
+ try
+ {
+ restartBroker();
+ }
+ catch (Exception e)
+ {
+ _logger.error("problems restarting broker: " + e);
+ throw e;
+ }
+
+ //send messages matching and not matching the original used selector
+ TopicConnection pubConnection = factory.createTopicConnection("guest",
"guest");
+ TopicSession pubSession = pubConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
+ TopicPublisher publisher = pubSession.createPublisher(topic);
+ for (int i = 1; i <= 5; i++)
+ {
+ Message message = pubSession.createMessage();
+ message.setStringProperty("testprop", "true");
+ publisher.publish(message);
+ message = pubSession.createMessage();
+ message.setStringProperty("testprop", "false");
+ publisher.publish(message);
+ }
+ publisher.close();
+ pubSession.close();
+
+ //now recreate the durable subscriber without selector to check there
are no exceptions generated
+ //then verify ALL messages sent are received
+ TopicConnection durConnection3 = (TopicConnection)
factory.createConnection("guest", "guest");
+ TopicSession durSession3 = (TopicSession)
durConnection3.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TopicSubscriber durSub3 = durSession3.createDurableSubscriber(topic,
SUB_NAME);
+ durConnection3.start();
+
+ for (int i = 1; i <= 10; i++)
+ {
+ Message message = durSub3.receive(2000);
+ if (message == null)
+ {
+ fail("testDurSubChangedToNotHaveSelectorThenRestart test
failed. Expected message " + i + " was not received");
+ }
+ }
+
+ durSub3.close();
+ durSession3.unsubscribe(SUB_NAME);
+ durSession3.close();
+ durConnection3.close();
+ }
+
+
+ public void testResubscribeWithChangedSelectorAndRestart() throws Exception
+ {
+ if (! isBrokerStorePersistent())
+ {
+ _logger.warn("Test skipped due to requirement of a persistent
store");
+ return;
+ }
+
+ Connection conn = getConnection();
+ conn.start();
+ Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ AMQTopic topic = new AMQTopic((AMQConnection) conn,
"testResubscribeWithChangedSelectorAndRestart");
+ MessageProducer producer = session.createProducer(topic);
+
+ // Create durable subscriber that matches A
+ TopicSubscriber subA = session.createDurableSubscriber(topic,
+ "testResubscribeWithChangedSelector",
+ "Match = True", false);
+
+ // Send 1 matching message and 1 non-matching message
+ TextMessage msg =
session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
+ msg.setBooleanProperty("Match", true);
+ producer.send(msg);
+ msg =
session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
+ msg.setBooleanProperty("Match", false);
+ producer.send(msg);
+
+ Message rMsg = subA.receive(1000);
+ assertNotNull(rMsg);
+ assertEquals("Content was wrong",
+ "testResubscribeWithChangedSelectorAndRestart1",
+ ((TextMessage) rMsg).getText());
+
+ rMsg = subA.receive(1000);
+ assertNull(rMsg);
+
+ // Send another 1 matching message and 1 non-matching message
+ msg =
session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
+ msg.setBooleanProperty("Match", true);
+ producer.send(msg);
+ msg =
session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
+ msg.setBooleanProperty("Match", false);
+ producer.send(msg);
+
+ // Disconnect subscriber without receiving the message to
+ //leave it on the underlying queue
+ subA.close();
+
+ // Reconnect with new selector that matches B
+ TopicSubscriber subB = session.createDurableSubscriber(topic,
+ "testResubscribeWithChangedSelectorAndRestart","Match =
False", false);
+
+ //verify no messages are now present on the queue as changing selector
should have issued
+ //an unsubscribe and thus deleted the previous durable backing queue
for the subscription.
+ //check the dur sub's underlying queue now has msg count 1
+ AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" +
"testResubscribeWithChangedSelector");
+ assertEquals("Msg count should be 0", 0,
((AMQSession)session).getQueueDepth(subQueue));
+
+
+ // Check that new messages are received properly
+ msg =
session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
+ msg.setBooleanProperty("Match", true);
+ producer.send(msg);
+ msg =
session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
+ msg.setBooleanProperty("Match", false);
+ producer.send(msg);
+
+ rMsg = subB.receive(1000);
+ assertNotNull(rMsg);
+ assertEquals("Content was wrong",
+ "testResubscribeWithChangedSelectorAndRestart2",
+ ((TextMessage) rMsg).getText());
+
+ rMsg = subB.receive(1000);
+ assertNull(rMsg);
+
+ //now restart the server
+ try
+ {
+ restartBroker();
+ }
+ catch (Exception e)
+ {
+ _logger.error("problems restarting broker: " + e);
+ throw e;
+ }
+
+ // Check that new messages are still received properly
+ msg =
session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
+ msg.setBooleanProperty("Match", true);
+ producer.send(msg);
+ msg =
session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
+ msg.setBooleanProperty("Match", false);
+ producer.send(msg);
+
+ rMsg = subB.receive(1000);
+ assertNotNull(rMsg);
+ assertEquals("Content was wrong",
+ "testResubscribeWithChangedSelectorAndRestart2",
+ ((TextMessage) rMsg).getText());
+
+ rMsg = subB.receive(1000);
+ assertNull(rMsg);
+
+ session.unsubscribe("testResubscribeWithChangedSelectorAndRestart");
+ subB.close();
+ session.close();
+ conn.close();
+ }
+
}
Modified:
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=929095&r1=929094&r2=929095&view=diff
==============================================================================
---
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
(original)
+++
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
Tue Mar 30 11:50:18 2010
@@ -20,8 +20,13 @@
*/
package org.apache.qpid.test.unit.topic;
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.qpid.management.common.JMXConnnectionFactory;
import org.apache.qpid.test.utils.QpidTestCase;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
@@ -39,6 +44,9 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
/**
* @todo Code to check that a consumer gets only one particular method could
be factored into a re-usable method (as
@@ -58,6 +66,36 @@ public class DurableSubscriptionTest ext
/** Timeout for receive() if we are not expecting a message */
private static final long NEGATIVE_RECEIVE_TIMEOUT = 1000;
+ private JMXConnector _jmxc;
+ private MBeanServerConnection _mbsc;
+ private static final String USER = "admin";
+ private static final String PASSWORD = "admin";
+ private boolean _jmxConnected;
+
+ public void setUp() throws Exception
+ {
+ setConfigurationProperty("management.enabled", "true");
+ _jmxConnected=false;
+ super.setUp();
+ }
+
+ public void tearDown() throws Exception
+ {
+ if(_jmxConnected)
+ {
+ try
+ {
+ _jmxc.close();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ super.tearDown();
+ }
+
public void testUnsubscribe() throws Exception
{
AMQConnection con = (AMQConnection) getConnection("guest", "guest");
@@ -79,6 +117,12 @@ public class DurableSubscriptionTest ext
_logger.info("Producer sending message A");
producer.send(session1.createTextMessage("A"));
+
+ ((AMQSession)session1).sync();
+
+ //check the dur sub's underlying queue now has msg count 1
+ AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" +
"MySubscription");
+ assertEquals("Msg count should be 1", 1,
((AMQSession)session1).getQueueDepth(subQueue));
Message msg;
_logger.info("Receive message on consumer 1:expecting A");
@@ -96,11 +140,46 @@ public class DurableSubscriptionTest ext
msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
_logger.info("Receive message on consumer 1 :expecting null");
assertEquals(null, msg);
+
+ ((AMQSession)session2).sync();
+
+ //check the dur sub's underlying queue now has msg count 0
+ assertEquals("Msg count should be 0", 0,
((AMQSession)session2).getQueueDepth(subQueue));
consumer2.close();
_logger.info("Unsubscribe session2/consumer2");
session2.unsubscribe("MySubscription");
-
+
+ ((AMQSession)session2).sync();
+
+ if(isJavaBroker() && isExternalBroker())
+ {
+ //Verify that the queue was deleted by querying for its JMX MBean
+ _jmxc = JMXConnnectionFactory.getJMXConnection(5000, "127.0.0.1",
+ getManagementPort(getPort()), USER, PASSWORD);
+
+ _jmxConnected = true;
+ _mbsc = _jmxc.getMBeanServerConnection();
+
+ //must replace the occurrence of ':' in queue name with '-'
+ String queueObjectNameText = "clientid" + "-" + "MySubscription";
+
+ ObjectName objName = new
ObjectName("org.apache.qpid:type=VirtualHost.Queue,name="
+ + queueObjectNameText + ",*");
+
+ Set<ObjectName> objectInstances = _mbsc.queryNames(objName, null);
+
+ if(objectInstances.size() != 0)
+ {
+ fail("Queue MBean was found. Expected queue to have been
deleted");
+ }
+ else
+ {
+ _logger.info("Underlying dueue for the durable subscription
was confirmed deleted.");
+ }
+ }
+
+ //verify unsubscribing the durable subscriber did not affect the
non-durable one
_logger.info("Producer sending message B");
producer.send(session1.createTextMessage("B"));
@@ -459,6 +538,9 @@ public class DurableSubscriptionTest ext
rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNull(rMsg);
+ // Send another 1 matching message and 1 non-matching message
+ sendMatchingAndNonMatchingMessage(session, producer);
+
// Disconnect subscriber
subA.close();
@@ -466,9 +548,15 @@ public class DurableSubscriptionTest ext
TopicSubscriber subB = session.createDurableSubscriber(topic,
"testResubscribeWithChangedSelector","Match = False", false);
+ //verify no messages are now present as changing selector should have
issued
+ //an unsubscribe and thus deleted the previous backing queue for the
subscription.
+ rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
+ assertNull("Should not have received message as the queue underlying
the " +
+ "subscription should have been cleared/deleted when the
selector was changed", rMsg);
- // Check messages are recieved properly
+ // Check that new messages are received properly
sendMatchingAndNonMatchingMessage(session, producer);
+
rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertNotNull(rMsg);
assertEquals("Content was wrong",
Modified: qpid/trunk/qpid/java/test-profiles/Excludes
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Excludes?rev=929095&r1=929094&r2=929095&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/Excludes Tue Mar 30 11:50:18 2010
@@ -29,3 +29,5 @@ org.apache.qpid.test.unit.ack.Acknowledg
org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#*
org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#*
+// QPID-2418 : The queue backing the dur sub is not currently deleted at
subscription change, so the test will fail.
+org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSelectorAndRestart
Modified: qpid/trunk/qpid/java/test-profiles/JavaExcludes
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/JavaExcludes?rev=929095&r1=929094&r2=929095&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/JavaExcludes (original)
+++ qpid/trunk/qpid/java/test-profiles/JavaExcludes Tue Mar 30 11:50:18 2010
@@ -16,6 +16,3 @@ org.apache.qpid.client.SessionCreateTest
// QPID-2097 exclude it from the InVM test runs until InVM JMX Interface is
reliable
org.apache.qpid.management.jmx.ManagementActorLoggingTest#*
org.apache.qpid.server.queue.ModelTest#*
-
-//QPID-2422: Derby currently doesnt persist queue arguments and 0-91 support
causes exclusivity mismatch after restart
-org.apache.qpid.test.unit.ct.DurableSubscriberTest#*
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]