http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java index 2fa2644..d18e95b 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java @@ -67,7 +67,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI } @Override - public void processDeliveryUpdates(AmqpProvider provider) throws IOException { + public void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws IOException { try { if (pendingDelivery != null && pendingDelivery.remotelySettled()) { DeliveryState state = pendingDelivery.getRemoteState(); @@ -105,7 +105,7 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI } } - super.processDeliveryUpdates(provider); + super.processDeliveryUpdates(provider, delivery); } catch (Exception e) { throw IOExceptionSupport.create(e); }
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java index 229e61f..5912f7f 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java @@ -24,10 +24,12 @@ import org.apache.qpid.jms.meta.JmsConnectionInfo; import org.apache.qpid.jms.meta.JmsResource; import org.apache.qpid.jms.provider.AsyncResult; import org.apache.qpid.jms.provider.amqp.AmqpEventSink; +import org.apache.qpid.jms.provider.amqp.AmqpExceptionBuilder; import org.apache.qpid.jms.provider.amqp.AmqpProvider; import org.apache.qpid.jms.provider.amqp.AmqpResource; import org.apache.qpid.jms.provider.amqp.AmqpResourceParent; import org.apache.qpid.jms.provider.amqp.AmqpSupport; +import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Endpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +42,7 @@ import org.slf4j.LoggerFactory; * @param <INFO> The Type of JmsResource used to describe the target resource. * @param <ENDPOINT> The AMQP Endpoint that the target resource encapsulates. */ -public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT extends AmqpResourceParent, INFO extends JmsResource, ENDPOINT extends Endpoint> implements AmqpEventSink { +public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT extends AmqpResourceParent, INFO extends JmsResource, ENDPOINT extends Endpoint> implements AmqpEventSink, AmqpExceptionBuilder { private static final Logger LOG = LoggerFactory.getLogger(AmqpResourceBuilder.class); @@ -97,7 +99,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex return request.isComplete(); } - }, getRequestTimeout(), new JmsOperationTimedOutException("Request to open resource " + getResource() + " timed out")); + }, getRequestTimeout(), this); } } @@ -119,7 +121,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex } @Override - public void processDeliveryUpdates(AmqpProvider provider) throws IOException { + public void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws IOException { // No implementation needed here for this event. } @@ -185,6 +187,11 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex getRequest().onFailure(openError); } + @Override + public Exception createException() { + return new JmsOperationTimedOutException("Request to open resource " + getResource() + " timed out"); + } + //----- Implementation methods used to customize the build process -------// /** http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java index e6e36df..2490b19 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java @@ -801,6 +801,24 @@ public class FailoverProvider extends DefaultProviderListener implements Provide } @Override + public void onCompletedMessageSend(final JmsOutboundMessageDispatch envelope) { + if (closingConnection.get() || closed.get() || failed.get()) { + return; + } + + listener.onCompletedMessageSend(envelope); + } + + @Override + public void onFailedMessageSend(final JmsOutboundMessageDispatch envelope, Throwable cause) { + if (closingConnection.get() || closed.get() || failed.get()) { + return; + } + + listener.onFailedMessageSend(envelope, cause); + } + + @Override public void onConnectionFailure(final IOException ex) { if (closingConnection.get() || closed.get() || failed.get()) { return; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java index c8914ae..a8fcf2d 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import javax.jms.Connection; import javax.jms.IllegalStateException; @@ -932,4 +933,112 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase { testPeer.waitForAllHandlersToComplete(3000); } } + + @Test(timeout=20000) + public void testMessageListenerCallsConnectionCloseThrowsIllegalStateException() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference<Exception> asyncError = new AtomicReference<Exception>(null); + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + final Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + testPeer.expectBegin(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue destination = session.createQueue(getTestName()); + connection.start(); + + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1); + + MessageConsumer consumer = session.createConsumer(destination); + + testPeer.expectDisposition(true, new AcceptedMatcher()); + + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message m) { + try { + LOG.debug("Async consumer got Message: {}", m); + connection.close(); + } catch (Exception ex) { + asyncError.set(ex); + } + + latch.countDown(); + } + }); + + boolean await = latch.await(3000, TimeUnit.MILLISECONDS); + assertTrue("Messages not received within given timeout. Count remaining: " + latch.getCount(), await); + + assertNotNull(asyncError.get()); + assertTrue(asyncError.get() instanceof IllegalStateException); + + testPeer.waitForAllHandlersToComplete(2000); + + testPeer.expectDetach(true, true, true); + consumer.close(); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(2000); + } + } + + @Test(timeout=20000) + public void testMessageListenerCallsSessionCloseThrowsIllegalStateException() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference<Exception> asyncError = new AtomicReference<Exception>(null); + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + connection.start(); + + testPeer.expectBegin(); + + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue destination = session.createQueue(getTestName()); + connection.start(); + + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1); + + MessageConsumer consumer = session.createConsumer(destination); + + testPeer.expectDisposition(true, new AcceptedMatcher()); + + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message m) { + try { + LOG.debug("Async consumer got Message: {}", m); + session.close(); + } catch (Exception ex) { + asyncError.set(ex); + } + + latch.countDown(); + } + }); + + boolean await = latch.await(3000, TimeUnit.MILLISECONDS); + assertTrue("Messages not received within given timeout. Count remaining: " + latch.getCount(), await); + + assertNotNull(asyncError.get()); + assertTrue(asyncError.get() instanceof IllegalStateException); + + testPeer.waitForAllHandlersToComplete(2000); + + testPeer.expectDetach(true, true, true); + consumer.close(); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(2000); + } + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java index 5485857..2ed9f41 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java @@ -20,8 +20,15 @@ import static org.apache.qpid.jms.provider.amqp.AmqpSupport.ANONYMOUS_RELAY; import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import javax.jms.Connection; import javax.jms.Destination; import javax.jms.Message; @@ -30,8 +37,11 @@ import javax.jms.Queue; import javax.jms.Session; import javax.jms.TemporaryQueue; import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; import javax.jms.Topic; +import org.apache.qpid.jms.JmsCompletionListener; +import org.apache.qpid.jms.JmsMessageProducer; import org.apache.qpid.jms.test.QpidJmsTestCase; import org.apache.qpid.jms.test.testpeer.ListDescribedType; import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; @@ -47,12 +57,16 @@ import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transaction.TxnCapability; import org.hamcrest.Matcher; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Test MessageProducers created using various configuration of the presettle options */ public class PresettledProducerIntegrationTest extends QpidJmsTestCase { + private static final Logger LOG = LoggerFactory.getLogger(PresettledProducerIntegrationTest.class); + private final IntegrationTestFixture testFixture = new IntegrationTestFixture(); private final Symbol[] serverCapabilities = new Symbol[] { ANONYMOUS_RELAY }; @@ -419,4 +433,221 @@ public class PresettledProducerIntegrationTest extends QpidJmsTestCase { testPeer.waitForAllHandlersToComplete(1000); } } + + //----- Test the jms.presettleAll with asynchronous completion -----------// + + @Test(timeout = 20000) + public void testAsyncCompletionPresettleAllSendToTopic() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleAll=true"; + doTestAsyncCompletionProducerWithPresettleOptions(presettleConfig, false, false, true, true, Topic.class); + } + + @Test(timeout = 20000) + public void testAsyncCompletionPresettleAllSendToQueue() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleAll=true"; + doTestAsyncCompletionProducerWithPresettleOptions(presettleConfig, false, false, true, true, Queue.class); + } + + @Test(timeout = 20000) + public void testsyncCompletionPresettleAllAnonymousSendToTopic() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleAll=true"; + doTestAsyncCompletionProducerWithPresettleOptions(presettleConfig, false, true, true, true, Topic.class); + } + + @Test(timeout = 20000) + public void testsyncCompletionPresettleAllAnonymousSendToQueue() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleAll=true"; + doTestAsyncCompletionProducerWithPresettleOptions(presettleConfig, false, true, true, true, Queue.class); + } + + //----- Test the jms.presettleProducers with asynchronous completion -----// + + @Test(timeout = 20000) + public void testAsyncCompletionPresettleProducersTopic() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleProducers=true"; + doTestAsyncCompletionProducerWithPresettleOptions(presettleConfig, false, false, true, true, Topic.class); + } + + @Test(timeout = 20000) + public void testAsyncCompletionPresettleProducersQueue() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleProducers=true"; + doTestAsyncCompletionProducerWithPresettleOptions(presettleConfig, false, false, true, true, Queue.class); + } + + @Test(timeout = 20000) + public void testAsyncCompletionPresettleProducersAnonymousTopic() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleProducers=true"; + doTestAsyncCompletionProducerWithPresettleOptions(presettleConfig, false, true, true, true, Topic.class); + } + + @Test(timeout = 20000) + public void testAsyncCompletionPresettleProducersAnonymousQueue() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleProducers=true"; + doTestAsyncCompletionProducerWithPresettleOptions(presettleConfig, false, true, true, true, Queue.class); + } + + //----- Asynchronous Completion test method implementation ---------------// + + private void doTestAsyncCompletionProducerWithPresettleOptions(String uriOptions, boolean transacted, boolean anonymous, boolean senderSettled, boolean transferSettled, Class<? extends Destination> destType) throws Exception { + doTestAsyncCompletionProducerWithPresettleOptions(uriOptions, transacted, anonymous, true, senderSettled, transferSettled, destType); + } + + private void doTestAsyncCompletionProducerWithPresettleOptions(String uriOptions, boolean transacted, boolean anonymous, boolean relaySupported, boolean senderSettled, boolean transferSettled, Class<? extends Destination> destType) throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, uriOptions, relaySupported ? serverCapabilities : null, null); + testPeer.expectBegin(); + + Session session = null; + Binary txnId = null; + + if (transacted) { + // Expect the session, with an immediate link to the transaction coordinator + // using a target with the expected capabilities only. + CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); + txCoordinatorMatcher.withCapabilities(arrayContaining(TxnCapability.LOCAL_TXN)); + testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); + + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. + txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); + testPeer.expectDeclare(txnId); + + session = connection.createSession(true, Session.SESSION_TRANSACTED); + } else { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + Destination destination = null; + if (destType == Queue.class) { + destination = session.createQueue("MyQueue"); + } else if (destType == Topic.class) { + destination = session.createTopic("MyTopis"); + } else if (destType == TemporaryQueue.class) { + String dynamicAddress = "myTempQueueAddress"; + testPeer.expectTempQueueCreationAttach(dynamicAddress); + destination = session.createTemporaryQueue(); + } else if (destType == TemporaryTopic.class) { + String dynamicAddress = "myTempTopicAddress"; + testPeer.expectTempTopicCreationAttach(dynamicAddress); + destination = session.createTemporaryTopic(); + } else { + fail("unexpected type"); + } + + if (senderSettled) { + testPeer.expectSettledSenderAttach(); + } else { + testPeer.expectSenderAttach(); + } + + TestJmsCompletionListener listener = new TestJmsCompletionListener(); + // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added. + JmsMessageProducer producer = null; + if (anonymous) { + producer = (JmsMessageProducer) session.createProducer(null); + } else { + producer = (JmsMessageProducer) session.createProducer(destination); + } + + // Create and transfer a new message + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true); + headersMatcher.withDurable(equalTo(true)); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + + Matcher<?> stateMatcher = nullValue(); + if (transacted) { + stateMatcher = new TransactionalStateMatcher(); + ((TransactionalStateMatcher) stateMatcher).withTxnId(equalTo(txnId)); + ((TransactionalStateMatcher) stateMatcher).withOutcome(nullValue()); + } + + ListDescribedType responseState = new Accepted(); + if (transacted) { + TransactionalState txState = new TransactionalState(); + txState.setTxnId(txnId); + txState.setOutcome(new Accepted()); + } + + if (transferSettled) { + testPeer.expectTransfer(messageMatcher, stateMatcher, true, false, responseState, false); + } else { + testPeer.expectTransfer(messageMatcher, stateMatcher, false, true, responseState, true); + } + + if (anonymous && !relaySupported) { + testPeer.expectDetach(true, true, true); + } + + Message message = session.createTextMessage(); + + if (anonymous) { + producer.send(destination, message, listener); + } else { + producer.send(message, listener); + } + + if (transacted) { + testPeer.expectDischarge(txnId, true); + } + + testPeer.expectClose(); + + assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS)); + assertNull(listener.exception); + assertNotNull(listener.message); + assertTrue(listener.message instanceof TextMessage); + assertEquals(1, listener.successCount); + assertEquals(0, listener.errorCount); + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + private class TestJmsCompletionListener implements JmsCompletionListener { + + private final CountDownLatch completed; + + public volatile int successCount; + public volatile int errorCount; + + public volatile Message message; + public volatile Exception exception; + + public TestJmsCompletionListener() { + this(1); + } + + public TestJmsCompletionListener(int expected) { + this.completed = new CountDownLatch(expected); + } + + public boolean awaitCompletion(long timeout, TimeUnit units) throws InterruptedException { + return completed.await(timeout, units); + } + + @Override + public void onCompletion(Message message) { + LOG.info("JmsCompletionListener onCompletion called with message: {}", message); + this.message = message; + this.successCount++; + + completed.countDown(); + } + + @Override + public void onException(Message message, Exception exception) { + LOG.info("JmsCompletionListener onException called with message: {} error {}", message, exception); + + this.message = message; + this.exception = exception; + this.errorCount++; + + completed.countDown(); + } + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java index 6c53398..1e69eb8 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java @@ -40,7 +40,10 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.ExceptionListener; @@ -54,9 +57,11 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; +import org.apache.qpid.jms.JmsCompletionListener; import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.jms.JmsDefaultConnectionListener; +import org.apache.qpid.jms.JmsMessageProducer; import org.apache.qpid.jms.JmsOperationTimedOutException; import org.apache.qpid.jms.JmsSendTimedOutException; import org.apache.qpid.jms.message.foreign.ForeignJmsMessage; @@ -68,10 +73,13 @@ import org.apache.qpid.jms.test.testpeer.ListDescribedType; import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError; import org.apache.qpid.jms.test.testpeer.basictypes.TerminusDurability; +import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted; import org.apache.qpid.jms.test.testpeer.describedtypes.Modified; import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected; import org.apache.qpid.jms.test.testpeer.describedtypes.Released; +import org.apache.qpid.jms.test.testpeer.describedtypes.TransactionalState; import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher; @@ -1000,6 +1008,79 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 20000) + public void testRemotelyEndProducerCompletesAsyncSends() throws Exception { + final String BREAD_CRUMB = "ErrorMessage"; + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + final AtomicBoolean producerClosed = new AtomicBoolean(); + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + connection.addConnectionListener(new JmsDefaultConnectionListener() { + @Override + public void onProducerClosed(MessageProducer producer, Exception exception) { + producerClosed.set(true); + } + }); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create a producer, then remotely end the session afterwards. + testPeer.expectSenderAttach(); + + Queue queue = session.createQueue("myQueue"); + // TODO - Can revert to just MessageProducer once JMS 2.0 API is used + final JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue); + + Message message = session.createTextMessage("content"); + + final int MSG_COUNT = 3; + + for (int i = 0; i < MSG_COUNT; ++i) { + testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher()); + } + + testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB, 50); + + TestJmsCompletionListener listener = new TestJmsCompletionListener(MSG_COUNT); + try { + for (int i = 0; i < MSG_COUNT; ++i) { + producer.send(message, listener); + } + } catch (JMSException e) { + LOG.warn("Caught unexpected error: {}", e.getMessage()); + fail("No expected exception for this send."); + } + + testPeer.waitForAllHandlersToComplete(1000); + + // Verify the producer gets marked closed + assertTrue(listener.awaitCompletion(2000, TimeUnit.SECONDS)); + assertEquals(MSG_COUNT, listener.errorCount); + + // Verify the session is now marked closed + try { + producer.getDeliveryMode(); + fail("Expected ISE to be thrown due to being closed"); + } catch (IllegalStateException jmsise) { + String errorMessage = jmsise.getCause().getMessage(); + assertTrue(errorMessage.contains(AmqpError.RESOURCE_LIMIT_EXCEEDED.toString())); + assertTrue(errorMessage.contains(BREAD_CRUMB)); + } + + assertTrue("Producer closed callback didn't trigger", producerClosed.get()); + + // Try closing it explicitly, should effectively no-op in client. + // The test peer will throw during close if it sends anything. + producer.close(); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) public void testRemotelyCloseConnectionDuringSyncSend() throws Exception { final String BREAD_CRUMB = "ErrorMessageBreadCrumb"; @@ -1150,6 +1231,50 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 20000) + public void testAsyncCompletionGetsTimedOutErrorWhenNoDispostionArrives() throws Exception { + try(TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + connection.setSendTimeout(500); + + testPeer.expectBegin(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String queueName = "myQueue"; + Queue queue = session.createQueue(queueName); + + Message message = session.createTextMessage("text"); + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + + // Expect the producer to attach and grant it some credit, it should send + // a transfer which we will not send any response for which should cause the + // send operation to time out. + testPeer.expectSenderAttach(); + testPeer.expectTransferButDoNotRespond(messageMatcher); + testPeer.expectClose(); + + // TODO - Can revert to plain JMS once 2.0 is supported. + JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue); + TestJmsCompletionListener listener = new TestJmsCompletionListener(); + + try { + producer.send(message, listener); + } catch (Throwable error) { + LOG.info("Caught expected error: {}", error.getMessage()); + fail("Send should not fail for async."); + } + + assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS)); + assertNotNull(listener.exception); + assertTrue(listener.exception instanceof JmsSendTimedOutException); + assertNotNull(listener.message); + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) public void testSyncSendMessageRejected() throws Exception { doSyncSendMessageNotAcceptedTestImpl(new Rejected()); } @@ -1709,6 +1834,430 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 20000) + public void testAsyncCompletionAfterSendMessageGetDispoation() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + testPeer.expectBegin(); + testPeer.expectSenderAttach(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + + // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added. + JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue); + + // Create and transfer a new message + String text = "myMessage"; + testPeer.expectTransfer(new TransferPayloadCompositeMatcher()); + testPeer.expectClose(); + + TextMessage message = session.createTextMessage(text); + TestJmsCompletionListener listener = new TestJmsCompletionListener(); + + producer.send(message, listener); + + assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS)); + assertNull(listener.exception); + assertNotNull(listener.message); + assertTrue(listener.message instanceof TextMessage); + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) + public void testAsyncCompletionResetsBytesMessage() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + testPeer.expectBegin(); + testPeer.expectSenderAttach(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + + // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added. + JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue); + + // Create and transfer a new message + testPeer.expectTransfer(new TransferPayloadCompositeMatcher()); + testPeer.expectClose(); + + Binary payload = new Binary(new byte[] {1, 2, 3, 4}); + BytesMessage message = session.createBytesMessage(); + message.writeBytes(payload.getArray()); + + TestJmsCompletionListener listener = new TestJmsCompletionListener(); + + producer.send(message, listener); + + assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS)); + assertNull(listener.exception); + assertNotNull(listener.message); + assertTrue(listener.message instanceof BytesMessage); + + BytesMessage completed = (BytesMessage) listener.message; + assertEquals(payload.getLength(), completed.getBodyLength()); + byte[] data = new byte[payload.getLength()]; + completed.readBytes(data); + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) + public void testAsyncCompletionSendMessageRejected() throws Exception { + doAsyncCompletionSendMessageNotAcceptedTestImpl(new Rejected()); + } + + @Test(timeout = 20000) + public void testAsyncCompletionSendMessageReleased() throws Exception { + doAsyncCompletionSendMessageNotAcceptedTestImpl(new Released()); + } + + @Test(timeout = 20000) + public void testAsyncCompletionSendMessageModifiedDeliveryFailed() throws Exception { + Modified modified = new Modified(); + modified.setDeliveryFailed(true); + + doAsyncCompletionSendMessageNotAcceptedTestImpl(modified); + } + + @Test(timeout = 20000) + public void testAsyncCompletionSendMessageModifiedUndeliverable() throws Exception { + Modified modified = new Modified(); + modified.setUndeliverableHere(true); + + doAsyncCompletionSendMessageNotAcceptedTestImpl(modified); + } + + @Test(timeout = 20000) + public void testAsyncCompletionSendMessageModifiedDeliveryFailedUndeliverable() throws Exception { + Modified modified = new Modified(); + modified.setDeliveryFailed(true); + modified.setUndeliverableHere(true); + + doAsyncCompletionSendMessageNotAcceptedTestImpl(modified); + } + + private void doAsyncCompletionSendMessageNotAcceptedTestImpl(ListDescribedType responseState) throws JMSException, InterruptedException, Exception, IOException { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + + final CountDownLatch asyncError = new CountDownLatch(1); + + connection.setExceptionListener(new ExceptionListener() { + + @Override + public void onException(JMSException exception) { + LOG.debug("ExceptionListener got error: {}", exception.getMessage()); + asyncError.countDown(); + } + }); + + testPeer.expectBegin(); + testPeer.expectSenderAttach(); + testPeer.expectSenderAttach(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + + // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added. + JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue); + + // Create a second producer which allows for a safe wait for credit for the + // first producer without the need for a sleep. Otherwise the first producer + // might not do an actual async send due to not having received credit yet. + session.createProducer(queue); + + Message message = session.createTextMessage("content"); + + testPeer.expectTransfer(new TransferPayloadCompositeMatcher(), nullValue(), false, responseState, true); + + assertNull("Should not yet have a JMSDestination", message.getJMSDestination()); + + TestJmsCompletionListener listener = new TestJmsCompletionListener(); + try { + producer.send(message, listener); + } catch (JMSException e) { + LOG.warn("Caught unexpected error: {}", e.getMessage()); + fail("No expected exception for this send."); + } + + assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS)); + assertNotNull(listener.exception); + assertNotNull(listener.message); + assertTrue(listener.message instanceof TextMessage); + + testPeer.expectTransfer(new TransferPayloadCompositeMatcher()); + testPeer.expectClose(); + + listener = new TestJmsCompletionListener(); + try { + producer.send(message, listener); + } catch (JMSException e) { + LOG.warn("Caught unexpected error: {}", e.getMessage()); + fail("No expected exception for this send."); + } + + assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS)); + assertNull(listener.exception); + assertNotNull(listener.message); + assertTrue(listener.message instanceof TextMessage); + + connection.close(); + + testPeer.waitForAllHandlersToComplete(2000); + } + } + + @Test(timeout = 20000) + public void testAsyncCompletionSessionCloseThrowsIllegalStateException() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + testPeer.expectBegin(); + testPeer.expectSenderAttach(); + + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + + // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added. + JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue); + + // Create and transfer a new message + String text = "myMessage"; + testPeer.expectTransfer(new TransferPayloadCompositeMatcher()); + testPeer.expectClose(); + + final AtomicReference<JMSException> closeError = new AtomicReference<JMSException>(null); + TextMessage message = session.createTextMessage(text); + TestJmsCompletionListener listener = new TestJmsCompletionListener() { + + @Override + public void onCompletion(Message message) { + + try { + session.close(); + } catch (JMSException jmsEx) { + closeError.set(jmsEx); + } + + super.onCompletion(message); + }; + }; + + producer.send(message, listener); + + assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS)); + assertNull(listener.exception); + assertNotNull(listener.message); + assertTrue(listener.message instanceof TextMessage); + assertNotNull(closeError.get()); + assertTrue(closeError.get() instanceof IllegalStateException); + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) + public void testAsyncCompletionConnectionCloseThrowsIllegalStateException() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + final Connection connection = testFixture.establishConnecton(testPeer); + testPeer.expectBegin(); + testPeer.expectSenderAttach(); + + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + + // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added. + JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue); + + // Create and transfer a new message + String text = "myMessage"; + testPeer.expectTransfer(new TransferPayloadCompositeMatcher()); + testPeer.expectClose(); + + final AtomicReference<JMSException> closeError = new AtomicReference<JMSException>(null); + TextMessage message = session.createTextMessage(text); + TestJmsCompletionListener listener = new TestJmsCompletionListener() { + + @Override + public void onCompletion(Message message) { + + try { + connection.close(); + } catch (JMSException jmsEx) { + closeError.set(jmsEx); + } + + super.onCompletion(message); + }; + }; + + producer.send(message, listener); + + assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS)); + assertNull(listener.exception); + assertNotNull(listener.message); + assertNotNull(closeError.get()); + assertTrue(closeError.get() instanceof IllegalStateException); + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) + public void testAsyncCompletionSessionCommitThrowsIllegalStateException() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + final Connection connection = testFixture.establishConnecton(testPeer); + testPeer.expectBegin(); + testPeer.expectCoordinatorAttach(); + + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. + Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + testPeer.expectDeclare(txnId); + + testPeer.expectSenderAttach(); + + final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue("myQueue"); + + // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added. + JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue); + + // Create and transfer a new message + String text = "myMessage"; + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true); + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + messageMatcher.setPropertiesMatcher(propsMatcher); + messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text)); + TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher(); + stateMatcher.withTxnId(equalTo(txnId)); + stateMatcher.withOutcome(nullValue()); + TransactionalState txState = new TransactionalState(); + txState.setTxnId(txnId); + txState.setOutcome(new Accepted()); + + testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true); + testPeer.expectDischarge(txnId, true); + testPeer.expectClose(); + + final AtomicReference<JMSException> commitError = new AtomicReference<JMSException>(null); + TextMessage message = session.createTextMessage(text); + TestJmsCompletionListener listener = new TestJmsCompletionListener() { + + @Override + public void onCompletion(Message message) { + + try { + session.commit(); + } catch (JMSException jmsEx) { + commitError.set(jmsEx); + } + + super.onCompletion(message); + }; + }; + + producer.send(message, listener); + + assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS)); + assertNull(listener.exception); + assertNotNull(listener.message); + assertNotNull(commitError.get()); + assertTrue(commitError.get() instanceof IllegalStateException); + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) + public void testAsyncCompletionSessionRollbackThrowsIllegalStateException() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + final Connection connection = testFixture.establishConnecton(testPeer); + testPeer.expectBegin(); + testPeer.expectCoordinatorAttach(); + + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. + Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); + testPeer.expectDeclare(txnId); + + testPeer.expectSenderAttach(); + + final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue("myQueue"); + + // TODO Can change to plain MessageProducer when JMS 2.0 API dependency is added. + JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue); + + // Create and transfer a new message + String text = "myMessage"; + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true); + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + messageMatcher.setPropertiesMatcher(propsMatcher); + messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text)); + TransactionalStateMatcher stateMatcher = new TransactionalStateMatcher(); + stateMatcher.withTxnId(equalTo(txnId)); + stateMatcher.withOutcome(nullValue()); + TransactionalState txState = new TransactionalState(); + txState.setTxnId(txnId); + txState.setOutcome(new Accepted()); + + testPeer.expectTransfer(messageMatcher, stateMatcher, false, txState, true); + testPeer.expectDischarge(txnId, true); + testPeer.expectClose(); + + final AtomicReference<JMSException> rollback = new AtomicReference<JMSException>(null); + TextMessage message = session.createTextMessage(text); + TestJmsCompletionListener listener = new TestJmsCompletionListener() { + + @Override + public void onCompletion(Message message) { + + try { + session.rollback(); + } catch (JMSException jmsEx) { + rollback.set(jmsEx); + } + + super.onCompletion(message); + }; + }; + + producer.send(message, listener); + + assertTrue("Did not get async callback", listener.awaitCompletion(2000, TimeUnit.SECONDS)); + assertNull(listener.exception); + assertNotNull(listener.message); + assertNotNull(rollback.get()); + assertTrue(rollback.get() instanceof IllegalStateException); + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) public void testAnonymousProducerSendFailureHandledWhenAnonymousRelayNodeIsNotSupported() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { @@ -1831,4 +2380,47 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { testPeer.waitForAllHandlersToComplete(1000); } } + + private class TestJmsCompletionListener implements JmsCompletionListener { + + private final CountDownLatch completed; + + public volatile int successCount; + public volatile int errorCount; + + public volatile Message message; + public volatile Exception exception; + + public TestJmsCompletionListener() { + this(1); + } + + public TestJmsCompletionListener(int expected) { + this.completed = new CountDownLatch(expected); + } + + public boolean awaitCompletion(long timeout, TimeUnit units) throws InterruptedException { + return completed.await(timeout, units); + } + + @Override + public void onCompletion(Message message) { + LOG.info("JmsCompletionListener onCompletion called with message: {}", message); + this.message = message; + this.successCount++; + + completed.countDown(); + } + + @Override + public void onException(Message message, Exception exception) { + LOG.info("JmsCompletionListener onException called with message: {} error {}", message, exception); + + this.message = message; + this.exception = exception; + this.errorCount++; + + completed.countDown(); + } + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java index 7bf35e4..34d60f1 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java @@ -57,8 +57,10 @@ import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicSubscriber; +import org.apache.qpid.jms.JmsCompletionListener; import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsDefaultConnectionListener; +import org.apache.qpid.jms.JmsMessageProducer; import org.apache.qpid.jms.JmsOperationTimedOutException; import org.apache.qpid.jms.JmsSession; import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy; @@ -1446,7 +1448,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { public void testCreateAnonymousProducerWhenAnonymousRelayNodeIsNotSupported() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - //DO NOT add capability to indicate server support for ANONYMOUS-RELAY + // DO NOT add capability to indicate server support for ANONYMOUS-RELAY Connection connection = testFixture.establishConnecton(testPeer); connection.start(); @@ -1460,12 +1462,12 @@ public class SessionIntegrationTest extends QpidJmsTestCase { // Expect no AMQP traffic when we create the anonymous producer, as it will wait // for an actual send to occur on the producer before anything occurs on the wire - //Create an anonymous producer + // Create an anonymous producer MessageProducer producer = session.createProducer(null); assertNotNull("Producer object was null", producer); - //Expect a new message sent by the above producer to cause creation of a new - //sender link to the given destination, then closing the link after the message is sent. + // Expect a new message sent by the above producer to cause creation of a new + // sender link to the given destination, then closing the link after the message is sent. TargetMatcher targetMatcher = new TargetMatcher(); targetMatcher.withAddress(equalTo(topicName)); targetMatcher.withDynamic(equalTo(false)); @@ -1484,7 +1486,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { Message message = session.createMessage(); producer.send(dest, message); - //Repeat the send and observe another attach->transfer->detach. + // Repeat the send and observe another attach->transfer->detach. testPeer.expectSenderAttach(targetMatcher, false, false); testPeer.expectTransfer(messageMatcher); testPeer.expectDetach(true, true, true); @@ -1675,6 +1677,78 @@ public class SessionIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 20000) + public void testRemotelyEndSessionWithProducerCompletesAsyncSends() throws Exception { + final String BREAD_CRUMB = "ErrorMessage"; + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + final AtomicBoolean sessionClosed = new AtomicBoolean(); + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + connection.addConnectionListener(new JmsDefaultConnectionListener() { + @Override + public void onSessionClosed(Session session, Exception exception) { + sessionClosed.set(true); + } + }); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create a producer, then remotely end the session afterwards. + testPeer.expectSenderAttach(); + + Queue queue = session.createQueue("myQueue"); + // TODO - Can revert to just MessageProducer once JMS 2.0 API is used + final JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(queue); + + Message message = session.createTextMessage("content"); + + final int MSG_COUNT = 3; + + for (int i = 0; i < MSG_COUNT; ++i) { + testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher()); + } + + testPeer.remotelyEndLastOpenedSession(true, 0, AmqpError.RESOURCE_DELETED, BREAD_CRUMB); + + TestJmsCompletionListener listener = new TestJmsCompletionListener(MSG_COUNT); + try { + for (int i = 0; i < MSG_COUNT; ++i) { + producer.send(message, listener); + } + } catch (JMSException e) { + LOG.warn("Caught unexpected error: {}", e.getMessage()); + fail("No expected exception for this send."); + } + + testPeer.waitForAllHandlersToComplete(1000); + + // Verify the producer gets marked closed + assertTrue(listener.awaitCompletion(2000, TimeUnit.SECONDS)); + assertEquals(MSG_COUNT, listener.errorCount); + assertEquals(0, listener.successCount); + + // Verify the session is now marked closed + try { + session.getAcknowledgeMode(); + fail("Expected ISE to be thrown due to being closed"); + } catch (IllegalStateException jmsise) { + String errorMessage = jmsise.getCause().getMessage(); + assertTrue(errorMessage.contains(AmqpError.RESOURCE_DELETED.toString())); + assertTrue(errorMessage.contains(BREAD_CRUMB)); + } + + assertTrue("Session closed callback didn't trigger", sessionClosed.get()); + + // Try closing it explicitly, should effectively no-op in client. + // The test peer will throw during close if it sends anything. + producer.close(); + + testPeer.expectClose(); + connection.close(); + } + } + + @Test(timeout = 20000) public void testRemotelyEndSessionWithConsumer() throws Exception { final String BREAD_CRUMB = "ErrorMessage"; @@ -1920,4 +1994,34 @@ public class SessionIntegrationTest extends QpidJmsTestCase { connection.close(); } } + + private class TestJmsCompletionListener implements JmsCompletionListener { + + private final CountDownLatch completed; + + public volatile int successCount; + public volatile int errorCount; + + public TestJmsCompletionListener(int expected) { + completed = new CountDownLatch(expected); + } + + public boolean awaitCompletion(long timeout, TimeUnit units) throws InterruptedException { + return completed.await(timeout, units); + } + + @Override + public void onCompletion(Message message) { + LOG.info("JmsCompletionListener onCompletion called with message: {}", message); + successCount++; + completed.countDown(); + } + + @Override + public void onException(Message message, Exception exception) { + LOG.info("JmsCompletionListener onException called with message: {} error {}", message, exception); + errorCount++; + completed.countDown(); + } + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/3a03663b/qpid-jms-client/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java index 80a6e6a..7117e9f 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/producer/JmsMessageProducerTest.java @@ -22,35 +22,63 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import javax.jms.Connection; import javax.jms.DeliveryMode; +import javax.jms.Destination; import javax.jms.InvalidDestinationException; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; +import org.apache.qpid.jms.JmsCompletionListener; +import org.apache.qpid.jms.JmsConnectionFactory; import org.apache.qpid.jms.JmsConnectionTestSupport; import org.apache.qpid.jms.JmsDestination; +import org.apache.qpid.jms.JmsMessageProducer; import org.apache.qpid.jms.JmsQueue; import org.apache.qpid.jms.JmsSession; +import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; +import org.apache.qpid.jms.provider.mock.MockRemotePeer; +import org.apache.qpid.jms.test.Wait; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Test basic functionality around JmsConnection */ public class JmsMessageProducerTest extends JmsConnectionTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(JmsMessageProducerTest.class); + + private final MyCompletionListener completionListener = new MyCompletionListener(); private JmsSession session; + private final MockRemotePeer remotePeer = new MockRemotePeer(); @Override @Before public void setUp() throws Exception { super.setUp(); + remotePeer.start(); connection = createConnectionToMockProvider(); session = (JmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } + @Override + @After + public void tearDown() throws Exception { + remotePeer.shutdown(); + super.tearDown(); + } + @Test(timeout = 10000) public void testMultipleCloseCallsNoErrors() throws Exception { MessageProducer producer = session.createProducer(null); @@ -106,7 +134,7 @@ public class JmsMessageProducerTest extends JmsConnectionTestSupport { @Test(timeout = 10000) public void testAnonymousProducerThrowsUOEWhenExplictDestinationNotProvided() throws Exception { - MessageProducer producer = session.createProducer(null); + JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(null); Message message = Mockito.mock(Message.class); try { @@ -117,17 +145,31 @@ public class JmsMessageProducerTest extends JmsConnectionTestSupport { } try { + producer.send(message, completionListener); + fail("Expected exception not thrown"); + } catch (UnsupportedOperationException uoe) { + // expected + } + + try { producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); fail("Expected exception not thrown"); } catch (UnsupportedOperationException uoe) { // expected } + + try { + producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE, completionListener); + fail("Expected exception not thrown"); + } catch (UnsupportedOperationException uoe) { + // expected + } } @Test(timeout = 10000) public void testExplicitProducerThrowsUOEWhenExplictDestinationIsProvided() throws Exception { JmsDestination dest = new JmsQueue("explicitDestination"); - MessageProducer producer = session.createProducer(new JmsQueue()); + JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(dest); Message message = Mockito.mock(Message.class); try { @@ -138,16 +180,30 @@ public class JmsMessageProducerTest extends JmsConnectionTestSupport { } try { + producer.send(dest, message, completionListener); + fail("Expected exception not thrown"); + } catch (UnsupportedOperationException uoe) { + // expected + } + + try { producer.send(dest, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); fail("Expected exception not thrown"); } catch (UnsupportedOperationException uoe) { // expected } + + try { + producer.send(dest, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE, completionListener); + fail("Expected exception not thrown"); + } catch (UnsupportedOperationException uoe) { + // expected + } } @Test(timeout = 10000) public void testAnonymousDestinationProducerThrowsIDEWhenNullDestinationIsProvided() throws Exception { - MessageProducer producer = session.createProducer(null); + JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(null); Message message = Mockito.mock(Message.class); try { @@ -158,10 +214,369 @@ public class JmsMessageProducerTest extends JmsConnectionTestSupport { } try { + producer.send(null, message, completionListener); + fail("Expected exception not thrown"); + } catch (InvalidDestinationException ide) { + // expected + } + + try { producer.send(null, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); fail("Expected exception not thrown"); } catch (InvalidDestinationException ide) { // expected } + + try { + producer.send(null, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE, completionListener); + fail("Expected exception not thrown"); + } catch (InvalidDestinationException ide) { + // expected + } + } + + @Test(timeout = 10000) + public void testAnonymousProducerThrowsIAEWhenNullCompletionListenerProvided() throws Exception { + JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(null); + JmsDestination dest = new JmsQueue("explicitDestination"); + + Message message = Mockito.mock(Message.class); + + try { + producer.send(dest, message, null); + fail("Expected exception not thrown"); + } catch (IllegalArgumentException iae) { + // expected + } + + try { + producer.send(dest, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE, null); + fail("Expected exception not thrown"); + } catch (IllegalArgumentException iae) { + // expected + } + } + + @Test(timeout = 10000) + public void testExplicitProducerThrowsIAEWhenNullCompletionListenerIsProvided() throws Exception { + JmsDestination dest = new JmsQueue("explicitDestination"); + JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(dest); + + Message message = Mockito.mock(Message.class); + try { + producer.send(message, null); + fail("Expected exception not thrown"); + } catch (IllegalArgumentException iae) { + // expected + } + + try { + producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE, null); + fail("Expected exception not thrown"); + } catch (IllegalArgumentException iae) { + // expected + } + } + + @Test(timeout = 10000) + public void testInOrderSendAcksCompletionsReturnInOrder() throws Exception { + final int MESSAGE_COUNT = 3; + + final MockRemotePeer remotePoor = MockRemotePeer.INSTANCE; + + JmsConnectionFactory factory = new JmsConnectionFactory( + "mock://localhost?mock.delayCompletionCalls=true"); + + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + final Destination destination = new JmsQueue("explicitDestination"); + JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(destination); + final MyCompletionListener listener = new MyCompletionListener(); + + sendMessages(MESSAGE_COUNT, producer, listener); + + assertTrue("Not all sends made it to the remote", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return remotePoor.getPendingCompletions(destination).size() == MESSAGE_COUNT; + } + })); + + remotePoor.completeAllPendingSends(destination); + + assertTrue("Not all completions triggered", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return listener.getCompletedSends().size() == MESSAGE_COUNT; + } + })); + + assertMessageCompletedInOrder(MESSAGE_COUNT, listener); + + connection.close(); + } + + @Test(timeout = 10000) + public void testReversedOrderSendAcksCompletionsReturnInOrder() throws Exception { + final int MESSAGE_COUNT = 3; + + final MockRemotePeer remotePoor = MockRemotePeer.INSTANCE; + + JmsConnectionFactory factory = new JmsConnectionFactory( + "mock://localhost?mock.delayCompletionCalls=true"); + + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + final Destination destination = new JmsQueue("explicitDestination"); + JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(destination); + final MyCompletionListener listener = new MyCompletionListener(); + + sendMessages(MESSAGE_COUNT, producer, listener); + + assertTrue("Not all sends made it to the remote", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return remotePoor.getPendingCompletions(destination).size() == MESSAGE_COUNT; + } + })); + + List<JmsOutboundMessageDispatch> pending = remotePoor.getPendingCompletions(destination); + assertEquals(MESSAGE_COUNT, pending.size()); + Collections.reverse(pending); + + for (JmsOutboundMessageDispatch envelope : pending) { + LOG.info("Trigger completion of message: {}", envelope.getMessage().getJMSMessageID()); + remotePoor.completePendingSend(envelope); + } + + assertTrue("Not all completions triggered", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return listener.getCompletedSends().size() == MESSAGE_COUNT; + } + })); + + assertMessageCompletedInOrder(MESSAGE_COUNT, listener); + + connection.close(); + } + + @Test(timeout = 10000) + public void testInOrderSendFailuresCompletionsReturnInOrder() throws Exception { + final int MESSAGE_COUNT = 3; + + final MockRemotePeer remotePoor = MockRemotePeer.INSTANCE; + + JmsConnectionFactory factory = new JmsConnectionFactory( + "mock://localhost?mock.delayCompletionCalls=true"); + + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + final Destination destination = new JmsQueue("explicitDestination"); + JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(destination); + final MyCompletionListener listener = new MyCompletionListener(); + + sendMessages(MESSAGE_COUNT, producer, listener); + assertTrue("Not all messages sent", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return remotePoor.getPendingCompletions(destination).size() == MESSAGE_COUNT; + } + })); + remotePoor.failAllPendingSends(destination, new JMSException("Could not send message")); + + assertTrue("Not all completions triggered", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return listener.getFailedSends().size() == MESSAGE_COUNT; + } + })); + + assertMessageFailedInOrder(MESSAGE_COUNT, listener); + + connection.close(); + } + + @Test(timeout = 10000) + public void testReversedOrderSendAcksFailuresReturnInOrder() throws Exception { + final int MESSAGE_COUNT = 3; + + final MockRemotePeer remotePoor = MockRemotePeer.INSTANCE; + + JmsConnectionFactory factory = new JmsConnectionFactory( + "mock://localhost?mock.delayCompletionCalls=true"); + + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + final Destination destination = new JmsQueue("explicitDestination"); + JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(destination); + final MyCompletionListener listener = new MyCompletionListener(); + + sendMessages(MESSAGE_COUNT, producer, listener); + + assertTrue("Not all sends made it to the remote", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return remotePoor.getPendingCompletions(destination).size() == MESSAGE_COUNT; + } + })); + + List<JmsOutboundMessageDispatch> pending = remotePoor.getPendingCompletions(destination); + assertEquals(MESSAGE_COUNT, pending.size()); + Collections.reverse(pending); + + for (JmsOutboundMessageDispatch envelope : pending) { + LOG.info("Trigger failure of message: {}", envelope.getMessage().getJMSMessageID()); + remotePoor.failPendingSend(envelope, new JMSException("Failed to send message")); + } + + assertTrue("Not all failures triggered", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return listener.getFailedSends().size() == MESSAGE_COUNT; + } + })); + + assertMessageFailedInOrder(MESSAGE_COUNT, listener); + + connection.close(); + } + + @Test(timeout = 10000) + public void testInterleavedCompletionsReturnedInOrder() throws Exception { + final int MESSAGE_COUNT = 3; + + final MockRemotePeer remotePoor = MockRemotePeer.INSTANCE; + + JmsConnectionFactory factory = new JmsConnectionFactory( + "mock://localhost?mock.delayCompletionCalls=true"); + + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + final Destination destination = new JmsQueue("explicitDestination"); + JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(destination); + final MyCompletionListener listener = new MyCompletionListener(); + + sendMessages(MESSAGE_COUNT, producer, listener); + + assertTrue("Not all sends made it to the remote", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return remotePoor.getPendingCompletions(destination).size() == MESSAGE_COUNT; + } + })); + + List<JmsOutboundMessageDispatch> pending = remotePoor.getPendingCompletions(destination); + assertEquals(MESSAGE_COUNT, pending.size()); + Collections.reverse(pending); + + for (JmsOutboundMessageDispatch envelope : pending) { + int sequence = envelope.getMessage().getIntProperty("sequence"); + if (sequence % 2 == 0) { + LOG.info("Trigger completion of message: {}", envelope.getMessage().getJMSMessageID()); + remotePoor.completePendingSend(envelope); + } else { + LOG.info("Trigger failure of message: {}", envelope.getMessage().getJMSMessageID()); + remotePoor.failPendingSend(envelope, new JMSException("Failed to send message")); + } + } + + assertTrue("Not all completions triggered", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return listener.getCombinedSends().size() == MESSAGE_COUNT; + } + })); + + assertTotalCompletionOrder(MESSAGE_COUNT, listener); + + connection.close(); + } + + private void sendMessages(int count, JmsMessageProducer producer, MyCompletionListener listener) throws Exception { + for (int i = 0; i < count; ++i) { + Message message = session.createMessage(); + message.setIntProperty("sequence", i); + + producer.send(message, listener); + } + } + + private void assertMessageCompletedInOrder(int expected, MyCompletionListener listener) throws Exception { + assertEquals("Did not get expected number of completions", expected, listener.completed.size()); + for (int i = 0; i < listener.completed.size(); ++i) { + int sequence = listener.completed.get(i).getIntProperty("sequence"); + assertEquals("Did not complete expected message: " + i + " got: " + sequence, i, sequence); + } + } + + private void assertMessageFailedInOrder(int expected, MyCompletionListener listener) throws Exception { + assertEquals("Did not get expected number of failures", expected, listener.failed.size()); + for (int i = 0; i < listener.failed.size(); ++i) { + int sequence = listener.failed.get(i).getIntProperty("sequence"); + assertEquals("Did not fail expected message: " + i + " got: " + sequence, i, sequence); + } + } + + private void assertTotalCompletionOrder(int expected, MyCompletionListener listener) throws Exception { + assertEquals("Did not get expected number of failures", expected, listener.combinedResult.size()); + for (int i = 0; i < listener.combinedResult.size(); ++i) { + int sequence = listener.combinedResult.get(i).getIntProperty("sequence"); + assertEquals("Did not fail expected message: " + i + " got: " + sequence, i, sequence); + } + } + + private class MyCompletionListener implements JmsCompletionListener { + + private final List<Message> completed = new ArrayList<Message>(); + private final List<Message> failed = new ArrayList<Message>(); + private final List<Message> combinedResult = new ArrayList<Message>(); + + @Override + public void onCompletion(Message message) { + try { + LOG.debug("Recording completed send: {}", message.getJMSMessageID()); + } catch (JMSException e) { + } + completed.add(message); + combinedResult.add(message); + } + + @Override + public void onException(Message message, Exception exception) { + try { + LOG.debug("Recording failed send: {} -> error {}", message.getJMSMessageID(), exception.getMessage()); + } catch (JMSException e) { + } + failed.add(message); + combinedResult.add(message); + } + + public List<Message> getCombinedSends() { + return combinedResult; + } + + public List<Message> getCompletedSends() { + return completed; + } + + public List<Message> getFailedSends() { + return failed; + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org