Robbie,

The change to the test case is causing the test to fail when run
against the C++ broker (which is expected, as we have a bug in the Java
client).
However, it is passing against the Java broker, so it is perhaps better to
investigate that.
I suspect the broker is creating a new queue because the args list is
different (the selector being different)?
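
To make that suspicion concrete, here is a toy model of the behaviour I
am guessing at. It is not Qpid broker code: the class, the declare() and
depth() methods, and the "replace the queue when the arguments differ"
rule are all assumptions made up for illustration. Only the queue name
and the selector strings come from the test.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Toy model only (hypothetical): NOT the Java broker's real logic. */
public class DurableSubQueueModel {

    /** A backing queue: the selector it was declared with plus pending messages. */
    static final class BackingQueue {
        final String selector;                 // null means "no selector"
        final List<String> messages = new ArrayList<>();

        BackingQueue(String selector) {
            this.selector = selector;
        }
    }

    private final Map<String, BackingQueue> queues = new HashMap<>();

    /**
     * Assumed behaviour: a declare with the same name but different
     * arguments silently replaces the existing queue.
     */
    BackingQueue declare(String name, String selector) {
        BackingQueue existing = queues.get(name);
        if (existing != null && Objects.equals(existing.selector, selector)) {
            return existing;                   // same args: queue and messages are kept
        }
        BackingQueue fresh = new BackingQueue(selector);
        queues.put(name, fresh);               // different args: old messages are dropped
        return fresh;
    }

    /** Number of messages currently held for the named queue (0 if absent). */
    int depth(String name) {
        BackingQueue q = queues.get(name);
        return q == null ? 0 : q.messages.size();
    }

    public static void main(String[] args) {
        DurableSubQueueModel broker = new DurableSubQueueModel();
        String queueName = "clientid:testResubscribeWithChangedSelector";

        // Subscriber A leaves one matching message unconsumed.
        broker.declare(queueName, "Match = True").messages.add("unconsumed message");
        System.out.println("depth before resubscribe: " + broker.depth(queueName));

        // Subscriber B re-declares the same name with a different selector.
        broker.declare(queueName, "Match = False");
        System.out.println("depth after resubscribe:  " + broker.depth(queueName));
    }
}
```

If the Java broker behaves anything like this, the queue depth check in
the test would see 0 even though the client never issued an unsubscribe,
which would explain the test passing there for the wrong reason.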

Also, I think we shouldn't change any tests before a fix is made.
Currently our automated builds are failing because of this.
I hope Andrew gets a chance to check in his fix soon.

Regards,

Rajith

On Tue, Mar 30, 2010 at 7:50 AM,  <[email protected]> wrote:
> Author: robbie
> Date: Tue Mar 30 11:50:18 2010
> New Revision: 929095
>
> URL: http://svn.apache.org/viewvc?rev=929095&view=rev
> Log:
> QPID-2417 , QPID-2418 , QPID-2449 : expand topic testing, specifically around 
> the change and unsubscription of durable subscriptions
>
> Modified:
>    
> qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
>    
> qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
>    qpid/trunk/qpid/java/test-profiles/Excludes
>    qpid/trunk/qpid/java/test-profiles/JavaExcludes
>
> Modified: 
> qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
> URL: 
> http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java?rev=929095&r1=929094&r2=929095&view=diff
> ==============================================================================
> --- 
> qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
>  (original)
> +++ 
> qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
>  Tue Mar 30 11:50:18 2010
> @@ -19,6 +19,10 @@ package org.apache.qpid.test.unit.ct;
>
>  import javax.jms.*;
>
> +import org.apache.qpid.client.AMQConnection;
> +import org.apache.qpid.client.AMQQueue;
> +import org.apache.qpid.client.AMQSession;
> +import org.apache.qpid.client.AMQTopic;
>  import org.apache.qpid.test.utils.QpidTestCase;
>
>  /**
> @@ -163,5 +167,301 @@ public class DurableSubscriberTest exten
>             durConnection2.close();
>         }
>     }
> +
> +    /**
> +     * create and register a durable subscriber without a message selector 
> and then unsubscribe it
> +     * create and register a durable subscriber with a message selector and 
> then close it
> +     * restart the broker
> +     * send matching and non matching messages
> +     * recreate and register the durable subscriber with a message selector
> +     * verify only the matching messages are received
> +     */
> +    public void testDurSubChangedToHaveSelectorThenRestart() throws Exception
> +    {
> +        if (! isBrokerStorePersistent())
> +        {
> +            _logger.warn("Test skipped due to requirement of a persistent 
> store");
> +            return;
> +        }
> +
> +        final String SUB_NAME=getTestQueueName();
> +
> +        TopicConnectionFactory factory = getConnectionFactory();
> +        Topic topic = (Topic) getInitialContext().lookup(_topicName);
> +
> +        //create and register a durable subscriber then unsubscribe it
> +        TopicConnection durConnection = 
> factory.createTopicConnection("guest", "guest");
> +        TopicSession durSession = durConnection.createTopicSession(false, 
> Session.AUTO_ACKNOWLEDGE);
> +        TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, 
> SUB_NAME);
> +        durConnection.start();
> +        durSub1.close();
> +        durSession.unsubscribe(SUB_NAME);
> +        durSession.close();
> +        durConnection.close();
> +
> +        //create and register a durable subscriber with a message selector 
> and then close it
> +        TopicConnection durConnection2 = 
> factory.createTopicConnection("guest", "guest");
> +        TopicSession durSession2 = durConnection2.createTopicSession(false, 
> Session.AUTO_ACKNOWLEDGE);
> +        TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, 
> SUB_NAME, "testprop='true'", false);
> +        durConnection2.start();
> +        durSub2.close();
> +        durSession2.close();
> +        durConnection2.close();
> +
> +        //now restart the server
> +        try
> +        {
> +            restartBroker();
> +        }
> +        catch (Exception e)
> +        {
> +            _logger.error("problems restarting broker: " + e);
> +            throw e;
> +        }
> +
> +        //send messages matching and not matching the selector
> +        TopicConnection pubConnection = 
> factory.createTopicConnection("guest", "guest");
> +        TopicSession pubSession = pubConnection.createTopicSession(false, 
> Session.AUTO_ACKNOWLEDGE);
> +        TopicPublisher publisher = pubSession.createPublisher(topic);
> +        for (int i = 0; i < 5; i++)
> +        {
> +            Message message = pubSession.createMessage();
> +            message.setStringProperty("testprop", "true");
> +            publisher.publish(message);
> +            message = pubSession.createMessage();
> +            message.setStringProperty("testprop", "false");
> +            publisher.publish(message);
> +        }
> +        publisher.close();
> +        pubSession.close();
> +
> +        //now recreate the durable subscriber with selector to check there 
> are no exceptions generated
> +        //and then verify the messages are received correctly
> +        TopicConnection durConnection3 = (TopicConnection) 
> factory.createConnection("guest", "guest");
> +        TopicSession durSession3 = (TopicSession) 
> durConnection3.createSession(false, Session.AUTO_ACKNOWLEDGE);
> +        TopicSubscriber durSub3 = durSession3.createDurableSubscriber(topic, 
> SUB_NAME, "testprop='true'", false);
> +        durConnection3.start();
> +
> +        for (int i = 0; i < 5; i++)
> +        {
> +            Message message = durSub3.receive(2000);
> +            if (message == null)
> +            {
> +                fail("testDurSubChangedToHaveSelectorThenRestart test 
> failed. Expected message " + i + " was not returned");
> +            }
> +            else
> +            {
> +                assertTrue("testDurSubChangedToHaveSelectorThenRestart test 
> failed. Got message not matching selector",
> +                           
> message.getStringProperty("testprop").equals("true"));
> +            }
> +        }
> +
> +        durSub3.close();
> +        durSession3.unsubscribe(SUB_NAME);
> +        durSession3.close();
> +        durConnection3.close();
> +    }
> +
> +
> +    /**
> +     * create and register a durable subscriber with a message selector and 
> then unsubscribe it
> +     * create and register a durable subscriber without a message selector 
> and then close it
> +     * restart the broker
> +     * send matching and non matching messages
> +     * recreate and register the durable subscriber without a message 
> selector
> +     * verify ALL the sent messages are received
> +     */
> +    public void testDurSubChangedToNotHaveSelectorThenRestart() throws 
> Exception
> +    {
> +        if (! isBrokerStorePersistent())
> +        {
> +            _logger.warn("Test skipped due to requirement of a persistent 
> store");
> +            return;
> +        }
> +
> +        final String SUB_NAME=getTestQueueName();
> +
> +        TopicConnectionFactory factory = getConnectionFactory();
> +        Topic topic = (Topic) getInitialContext().lookup(_topicName);
> +
> +        //create and register a durable subscriber with selector then 
> unsubscribe it
> +        TopicConnection durConnection = 
> factory.createTopicConnection("guest", "guest");
> +        TopicSession durSession = durConnection.createTopicSession(false, 
> Session.AUTO_ACKNOWLEDGE);
> +        TopicSubscriber durSub1 = durSession.createDurableSubscriber(topic, 
> SUB_NAME, "testprop='true'", false);
> +        durConnection.start();
> +        durSub1.close();
> +        durSession.unsubscribe(SUB_NAME);
> +        durSession.close();
> +        durConnection.close();
> +
> +        //create and register a durable subscriber without the message 
> selector and then close it
> +        TopicConnection durConnection2 = 
> factory.createTopicConnection("guest", "guest");
> +        TopicSession durSession2 = durConnection2.createTopicSession(false, 
> Session.AUTO_ACKNOWLEDGE);
> +        TopicSubscriber durSub2 = durSession2.createDurableSubscriber(topic, 
> SUB_NAME);
> +        durConnection2.start();
> +        durSub2.close();
> +        durSession2.close();
> +        durConnection2.close();
> +
> +        //now restart the server
> +        try
> +        {
> +            restartBroker();
> +        }
> +        catch (Exception e)
> +        {
> +            _logger.error("problems restarting broker: " + e);
> +            throw e;
> +        }
> +
> +        //send messages matching and not matching the original used selector
> +        TopicConnection pubConnection = 
> factory.createTopicConnection("guest", "guest");
> +        TopicSession pubSession = pubConnection.createTopicSession(false, 
> Session.AUTO_ACKNOWLEDGE);
> +        TopicPublisher publisher = pubSession.createPublisher(topic);
> +        for (int i = 1; i <= 5; i++)
> +        {
> +            Message message = pubSession.createMessage();
> +            message.setStringProperty("testprop", "true");
> +            publisher.publish(message);
> +            message = pubSession.createMessage();
> +            message.setStringProperty("testprop", "false");
> +            publisher.publish(message);
> +        }
> +        publisher.close();
> +        pubSession.close();
> +
> +        //now recreate the durable subscriber without selector to check 
> there are no exceptions generated
> +        //then verify ALL messages sent are received
> +        TopicConnection durConnection3 = (TopicConnection) 
> factory.createConnection("guest", "guest");
> +        TopicSession durSession3 = (TopicSession) 
> durConnection3.createSession(false, Session.AUTO_ACKNOWLEDGE);
> +        TopicSubscriber durSub3 = durSession3.createDurableSubscriber(topic, 
> SUB_NAME);
> +        durConnection3.start();
> +
> +        for (int i = 1; i <= 10; i++)
> +        {
> +            Message message = durSub3.receive(2000);
> +            if (message == null)
> +            {
> +                fail("testDurSubChangedToNotHaveSelectorThenRestart test 
> failed. Expected message " + i + " was not received");
> +            }
> +        }
> +
> +        durSub3.close();
> +        durSession3.unsubscribe(SUB_NAME);
> +        durSession3.close();
> +        durConnection3.close();
> +    }
> +
> +
> +    public void testResubscribeWithChangedSelectorAndRestart() throws 
> Exception
> +    {
> +        if (! isBrokerStorePersistent())
> +        {
> +            _logger.warn("Test skipped due to requirement of a persistent 
> store");
> +            return;
> +        }
> +
> +        Connection conn = getConnection();
> +        conn.start();
> +        Session session = conn.createSession(false, 
> Session.AUTO_ACKNOWLEDGE);
> +        AMQTopic topic = new AMQTopic((AMQConnection) conn, 
> "testResubscribeWithChangedSelectorAndRestart");
> +        MessageProducer producer = session.createProducer(topic);
> +
> +        // Create durable subscriber that matches A
> +        TopicSubscriber subA = session.createDurableSubscriber(topic,
> +                "testResubscribeWithChangedSelector",
> +                "Match = True", false);
> +
> +        // Send 1 matching message and 1 non-matching message
> +        TextMessage msg = 
> session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
> +        msg.setBooleanProperty("Match", true);
> +        producer.send(msg);
> +        msg = 
> session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
> +        msg.setBooleanProperty("Match", false);
> +        producer.send(msg);
> +
> +        Message rMsg = subA.receive(1000);
> +        assertNotNull(rMsg);
> +        assertEquals("Content was wrong",
> +                     "testResubscribeWithChangedSelectorAndRestart1",
> +                     ((TextMessage) rMsg).getText());
> +
> +        rMsg = subA.receive(1000);
> +        assertNull(rMsg);
> +
> +        // Send another 1 matching message and 1 non-matching message
> +        msg = 
> session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
> +        msg.setBooleanProperty("Match", true);
> +        producer.send(msg);
> +        msg = 
> session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
> +        msg.setBooleanProperty("Match", false);
> +        producer.send(msg);
> +
> +        // Disconnect subscriber without receiving the message to
> +        //leave it on the underlying queue
> +        subA.close();
> +
> +        // Reconnect with new selector that matches B
> +        TopicSubscriber subB = session.createDurableSubscriber(topic,
> +                "testResubscribeWithChangedSelectorAndRestart","Match = 
> False", false);
> +
> +        //verify no messages are now present on the queue as changing 
> selector should have issued
> +        //an unsubscribe and thus deleted the previous durable backing queue 
> for the subscription.
> +        //check the dur sub's underlying queue now has msg count 1
> +        AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + 
> "testResubscribeWithChangedSelector");
> +        assertEquals("Msg count should be 0", 0, 
> ((AMQSession)session).getQueueDepth(subQueue));
> +
> +
> +        // Check that new messages are received properly
> +        msg = 
> session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
> +        msg.setBooleanProperty("Match", true);
> +        producer.send(msg);
> +        msg = 
> session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
> +        msg.setBooleanProperty("Match", false);
> +        producer.send(msg);
> +
> +        rMsg = subB.receive(1000);
> +        assertNotNull(rMsg);
> +        assertEquals("Content was wrong",
> +                     "testResubscribeWithChangedSelectorAndRestart2",
> +                     ((TextMessage) rMsg).getText());
> +
> +        rMsg = subB.receive(1000);
> +        assertNull(rMsg);
> +
> +        //now restart the server
> +        try
> +        {
> +            restartBroker();
> +        }
> +        catch (Exception e)
> +        {
> +            _logger.error("problems restarting broker: " + e);
> +            throw e;
> +        }
> +
> +        // Check that new messages are still received properly
> +        msg = 
> session.createTextMessage("testResubscribeWithChangedSelectorAndRestart1");
> +        msg.setBooleanProperty("Match", true);
> +        producer.send(msg);
> +        msg = 
> session.createTextMessage("testResubscribeWithChangedSelectorAndRestart2");
> +        msg.setBooleanProperty("Match", false);
> +        producer.send(msg);
> +
> +        rMsg = subB.receive(1000);
> +        assertNotNull(rMsg);
> +        assertEquals("Content was wrong",
> +                     "testResubscribeWithChangedSelectorAndRestart2",
> +                     ((TextMessage) rMsg).getText());
> +
> +        rMsg = subB.receive(1000);
> +        assertNull(rMsg);
> +
> +        session.unsubscribe("testResubscribeWithChangedSelectorAndRestart");
> +        subB.close();
> +        session.close();
> +        conn.close();
> +    }
> +
>  }
>
>
> Modified: 
> qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
> URL: 
> http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=929095&r1=929094&r2=929095&view=diff
> ==============================================================================
> --- 
> qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
>  (original)
> +++ 
> qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
>  Tue Mar 30 11:50:18 2010
> @@ -20,8 +20,13 @@
>  */
>  package org.apache.qpid.test.unit.topic;
>
> +import java.io.IOException;
> +import java.util.Set;
> +
> +import org.apache.qpid.management.common.JMXConnnectionFactory;
>  import org.apache.qpid.test.utils.QpidTestCase;
>  import org.apache.qpid.client.AMQConnection;
> +import org.apache.qpid.client.AMQQueue;
>  import org.apache.qpid.client.AMQSession;
>  import org.apache.qpid.client.AMQTopic;
>
> @@ -39,6 +44,9 @@ import javax.jms.Session;
>  import javax.jms.TextMessage;
>  import javax.jms.Topic;
>  import javax.jms.TopicSubscriber;
> +import javax.management.MBeanServerConnection;
> +import javax.management.ObjectName;
> +import javax.management.remote.JMXConnector;
>
>  /**
>  * @todo Code to check that a consumer gets only one particular method could 
> be factored into a re-usable method (as
> @@ -58,6 +66,36 @@ public class DurableSubscriptionTest ext
>     /** Timeout for receive() if we are not expecting a message */
>     private static final long NEGATIVE_RECEIVE_TIMEOUT = 1000;
>
> +    private JMXConnector _jmxc;
> +    private MBeanServerConnection _mbsc;
> +    private static final String USER = "admin";
> +    private static final String PASSWORD = "admin";
> +    private boolean _jmxConnected;
> +
> +    public void setUp() throws Exception
> +    {
> +        setConfigurationProperty("management.enabled", "true");
> +        _jmxConnected=false;
> +        super.setUp();
> +    }
> +
> +    public void tearDown() throws Exception
> +    {
> +        if(_jmxConnected)
> +        {
> +            try
> +            {
> +                _jmxc.close();
> +            }
> +            catch (IOException e)
> +            {
> +                e.printStackTrace();
> +            }
> +        }
> +
> +        super.tearDown();
> +    }
> +
>     public void testUnsubscribe() throws Exception
>     {
>         AMQConnection con = (AMQConnection) getConnection("guest", "guest");
> @@ -79,6 +117,12 @@ public class DurableSubscriptionTest ext
>
>         _logger.info("Producer sending message A");
>         producer.send(session1.createTextMessage("A"));
> +
> +        ((AMQSession)session1).sync();
> +
> +        //check the dur sub's underlying queue now has msg count 1
> +        AMQQueue subQueue = new AMQQueue("amq.topic", "clientid" + ":" + 
> "MySubscription");
> +        assertEquals("Msg count should be 1", 1, 
> ((AMQSession)session1).getQueueDepth(subQueue));
>
>         Message msg;
>         _logger.info("Receive message on consumer 1:expecting A");
> @@ -96,11 +140,46 @@ public class DurableSubscriptionTest ext
>         msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
>         _logger.info("Receive message on consumer 1 :expecting null");
>         assertEquals(null, msg);
> +
> +        ((AMQSession)session2).sync();
> +
> +        //check the dur sub's underlying queue now has msg count 0
> +        assertEquals("Msg count should be 0", 0, 
> ((AMQSession)session2).getQueueDepth(subQueue));
>
>         consumer2.close();
>         _logger.info("Unsubscribe session2/consumer2");
>         session2.unsubscribe("MySubscription");
> -
> +
> +        ((AMQSession)session2).sync();
> +
> +        if(isJavaBroker() && isExternalBroker())
> +        {
> +            //Verify that the queue was deleted by querying for its JMX MBean
> +            _jmxc = JMXConnnectionFactory.getJMXConnection(5000, "127.0.0.1",
> +                    getManagementPort(getPort()), USER, PASSWORD);
> +
> +            _jmxConnected = true;
> +            _mbsc = _jmxc.getMBeanServerConnection();
> +
> +            //must replace the occurrence of ':' in queue name with '-'
> +            String queueObjectNameText = "clientid" + "-" + "MySubscription";
> +
> +            ObjectName objName = new 
> ObjectName("org.apache.qpid:type=VirtualHost.Queue,name="
> +                                                + queueObjectNameText + 
> ",*");
> +
> +            Set<ObjectName> objectInstances = _mbsc.queryNames(objName, 
> null);
> +
> +            if(objectInstances.size() != 0)
> +            {
> +                fail("Queue MBean was found. Expected queue to have been 
> deleted");
> +            }
> +            else
> +            {
> +                _logger.info("Underlying dueue for the durable subscription 
> was confirmed deleted.");
> +            }
> +        }
> +
> +        //verify unsubscribing the durable subscriber did not affect the 
> non-durable one
>         _logger.info("Producer sending message B");
>         producer.send(session1.createTextMessage("B"));
>
> @@ -459,6 +538,9 @@ public class DurableSubscriptionTest ext
>         rMsg = subA.receive(NEGATIVE_RECEIVE_TIMEOUT);
>         assertNull(rMsg);
>
> +        // Send another 1 matching message and 1 non-matching message
> +        sendMatchingAndNonMatchingMessage(session, producer);
> +
>         // Disconnect subscriber
>         subA.close();
>
> @@ -466,9 +548,15 @@ public class DurableSubscriptionTest ext
>         TopicSubscriber subB = session.createDurableSubscriber(topic,
>                 "testResubscribeWithChangedSelector","Match = False", false);
>
> +        //verify no messages are now present as changing selector should 
> have issued
> +        //an unsubscribe and thus deleted the previous backing queue for the 
> subscription.
> +        rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
> +        assertNull("Should not have received message as the queue underlying 
> the " +
> +                       "subscription should have been cleared/deleted when 
> the selector was changed", rMsg);
>
> -        // Check messages are recieved properly
> +        // Check that new messages are received properly
>         sendMatchingAndNonMatchingMessage(session, producer);
> +
>         rMsg = subB.receive(NEGATIVE_RECEIVE_TIMEOUT);
>         assertNotNull(rMsg);
>         assertEquals("Content was wrong",
>
> Modified: qpid/trunk/qpid/java/test-profiles/Excludes
> URL: 
> http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Excludes?rev=929095&r1=929094&r2=929095&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/test-profiles/Excludes (original)
> +++ qpid/trunk/qpid/java/test-profiles/Excludes Tue Mar 30 11:50:18 2010
> @@ -29,3 +29,5 @@ org.apache.qpid.test.unit.ack.Acknowledg
>  org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverOnMessageTest#*
>  org.apache.qpid.test.unit.ack.AcknowledgeAfterFailoverTest#*
>
> +// QPID-2418 : The queue backing the dur sub is not currently deleted at 
> subscription change, so the test will fail.
> +org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSelectorAndRestart
>
> Modified: qpid/trunk/qpid/java/test-profiles/JavaExcludes
> URL: 
> http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/JavaExcludes?rev=929095&r1=929094&r2=929095&view=diff
> ==============================================================================
> --- qpid/trunk/qpid/java/test-profiles/JavaExcludes (original)
> +++ qpid/trunk/qpid/java/test-profiles/JavaExcludes Tue Mar 30 11:50:18 2010
> @@ -16,6 +16,3 @@ org.apache.qpid.client.SessionCreateTest
>  // QPID-2097 exclude it from the InVM test runs until InVM JMX Interface is 
> reliable
>  org.apache.qpid.management.jmx.ManagementActorLoggingTest#*
>  org.apache.qpid.server.queue.ModelTest#*
> -
> -//QPID-2422: Derby currently doesnt persist queue arguments and 0-91 support 
> causes exclusivity mismatch after restart
> -org.apache.qpid.test.unit.ct.DurableSubscriberTest#*
>
>
>
> ---------------------------------------------------------------------
> Apache Qpid - AMQP Messaging Implementation
> Project:      http://qpid.apache.org
> Use/Interact: mailto:[email protected]
>
>



-- 
Regards,

Rajith Attapattu
Red Hat
http://rajith.2rlabs.com/
