Repository: activemq Updated Branches: refs/heads/trunk 226e012d8 -> 004568234
https://issues.apache.org/jira/browse/AMQ-5401 Ensure that the sender is closed on error and add some tests for unsubscribe failures. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/00456823 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/00456823 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/00456823 Branch: refs/heads/trunk Commit: 004568234b3964441fa122b04bdb509e754585f4 Parents: 226e012 Author: Timothy Bish <[email protected]> Authored: Fri Oct 17 14:58:56 2014 -0400 Committer: Timothy Bish <[email protected]> Committed: Fri Oct 17 14:58:56 2014 -0400 ---------------------------------------------------------------------- .../transport/amqp/AmqpProtocolConverter.java | 4 +- .../activemq/transport/amqp/JMSClientTest.java | 58 ++++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/00456823/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index f4df997..fa49665 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -1277,8 +1277,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } else { sender.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage())); } + sender.close(); + } else { + sender.open(); } - sender.open(); pumpProtonToSocket(); } }); http://git-wip-us.apache.org/repos/asf/activemq/blob/00456823/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java index 5a29ee2..64a4f3c 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java @@ -49,6 +49,7 @@ import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin; import org.apache.activemq.util.Wait; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.objectweb.jtests.jms.framework.TestConfig; import org.slf4j.Logger; @@ -894,4 +895,61 @@ public class JMSClientTest extends JMSClientTestSupport { } })); } + + @Test(timeout=30000) + public void testDurableConsumerUnsubscribeWhileNoSubscription() throws Exception { + ActiveMQAdmin.enableJMSFrameTracing(); + + final BrokerViewMBean broker = getProxyToBroker(); + + connection = createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return broker.getInactiveDurableTopicSubscribers().length == 0 && + broker.getDurableTopicSubscribers().length == 0; + } + })); + + try { + session.unsubscribe("DurbaleTopic"); + fail("Should have thrown as subscription is in use."); + } catch (JMSException ex) { + } + } + + @Ignore("Requires version 0.30 or higher to work.") // TODO + @Test(timeout=30000) + public void testDurableConsumerUnsubscribeWhileActive() throws Exception { + ActiveMQAdmin.enableJMSFrameTracing(); + + final BrokerViewMBean broker = getProxyToBroker(); + + connection = createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(getDestinationName()); + session.createDurableSubscriber(topic, "DurbaleTopic"); + + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return broker.getInactiveDurableTopicSubscribers().length == 0 && + broker.getDurableTopicSubscribers().length == 1; + } + })); + + try { + session.unsubscribe("DurbaleTopic"); + fail("Should have thrown as subscription is in use."); + } catch (JMSException ex) { + } + } }
