I recently checked in a fix for QPID-2347 to the c++ broker, such that active subscribers are now informed (by an exception) if their queue is deleted from under them.

This broke some of the java tests in the 'cpp' profile. However, on inspection I think that the test logic is questionable.

The first set of tests are those in TopicSessionTest that use the subscriptionNameReuseForDifferentTopic() method.

I believe that method is incorrect as written. It relies on having two subscriptions active at the same time for different topics with the same subscription name.

The javadoc for createDurableSubscriber() states "each client must specify a name that uniquely identifies (within client identifier) each durable subscription it creates" and "changing a durable subscriber is equivalent to unsubscribing (deleting) the old one and creating a new one"[1].

However the subscriptionNameReuseForDifferentTopic() method creates a new subscription - sub2 - with the same name as an existing subscription - sub - and the goes on (in some cases) to make a receive() call on the first subscription (after the second subscription has been created with the same name). Perhaps the intention was for the receive() call after the second subscription to be made on the second subscriber?

Another test I believe to be incorrect is testUnsubscribe() from DurableSubscriptionTest.

The javadoc for Session.unsubscribe() states: "It is erroneous for a client to delete a durable subscription while there is an active MessageConsumer or TopicSubscriber for the subscription"[2]. However this test unsubscribes with an active subscriber and subsequently tries to call receive() on that subscriber.

Also in DurableSubscriptionTest, testDurableWithInvalidDestination calls unsubscribe() when an active subscription still exists.

Attached is a patch that fixes these specific tests. It also closes any active subscriber from the current session on unsubscribe(), which is used when creating a new durable subscriber if the name already refers to another subscription. Without that change then I see an error in DurableSubscriberTest.testDurableWithInvalidDestination even with the included fix to that test. Not quite sure what that's about.

The attached patch will fix the failures with the 'cpp' profile and doesn't break the default profile. If there are no objections I will commit this.

[1] http://java.sun.com/j2ee/1.4/docs/api/javax/jms/TopicSession.html#createDurableSubscriber%28javax.jms.Topic,%20java.lang.String%29

[2] http://java.sun.com/j2ee/1.4/docs/api/javax/jms/Session.html#unsubscribe%28java.lang.String%29
Index: systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
===================================================================
--- systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java	(revision 901596)
+++ systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java	(working copy)
@@ -126,12 +126,13 @@
             session1 = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
             publisher = session1.createPublisher(null);
         }
+        sub.close();
         TopicSubscriber sub2 = session1.createDurableSubscriber(topic2, "subscription0");
         publisher.publish(topic, session1.createTextMessage("hello"));
         session1.commit();
         if (!shutdown)
         {
-            m = (TextMessage) sub.receive(2000);
+            m = (TextMessage) sub2.receive(2000);
             assertNull(m);
             session1.commit();
         }
Index: systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
===================================================================
--- systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java	(revision 901596)
+++ systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java	(working copy)
@@ -97,6 +97,7 @@
         _logger.info("Receive message on consumer 1 :expecting null");
         assertEquals(null, msg);
 
+        consumer2.close();
         _logger.info("Unsubscribe session2/consumer2");
         session2.unsubscribe("MySubscription");
 
@@ -111,10 +112,6 @@
         msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
         assertEquals(null, msg);
 
-        _logger.info("Receive message on consumer 2 :expecting null");
-        msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
-        assertEquals(null, msg);
-
         _logger.info("Close connection");
         con.close();
     }
@@ -301,7 +298,6 @@
     	{
     		assertTrue("Wrong type of exception thrown", e instanceof InvalidSelectorException);
     	}
-    	
     	TopicSubscriber liveSubscriber = session.createDurableSubscriber(topic, "testDurableWithInvalidSelectorSub");
     	assertNotNull("Subscriber should have been created", liveSubscriber);
 
@@ -311,6 +307,7 @@
     	assertNotNull ("Message should have been received", msg);
     	assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText());
     	assertNull("Should not receive subsequent message", liveSubscriber.receive(200));
+        liveSubscriber.close();
         session.unsubscribe("testDurableWithInvalidSelectorSub");
     }
     
Index: client/src/main/java/org/apache/qpid/client/AMQSession.java
===================================================================
--- client/src/main/java/org/apache/qpid/client/AMQSession.java	(revision 901596)
+++ client/src/main/java/org/apache/qpid/client/AMQSession.java	(working copy)
@@ -1664,6 +1664,7 @@
         TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
         if (subscriber != null)
         {
+            subscriber.close();
             // send a queue.delete for the subscription
             deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
             _subscriptions.remove(name);

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscr...@qpid.apache.org

Reply via email to