Repository: qpid-jms Updated Branches: refs/heads/master d1f0b32ba -> f29381d98
Ensure that pending requests also fail when connection is lost otherwise they can hang. Added some tests that showed this intermittently happening. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f29381d9 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f29381d9 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f29381d9 Branch: refs/heads/master Commit: f29381d9869865ff0a5c4e0f1fd4ece61196f604 Parents: d1f0b32 Author: Timothy Bish <tabish...@gmail.com> Authored: Tue Mar 3 11:00:54 2015 -0500 Committer: Timothy Bish <tabish...@gmail.com> Committed: Tue Mar 3 11:01:36 2015 -0500 ---------------------------------------------------------------------- .../qpid/jms/provider/amqp/AmqpProvider.java | 35 ++--- .../org/apache/qpid/jms/JmsConnectionTest.java | 27 ++++ .../jms/consumer/JmsMessageConsumerTest.java | 136 +++++++++++++++++++ .../apache/qpid/jms/session/JmsSessionTest.java | 33 +++++ .../qpid/jms/support/AmqpTestSupport.java | 14 ++ 5 files changed, 229 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f29381d9/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index 0face7b..71e00d3 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -48,6 +48,7 @@ import org.apache.qpid.jms.meta.JmsSessionId; import org.apache.qpid.jms.meta.JmsSessionInfo; import org.apache.qpid.jms.meta.JmsTransactionInfo; import org.apache.qpid.jms.provider.AsyncResult; +import org.apache.qpid.jms.provider.NoOpAsyncResult; import org.apache.qpid.jms.provider.Provider; import org.apache.qpid.jms.provider.ProviderClosedException; import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE; @@ -92,6 +93,7 @@ public class AmqpProvider implements Provider, TransportListener { // brokers that don't currently handle the unsigned range well. private static final int DEFAULT_CHANNEL_MAX = 32767; private static final AtomicInteger PROVIDER_SEQUENCE = new AtomicInteger(); + private static final NoOpAsyncResult NOOP_REQUEST = new NoOpAsyncResult(); private ProviderListener listener; private AmqpConnection connection; @@ -177,7 +179,7 @@ public class AmqpProvider implements Provider, TransportListener { if (connection != null) { connection.close(request); - pumpToProtonTransport(); + pumpToProtonTransport(request); } else { request.onSuccess(); } @@ -294,7 +296,7 @@ public class AmqpProvider implements Provider, TransportListener { } }); - pumpToProtonTransport(); + pumpToProtonTransport(request); } catch (Exception error) { request.onFailure(error); } @@ -321,7 +323,7 @@ public class AmqpProvider implements Provider, TransportListener { } }); - pumpToProtonTransport(); + pumpToProtonTransport(request); } catch (Exception error) { request.onFailure(error); } @@ -348,7 +350,7 @@ public class AmqpProvider implements Provider, TransportListener { } }); - pumpToProtonTransport(); + pumpToProtonTransport(request); } catch (Exception error) { request.onFailure(error); } @@ -404,7 +406,7 @@ public class AmqpProvider implements Provider, TransportListener { } }); - pumpToProtonTransport(); + pumpToProtonTransport(request); } catch (Exception error) { request.onFailure(error); } @@ -433,7 +435,7 @@ public class AmqpProvider implements Provider, TransportListener { } boolean couldSend = producer.send(envelope, request); - pumpToProtonTransport(); + pumpToProtonTransport(request); if (couldSend && envelope.isSendAsync()) { request.onSuccess(); } @@ -455,7 +457,7 @@ public class AmqpProvider implements Provider, TransportListener { checkClosed(); AmqpSession amqpSession = connection.getSession(sessionId); amqpSession.acknowledge(); - pumpToProtonTransport(); + pumpToProtonTransport(request); request.onSuccess(); } catch (Exception error) { request.onFailure(error); @@ -488,9 +490,9 @@ public class AmqpProvider implements Provider, TransportListener { if (consumer.getSession().isAsyncAck()) { request.onSuccess(); - pumpToProtonTransport(); + pumpToProtonTransport(request); } else { - pumpToProtonTransport(); + pumpToProtonTransport(request); request.onSuccess(); } } catch (Exception error) { @@ -511,7 +513,7 @@ public class AmqpProvider implements Provider, TransportListener { checkClosed(); AmqpSession session = connection.getSession(sessionId); session.commit(request); - pumpToProtonTransport(); + pumpToProtonTransport(request); } catch (Exception error) { request.onFailure(error); } @@ -530,7 +532,7 @@ public class AmqpProvider implements Provider, TransportListener { checkClosed(); AmqpSession session = connection.getSession(sessionId); session.rollback(request); - pumpToProtonTransport(); + pumpToProtonTransport(request); } catch (Exception error) { request.onFailure(error); } @@ -549,7 +551,7 @@ public class AmqpProvider implements Provider, TransportListener { checkClosed(); AmqpSession session = connection.getSession(sessionId); session.recover(); - pumpToProtonTransport(); + pumpToProtonTransport(request); request.onSuccess(); } catch (Exception error) { request.onFailure(error); @@ -568,7 +570,7 @@ public class AmqpProvider implements Provider, TransportListener { try { checkClosed(); connection.unsubscribe(subscription, request); - pumpToProtonTransport(); + pumpToProtonTransport(request); } catch (Exception error) { request.onFailure(error); } @@ -595,7 +597,7 @@ public class AmqpProvider implements Provider, TransportListener { } consumer.pull(timeout); - pumpToProtonTransport(); + pumpToProtonTransport(request); request.onSuccess(); } catch (Exception error) { request.onFailure(error); @@ -651,7 +653,7 @@ public class AmqpProvider implements Provider, TransportListener { // Process the state changes from the latest data and then answer back // any pending updates to the Broker. processUpdates(); - pumpToProtonTransport(); + pumpToProtonTransport(NOOP_REQUEST); } }); } @@ -764,7 +766,7 @@ public class AmqpProvider implements Provider, TransportListener { } } - private void pumpToProtonTransport() { + private void pumpToProtonTransport(AsyncResult request) { try { boolean done = false; while (!done) { @@ -785,6 +787,7 @@ public class AmqpProvider implements Provider, TransportListener { } } catch (IOException e) { fireProviderException(e); + request.onFailure(e); } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f29381d9/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java index 2adcfcb..32a7c9c 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java @@ -23,9 +23,13 @@ import static org.junit.Assert.fail; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import javax.jms.DeliveryMode; import javax.jms.ExceptionListener; import javax.jms.JMSException; import javax.jms.JMSSecurityException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Queue; import javax.jms.Session; import org.apache.qpid.jms.support.AmqpTestSupport; @@ -122,6 +126,29 @@ public class JmsConnectionTest extends AmqpTestSupport { connection.start(); } + @Test(timeout=30000) + public void testBrokerStopWontHangConnectionClose() throws Exception { + connection = createAmqpConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getDestinationName()); + connection.start(); + + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + + Message m = session.createTextMessage("Sample text"); + producer.send(m); + + stopPrimaryBroker(); + + try { + connection.close(); + } catch (Exception ex) { + LOG.error("Should not thrown on disconnected connection close(): {}", ex); + fail("Should not have thrown an exception."); + } + } + @Test(timeout=60000) public void testConnectionExceptionBrokerStop() throws Exception { final CountDownLatch latch = new CountDownLatch(1); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f29381d9/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java index b514b08..d2a3b4d 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; import javax.jms.DeliveryMode; +import javax.jms.ExceptionListener; import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.JMSSecurityException; @@ -469,4 +470,139 @@ public class JmsMessageConsumerTest extends AmqpTestSupport { Queue queue = session.createQueue(name.getMethodName()); session.createConsumer(queue, "3+5"); } + + @Test(timeout=30000) + public void testConsumerReceiveNoWaitThrowsWhenBrokerStops() throws Exception { + final CountDownLatch consumerReady = new CountDownLatch(1); + final CountDownLatch connectionFailed = new CountDownLatch(1); + + connection = createAmqpConnection(); + connection.setExceptionListener(new ExceptionListener() { + + @Override + public void onException(JMSException exception) { + LOG.info("Connection to Broker stopped"); + connectionFailed.countDown(); + } + }); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getDestinationName()); + connection.start(); + + final MessageConsumer consumer=session.createConsumer(queue); + Testable test = new Testable() { + + @Override + public synchronized void run() { + try { + consumerReady.countDown(); + assertTrue("Broker connection needs to fail.", connectionFailed.await(20, TimeUnit.SECONDS)); + + // Might not propagate state right away, so check a few times. + for (int i = 0; i < 250; i++) { + consumer.receiveNoWait(); + TimeUnit.MILLISECONDS.sleep(3); + } + + failure = "Should have thrown an IllegalStateException"; + } catch (Exception ex) { + LOG.info("Caught exception on receiveNoWait: {}", ex); + } + } + }; + + new Thread(test).start(); + assertTrue(consumerReady.await(20, TimeUnit.SECONDS)); + + stopPrimaryBroker(); + + assertTrue("Consumer did not fail as expected", test.passed()); + } + + @Test(timeout=30000) + public void testConsumerReceiveTimedReturnsIfConnectionLost() throws Exception { + final CountDownLatch consumerReady = new CountDownLatch(1); + + connection = createAmqpConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getDestinationName()); + connection.start(); + + final MessageConsumer consumer=session.createConsumer(queue); + + Testable test = new Testable() { + @Override + public synchronized void run() { + try { + consumer.receive(1); + new Thread(new Runnable() { + + @Override + public void run() { + try { + TimeUnit.MILLISECONDS.sleep(2); + } catch (InterruptedException e) { + } + consumerReady.countDown(); + } + }).start(); + consumer.receive(TimeUnit.SECONDS.toMillis(30)); + } catch (Exception ex) { + LOG.info("Caught exception on receive(): {}", ex); + failure = "Should not have thrown: " + ex.getMessage(); + } + } + }; + + new Thread(test).start(); + assertTrue(consumerReady.await(20, TimeUnit.SECONDS)); + + stopPrimaryBroker(); + + assertTrue(test.passed()); + } + + @Test(timeout=30000) + public void testConsumerReceiveReturnsIfConnectionLost() throws Exception { + final CountDownLatch consumerReady = new CountDownLatch(1); + + connection = createAmqpConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getDestinationName()); + connection.start(); + + final MessageConsumer consumer=session.createConsumer(queue); + + Testable test = new Testable() { + @Override + public synchronized void run() { + try { + consumer.receive(1); + new Thread(new Runnable() { + + @Override + public void run() { + try { + TimeUnit.MILLISECONDS.sleep(2); + } catch (InterruptedException e) { + } + consumerReady.countDown(); + } + }).start(); + consumer.receive(); + } catch (Exception ex) { + LOG.info("Caught exception on receive(): {}", ex); + failure = "Should not have thrown: " + ex.getMessage(); + } + } + }; + + new Thread(test).start(); + assertTrue(consumerReady.await(20, TimeUnit.SECONDS)); + + stopPrimaryBroker(); + + assertTrue(test.passed()); + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f29381d9/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/session/JmsSessionTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/session/JmsSessionTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/session/JmsSessionTest.java index 63d0636..e164be8 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/session/JmsSessionTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/session/JmsSessionTest.java @@ -17,6 +17,7 @@ package org.apache.qpid.jms.session; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -79,4 +80,36 @@ public class JmsSessionTest extends AmqpTestSupport { session.close(); session.close(); } + + @Test(timeout=30000) + public void testConsumerCreateThrowsWhenBrokerStops() throws Exception { + connection = createAmqpConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getDestinationName()); + connection.start(); + + stopPrimaryBroker(); + try { + session.createConsumer(queue); + fail("Should have thrown an IllegalStateException"); + } catch (Exception ex) { + LOG.info("Caught exception on create consumer: {}", ex); + } + } + + @Test(timeout=30000) + public void testProducerCreateThrowsWhenBrokerStops() throws Exception { + connection = createAmqpConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getDestinationName()); + connection.start(); + + stopPrimaryBroker(); + try { + session.createProducer(queue); + fail("Should have thrown an IllegalStateException"); + } catch (Exception ex) { + LOG.info("Caught exception on create producer: {}", ex); + } + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f29381d9/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java index 97b3fb5..5161b0d 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/support/AmqpTestSupport.java @@ -141,4 +141,18 @@ public class AmqpTestSupport extends QpidJmsTestSupport { } return factory; } + + public abstract class Testable implements Runnable { + + protected String failure; + + @Override + public String toString() { + return failure; + } + + public synchronized boolean passed() { + return failure == null; + } + } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org