I recently checked in a fix for QPID-2347 to the c++ broker, such that
active subscribers are now informed (by an exception) if their queue is
deleted from under them.
This broke some of the java tests in the 'cpp' profile. However, on
inspection I think that the test logic is questionable.
The first set of tests are those in TopicSessionTest that use the
subscriptionNameReuseForDifferentTopic() method.
I believe that method is incorrect as written. It relies on having two
subscriptions active at the same time for different topics with the same
subscription name.
The javadoc for createDurableSubscriber() states "each client must
specify a name that uniquely identifies (within client identifier) each
durable subscription it creates" and "changing a durable subscriber is
equivalent to unsubscribing (deleting) the old one and creating a new
one"[1].
However the subscriptionNameReuseForDifferentTopic() method creates a
new subscription - sub2 - with the same name as an existing subscription
- sub - and the goes on (in some cases) to make a receive() call on the
first subscription (after the second subscription has been created with
the same name). Perhaps the intention was for the receive() call after
the second subscription to be made on the second subscriber?
Another test I believe to be incorrect is testUnsubscribe() from
DurableSubscriptionTest.
The javadoc for Session.unsubscribe() states: "It is erroneous for a
client to delete a durable subscription while there is an active
MessageConsumer or TopicSubscriber for the subscription"[2]. However
this test unsubscribes with an active subscriber and subsequently tries
to call receive() on that subscriber.
Also in DurableSubscriptionTest, testDurableWithInvalidDestination calls
unsubscribe() when an active subscription still exists.
Attached is a patch that fixes these specific tests. It also closes any
active subscriber from the current session on unsubscribe(), which is
used when creating a new durable subscriber if the name already refers
to another subscription. Without that change then I see an error in
DurableSubscriberTest.testDurableWithInvalidDestination even with the
included fix to that test. Not quite sure what that's about.
The attached patch will fix the failures with the 'cpp' profile and
doesn't break the default profile. If there are no objections I will
commit this.
[1]
http://java.sun.com/j2ee/1.4/docs/api/javax/jms/TopicSession.html#createDurableSubscriber%28javax.jms.Topic,%20java.lang.String%29
[2]
http://java.sun.com/j2ee/1.4/docs/api/javax/jms/Session.html#unsubscribe%28java.lang.String%29
Index: systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
===================================================================
--- systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java (revision 901596)
+++ systests/src/main/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java (working copy)
@@ -126,12 +126,13 @@
session1 = con.createTopicSession(true, AMQSession.NO_ACKNOWLEDGE);
publisher = session1.createPublisher(null);
}
+ sub.close();
TopicSubscriber sub2 = session1.createDurableSubscriber(topic2, "subscription0");
publisher.publish(topic, session1.createTextMessage("hello"));
session1.commit();
if (!shutdown)
{
- m = (TextMessage) sub.receive(2000);
+ m = (TextMessage) sub2.receive(2000);
assertNull(m);
session1.commit();
}
Index: systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
===================================================================
--- systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (revision 901596)
+++ systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (working copy)
@@ -97,6 +97,7 @@
_logger.info("Receive message on consumer 1 :expecting null");
assertEquals(null, msg);
+ consumer2.close();
_logger.info("Unsubscribe session2/consumer2");
session2.unsubscribe("MySubscription");
@@ -111,10 +112,6 @@
msg = consumer1.receive(NEGATIVE_RECEIVE_TIMEOUT);
assertEquals(null, msg);
- _logger.info("Receive message on consumer 2 :expecting null");
- msg = consumer2.receive(NEGATIVE_RECEIVE_TIMEOUT);
- assertEquals(null, msg);
-
_logger.info("Close connection");
con.close();
}
@@ -301,7 +298,6 @@
{
assertTrue("Wrong type of exception thrown", e instanceof InvalidSelectorException);
}
-
TopicSubscriber liveSubscriber = session.createDurableSubscriber(topic, "testDurableWithInvalidSelectorSub");
assertNotNull("Subscriber should have been created", liveSubscriber);
@@ -311,6 +307,7 @@
assertNotNull ("Message should have been received", msg);
assertEquals ("testDurableWithInvalidSelector2", ((TextMessage) msg).getText());
assertNull("Should not receive subsequent message", liveSubscriber.receive(200));
+ liveSubscriber.close();
session.unsubscribe("testDurableWithInvalidSelectorSub");
}
Index: client/src/main/java/org/apache/qpid/client/AMQSession.java
===================================================================
--- client/src/main/java/org/apache/qpid/client/AMQSession.java (revision 901596)
+++ client/src/main/java/org/apache/qpid/client/AMQSession.java (working copy)
@@ -1664,6 +1664,7 @@
TopicSubscriberAdaptor subscriber = _subscriptions.get(name);
if (subscriber != null)
{
+ subscriber.close();
// send a queue.delete for the subscription
deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
_subscriptions.remove(name);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:dev-subscr...@qpid.apache.org