Robbie,

The change to the test case is causing the test to fail when run against the C++ broker (which is to be expected, as we have a bug in the Java client). However, it's passing against the Java broker, so it is perhaps better to investigate that. I suspect the broker is creating a new queue because the args list is different (the selector being different)?
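To make that suspicion concrete, here is a rough sketch of the behaviour I have in mind. It is a toy model only, not Qpid code: `ToyBroker`, `ToyQueue`, `declare`, `enqueue`, `depth` and `selector` are all made-up names, and the "replace the queue when the arguments differ" rule is my assumption about what the Java broker might be doing, not something I have confirmed.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy model only: NOT Qpid code. Every name in this class is hypothetical.
public class ToyBroker {

    /** A queue that remembers the arguments it was declared with. */
    static final class ToyQueue {
        final Map<String, String> args;
        final Deque<String> messages = new ArrayDeque<String>();

        ToyQueue(Map<String, String> args) {
            this.args = new HashMap<String, String>(args);
        }
    }

    private final Map<String, ToyQueue> queues = new HashMap<String, ToyQueue>();

    /**
     * Assumed behaviour: a declare whose arguments differ from those of the
     * existing queue silently replaces it with a new, empty queue.
     */
    ToyQueue declare(String name, Map<String, String> args) {
        ToyQueue existing = queues.get(name);
        if (existing != null && existing.args.equals(args)) {
            return existing; // same arguments: queue and its messages are kept
        }
        ToyQueue fresh = new ToyQueue(args);
        queues.put(name, fresh); // different arguments: old messages are lost
        return fresh;
    }

    void enqueue(String name, String body) {
        queues.get(name).messages.add(body);
    }

    int depth(String name) {
        ToyQueue queue = queues.get(name);
        return queue == null ? 0 : queue.messages.size();
    }

    /** Builds the argument map carrying a selector expression. */
    static Map<String, String> selector(String expression) {
        return Collections.singletonMap("selector", expression);
    }

    public static void main(String[] ignored) {
        ToyBroker broker = new ToyBroker();
        String queueName = "clientid:testResubscribeWithChangedSelector";

        // First subscription, one message left unconsumed on the queue.
        broker.declare(queueName, selector("Match = True"));
        broker.enqueue(queueName, "unconsumed matching message");
        System.out.println("depth before selector change: " + broker.depth(queueName));

        // Resubscribe with a different selector; no unsubscribe is ever issued.
        broker.declare(queueName, selector("Match = False"));
        System.out.println("depth after selector change: " + broker.depth(queueName));
    }
}
```

If the Java broker behaves anything like this, the "Msg count should be 0" assertion would pass without the client ever issuing an unsubscribe, which would explain why the test only fails against the C++ broker.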
Also, I think we shouldn't change any tests before a fix is made; our automated builds are currently failing because of this. I hope Andrew gets a chance to check in his fix soon.

Regards,

Rajith

On Tue, Mar 30, 2010 at 7:50 AM, <[email protected]> wrote:
> Author: robbie
> Date: Tue Mar 30 11:50:18 2010
> New Revision: 929095
>
> URL: http://svn.apache.org/viewvc?rev=929095&view=rev
> Log:
> QPID-2417, QPID-2418, QPID-2449: expand topic testing, specifically around
> the change and unsubscription of durable subscriptions
>
> Modified:
>
> qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
>
> qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
> qpid/trunk/qpid/java/test-profiles/Excludes
> qpid/trunk/qpid/java/test-profiles/JavaExcludes
>
> Modified:
> qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
> URL:
> http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java?rev=929095&r1=929094&r2=929095&view=diff
> ==============================================================================
> ---
> qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
> (original)
> +++
> qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
> Tue Mar 30 11:50:18 2010
> @@ -19,6 +19,10 @@ package org.apache.qpid.test.unit.ct;
>
> import javax.jms.*;
>
> +import org.apache.qpid.client.AMQConnection;
> +import org.apache.qpid.client.AMQQueue;
> +import org.apache.qpid.client.AMQSession;
> +import org.apache.qpid.client.AMQTopic;
> import org.apache.qpid.test.utils.QpidTestCase;
>
> /**
> @@ -163,5 +167,301 @@ public class DurableSubscriberTest exten
> durConnection2.close();
> }
> }
> +
> + /**
> + * create and register a durable subscriber without a message selector
> and then unsubscribe it
> + * create and register a durable subscriber with a message selector and
> then close it
> + * restart the broker
> + * send matching and non matching messages
> + * recreate and register the durable subscriber with a message selector
> + * verify only the matching messages are received
> + */
> + public void testDurSubChangedToHaveSelectorThenRestart() throws Exception
> + {
> + if (! isBrokerStorePersistent())
> + {
> + _logger.warn("Test skipped due to requirement of a persistent
> store");
> + return;
> + }
> +
> + final String SUB_NAME=getTestQueueName();
> +
> + TopicConnectionFactory factory = getConnectionFactory();
> + Topic topic = (Topic) getInitialContext().lookup(_topicName);
> +
> + //create and register a durable subscriber then unsubscribe it
> + TopicConnection durConnection =
> factory.createTopicConnection("guest", "guest");
> + TopicSession durSession = durConnection.createTopicSession(false,
> Session.AUTO_ACKNOWLEDGE);
> + TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic,
> SUB_NAME);
> + durConnection.start();
> + durSub1.close();
> + durSession.unsubscribe(SUB_NAME);
> + durSession.close();
> + durConnection.close();
> +
> + //create and register a durable subscriber with a message selector
> and then close it
> + TopicConnection durConnection2 =
> factory.createTopicConnection("guest", "guest");
> + TopicSession durSession2 = durConnection2.createTopicSession(false,
> Session.AUTO_ACKNOWLEDGE);
> + TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic,
> SUB_NAME, "testprop='true'", false);
> + durConnection2.start();
> + durSub2.close();
> + durSession2.close();
> + durConnection2.close();
> +
> + //now restart the server
> + try
> + {
> + restartBroker();
> + }
> + catch (Exception e)
> + {
> + _logger.error("problems restarting broker: " + e);
> + throw e;
> + }
> +
> + //send messages matching and not matching the selector
> + TopicConnection pubConnection =
> factory.createTopicConnection("guest", "guest");
> + TopicSession pubSession = pubConnection.createTopicSession(false,
> Session.AUTO_ACKNOWLEDGE);
> + TopicPublisher publisher = pubSession.createPublisher(topic);
> + for (int i = 0; i < 5; i++)
> + {
> + Message message = pubSession.createMessage();
> + message.setStringProperty("testprop", "true");
> + publisher.publish(message);
> + message = pubSession.createMessage();
> + message.setStringProperty("testprop", "false");
> + publisher.publish(message);
> + }
> + publisher.close();
> + pubSession.close();
> +
> + //now recreate the durable subscriber with selector to check there
> are no exceptions generated
> + //and then verify the messages are received correctly
> + TopicConnection durConnection3 = (TopicConnection)
> factory.createConnection("guest", "guest");
> + TopicSession durSession3 = (TopicSession)
> durConnection3.createSession(false, Session.AUTO_ACKNOWLEDGE);
> + TopicSubscriber durSub3 = durSession3.createDurableSubscriber(topic,
> SUB_NAME, "testprop='true'", false);
> + durConnection3.start();
> +
> + for (int i = 0; i < 5; i++)
> + {
> + Message message = durSub3.receive(2000);
> + if (message == null)
> + {
> + fail("testDurSubChangedToHaveSelectorThenRestart test
> failed. Expected message " + i + " was not returned");
> + }
> + else
> + {
> + assertTrue("testDurSubChangedToHaveSelectorThenRestart test
> failed. Got message not matching selector",
> +
> message.getStringProperty("testprop").equals("true"));
> + }
> + }
> +
> + durSub3.close();
> + durSession3.unsubscribe(SUB_NAME);
> + durSession3.close();
> + durConnection3.close();
> + }
> +
> +
> + /**
> + * create and register a durable subscriber with a message selector and
> then unsubscribe it
> + * create and register a durable subscriber without a message selector
> and then close it
> + * restart the broker
> + * send matching and non matching messages
> + * recreate and register the durable subscriber without a message
> selector
> + * verify ALL the sent messages are received
> + */
> + public void testDurSubChangedToNotHaveSelectorThenRestart() throws
> Exception
> + {
> + if (! isBrokerStorePersistent())
> + {
> + _logger.warn("Test skipped due to requirement of a persistent
> store");
> + return;
> + }
> +
> + final String SUB_NAME=getTestQueueName();
> +
> + TopicConnectionFactory factory = getConnectionFactory();
> + Topic topic = (Topic) getInitialContext().lookup(_topicName);
> +
> + //create and register a durable subscriber with selector then
> unsubscribe it
> + TopicConnection durConnection =
> factory.createTopicConnection("guest", "guest");
> + TopicSession durSession = durConnection.createTopicSession(false,
> Session.AUTO_ACKNOWLEDGE);
> + TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic,
> SUB_NAME, "testprop='true'", false);
> + durConnection.start();
> + durSub1.close();
> + durSession.unsubscribe(SUB_NAME);
> + durSession.close();
> + durConnection.close();
> +
> + //create and register a durable subscriber without the message
> selector and then close it
> + TopicConnection durConnection2 =
> factory.createTopicConnection("guest", "guest");
> + TopicSession durSession2 = durConnection2.createTopicSession(false,
> Session.AUTO_ACKNOWLEDGE);
> + TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic,
> SUB_NAME);
> + durConnection2.start();
> + durSub2.close();
> + durSession2.close();
> + durConnection2.close();
> +
> + //now restart the server
> + try
> + {
> + restartBroker();
> + }
> + catch (Exception e)
> + {
> + _logger.error("problems restarting broker: " + e);
> + throw e;
> + }
> +
> + //send messages matching and not matching the original used selector
> + TopicConnection pubConnection =
> factory.createTopicConnection("guest", "guest");
> + TopicSession pubSession = pubConnection.createTopicSession(false,
> Session.AUTO_ACKNOWLEDGE);
> + TopicPublisher publisher = pubSession.createPublisher(topic);
> + for (int i = 1; i <= 5; i++)
> + {
> + Message message = pubSession.createMessage();
> + message.setStringProperty("testprop", "true");
> + publisher.publish(message);
> + message = pubSession.createMessage();
> + message.setStringProperty("testprop", "false");
> + publisher.publish(message);
> + }
> + publisher.close();
> + pubSession.close();
> +
> + //now recreate the durable subscriber without selector to check
> there are no exceptions generated
> + //then verify ALL messages sent are received
> + TopicConnection durConnection3 = (TopicConnection)
> factory.createConnection("guest", "guest");
> + TopicSession durSession3 = (TopicSession)
> durConnection3.createSession(false, Session.AUTO_ACKNOWLEDGE);
> + TopicSubscriber durSub3 = durSession3.createDurableSubscriber(topic,
> SUB_NAME);
> + durConnection3.start();
> +
> + for (int i = 1; i <= 10; i++)
> + {
> + Message message = durSub3.receive(2000);
> + if (message == null)
> + {
> + fail("testDurSubChangedToNotHaveSelectorThenRestart test
> failed. Expected message " + i + " was not received");
> + }
> + }
> +
> + durSub3.close();
> + durSession3.unsubscribe(SUB_NAME);
> + durSession3.close();
> + durConnection3.close();
> + }
> +
> +
> + public void testResubscribeWithChangedSelectorAndRestart() throws
> Exception
> + {
> + if (! isBrokerStorePersistent())
> + {
> + _logger.warn("Test skipped due to requirement of a persistent
> store");
> + return;
> + }
> +
> + Connection conn = getConnection();
> + conn.start();
> + Session session = conn.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
> + AMQTopic topic = new AMQTopic((AMQConnection) conn,
> "testResubscribeWithChangedSelectorAndRestart");
> + MessageProducer producer = session.createProducer(topic);
> +
> + // Create durable subscriber that matches A
> + TopicSubscriber subA = session.createDurableSubscriber(topic,
> + "testResubscribeWithChangedSelector",
> + "Match = True", false);
> +
> + // Send 1 matching message and 1 non-matching message
> + TextMessage msg =
> session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
> + msg.setBooleanProperty("Match", true);
> + producer.send(msg);
> + msg =
> session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
> + msg.setBooleanProperty("Match", false);
> + producer.send(msg);
> +
> + Message rMsg = subA.receive(1000);
> + assertNotNull(rMsg);
> + assertEquals("Content was wrong",
> + "testResubscribeWithChangedSelectorAndRestart1",
> + ((TextMessage) rMsg).getText());
> +
> + rMsg = subA.receive(1000);
> + assertNull(rMsg);
> +
> + // Send another 1 matching message and 1 non-matching message
> + msg =
> session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
> + msg.setBooleanProperty("Match", true);
> + producer.send(msg);
> + msg =
> session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
> + msg.setBooleanProperty("Match", false);
> + producer.send(msg);
> +
> + // Disconnect subscriber without receiving the message to
> + //leave it on the underlying queue
> + subA.close();
> +
> + // Reconnect with new selector that matches B
> + TopicSubscriber subB = session.createDurableSubscriber(topic,
> + "testResubscribeWithChangedSelectorAndRestart","Match =
> False", false);
> +
> + //verify no messages are now present on the queue as changing
> selector should have issued
> + //an unsubscribe and thus deleted the previous durable backing queue
> for the subscription.
> + //check the dur sub's underlying queue now has msg count 1
> + AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" +
> "testResubscribeWithChangedSelector");
> + assertEquals("Msg count should be 0", 0,
> ((AMQSession)session).getQueueDepth(subQueue));
> +
> +
> + // Check that new messages are received properly
> + msg =
> session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
> + msg.setBooleanProperty("Match", true);
> + producer.send(msg);
> + msg =
> session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
> + msg.setBooleanProperty("Match", false);
> + producer.send(msg);
> +
> + rMsg = subB.receive(1000);
> + assertNotNull(rMsg);
> + assertEquals("Content was wrong",
> + "testResubscribeWithChangedSelectorAndRestart2",
> + ((TextMessage) rMsg).getText());
> +
> + rMsg = subB.receive(1000);
> + assertNull(rMsg);
> +
> + //now restart the server
> + try
> + {
> + restartBroker();
> + }
> + catch (Exception e)
> + {
> + _logger.error("problems restarting broker: " + e);
> + throw e;
> + }
> +
> + // Check that new messages are still received properly
> + msg =
> session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
> + msg.setBooleanProperty("Match", true);
> + producer.send(msg);
> + msg =
> session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
> + msg.setBooleanProperty("Match", false);
> + producer.send(msg);
> +
> + rMsg = subB.receive(1000);
> + assertNotNull(rMsg);
> + assertEquals("Content was wrong",
> + "testResubscribeWithChangedSelectorAndRestart2",
> + ((TextMessage) rMsg).getText());
> +
> + rMsg = subB.receive(1000);
> + assertNull(rMsg);
> +
> + session.unsubscribe("testResubscribeWithChangedSelectorAndRestart");
> + subB.close();
> + session.close();
> + conn.close();
> + }
> +
> }
>
>
> Modified:
> qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
> URL:
> http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=929095&r1=929094&r2=929095&view=diff
> ==============================================================================
> ---
> qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
> (original)
> +++
> qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
> Tue Mar 30 11:50:18 2010
> @@ -20,8 +20,13 @@
> */
> package org.apache.qpid.test.unit.topic;
>
> +import java.io.IOException;
> +import java.util.Set;
> +
> +import org.apache.qpid.management.common.JMXConnnectionFactory;
> import org.apache.qpid.test.utils.QpidTestCase;
> import org.apache.qpid.client.AMQConnection;
> +import org.apache.qpid.client.AMQQueue;
> import org.apache.qpid.client.AMQSession;
> import org.apache.qpid.client.AMQTopic;
>
> @@ -39,6 +44,9 @@ import javax.jms.Session;
> import javax.jms.TextMessage;
> import javax.jms.Topic;
> import javax.jms.TopicSubscriber;
> +import javax.management.MBeanServerConnection;
> +import javax.management.ObjectName;
> +import javax.management.remote.JMXConnector;
>
> /**
> * @todo Code to check that a consumer gets only one particular method could
> be factored into a re-usable method (as
> @@ -58,6 +66,36 @@ public class DurableSubscriptionTest ext
> /** Timeout for receive() if we are not expecting a message */
> private static final long NEGATIVE_RECEIVE_TIMEOUT = 1000;
>
> + private JMXConnector _jmxc;
> + private MBeanServerConnection _mbsc;
> + private static final String USER = "admin";
> + private static final String PASSWORD = "admin";
> + private boolean _jmxConnected;
> +
> + public void setUp() throws Exception
> + {
> + setConfigurationProperty("management.enabled", "true");
> + _jmxConnected=false;
> + super.setUp();
> + }
> +
> + public void tearDown() throws Exception
> + {
> + if(_jmxConnected)
> + {
> + try
> + {
> + _jmxc.close();
> + }
> + catch (IOException e)
> + {
> + e.printStackTrace();
> + }
> + }
> +
> + super.tearDown();
> + }
> +
> public void testUnsubscribe() throws Exception
> {
> AMQConnection con = (AMQConnection) getConnection("guest", "guest");
> @@ -79,6 +117,12 @@ public class DurableSubscriptionTest ext
>
> _logger.info("Producer sending message A");
> producer.send(session1.createTextMessage("A"));
> +
> + ((AMQSession)session1).sync();
> +
> + //check the dur sub's underlying queue now has msg count 1
> + AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" +
> "MySubscription");
> + assertEquals("Msg count should be 1", 1,
> ((AMQSession)session1).getQueueDepth(subQueue));
>
> Message msg;
> _logger.info("Receive message on consumer 1:expecting A");
> @@ -96,11 +140,46 @@ public class DurableSubscriptionTest ext
> msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
> _logger.info("Receive message on consumer 1 :expecting null");
> assertEquals(null, msg);
> +
> + ((AMQSession)session2).sync();
> +
> + //check the dur sub's underlying queue now has msg count 0
> + assertEquals("Msg count should be 0", 0,
> ((AMQSession)session2).getQueueDepth(subQueue));
>
> consumer2.close();
> _logger.info("Unsubscribe session2/consumer2");
> session2.unsubscribe("MySubscription");
> -
> +
> + ((AMQSession)session2).sync();
> +
> + if(isJavaBroker() && isExternalBroker())
> + {
> + //Verify that the queue was deleted by querying for its JMX MBean
> + _jmxc = JMXConnnectionFactory.getJMXConnection(5000, "127.0.0.1",
> + getManagementPort(getPort()), USER, PASSWORD);
> +
> + _jmxConnected = true;
> + _mbsc = _jmxc.getMBeanServerConnection();
> +
> + //must replace the occurrence of ':' in queue name with '-'
> + String queueObjectNameText = "clientid" + "-" + "MySubscription";
> +
> + ObjectName objName = new
> ObjectName("org.apache.qpid:type=VirtualHost.Queue,name="
> + + queueObjectNameText +
> ",*");
> +
> + Set<ObjectName> objectInstances = _mbsc.queryNames(objName,
> null);
> +
> + if(objectInstances.size() != 0)
> + {
> + fail("Queue MBean was found. Expected queue to have been
> deleted");
> + }
> + else
> + {
> + _logger.info("Underlying dueue for the durable subscription
> was confirmed deleted.");
> + }
> + }
> +
> + //verify unsubscribing the durable subscriber did not affect the
> non-durable one
> _logger.info("Producer sending message B");
> producer.send(session1.createTextMessage("B"));
>
> @@ -459,6 +538,9 @@ public class DurableSubscriptionTest ext
> rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT);
> assertNull(rMsg);
>
> + // Send another 1 matching message and 1 non-matching message
> + sendMatchingAndNonMatchingMessage(session, producer);
> +
> // Disconnect subscriber
> subA.close();
>
> @@ -466,9 +548,15 @@ public class DurableSubscriptionTest ext
> TopicSubscriber subB = session.createDurableSubscriber(topic,
> "testResubscribeWithChangedSelector","Match = False", false);
>
> + //verify no messages are now present as changing selector should
> have issued
> + //an unsubscribe and thus deleted the previous backing queue for the
> subscription.
> + rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
> + assertNull("Should not have received message as the queue underlying
> the " +
> + "subscription should have been cleared/deleted when
> the selector was changed", rMsg);
>
> - // Check messages are recieved properly
> + // Check that new messages are received properly
> sendMatchingAndNonMatchingMessage(session, producer);
> +
> rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
> assertNotNull(rMsg);
> assertEquals("Content was wrong",
>
> Modified: qpid/trunk/qpid/java/test-profiles/Excludes
> URL:
> http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Excludes?rev=929095&r1=929094&r2=929095&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/test-profiles/Excludes (original)
> +++ qpid/trunk/qpid/java/test-profiles/Excludes Tue Mar 30 11:50:18 2010
> @@ -29,3 +29,5 @@ org.apache.qpid.test.unit.ack.Acknowledg
> org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#*
> org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#*
>
> +// QPID-2418 : The queue backing the dur sub is not currently deleted at
> subscription change, so the test will fail.
> +org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSelectorAndRestart
>
> Modified: qpid/trunk/qpid/java/test-profiles/JavaExcludes
> URL:
> http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/JavaExcludes?rev=929095&r1=929094&r2=929095&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/test-profiles/JavaExcludes (original)
> +++ qpid/trunk/qpid/java/test-profiles/JavaExcludes Tue Mar 30 11:50:18 2010
> @@ -16,6 +16,3 @@ org.apache.qpid.client.SessionCreateTest
> // QPID-2097 exclude it from the InVM test runs until InVM JMX Interface is
> reliable
> org.apache.qpid.management.jmx.ManagementActorLoggingTest#*
> org.apache.qpid.server.queue.ModelTest#*
> -
> -//QPID-2422: Derby currently doesnt persist queue arguments and 0-91 support
> causes exclusivity mismatch after restart
> -org.apache.qpid.test.unit.ct.DurableSubscriberTest#*
>
>
>
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project: http://qpid.apache.org
> Use/Interact: mailto:[email protected]
>
>

--
Regards,

Rajith Attapattu
Red Hat
http://rajith.2rlabs.com/
