This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-jms.git
commit 458e6de93f89cad1e9a75df5607c8a77a234d787 Author: Robbie Gemmell <rob...@apache.org> AuthorDate: Tue Sep 24 16:59:39 2019 +0100 QPIDJMS-473: avoid passing conflicting sync + completion-required flags to the anonymous fallback producer --- .../amqp/AmqpAnonymousFallbackProducer.java | 30 ++-- .../jms/integration/ProducerIntegrationTest.java | 152 +++++++++++++++++++++ 2 files changed, 172 insertions(+), 10 deletions(-) diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java index 807ce4b..63e5cbb 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java @@ -59,10 +59,6 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer { public void send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws ProviderException { LOG.trace("Started send chain for anonymous producer: {}", getProducerId()); - // Force sends marked as asynchronous to be sent synchronous so that the temporary - // producer instance can handle failures and perform necessary completion work on - // the send. - envelope.setSendAsync(false); // Create a new ProducerInfo for the short lived producer that's created to perform the // send to the given AMQP target. @@ -74,7 +70,12 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer { // it will trigger the open event which will in turn trigger the send event. // The created producer will be closed immediately after the delivery has been acknowledged. 
AmqpProducerBuilder builder = new AmqpProducerBuilder(session, info); - builder.buildResource(new AnonymousSendRequest(request, builder, envelope)); + builder.buildResource(new AnonymousSendRequest(request, builder, envelope, envelope.isCompletionRequired())); + + // Force sends to be sent synchronous so that the temporary producer instance can handle + // the failures and perform necessary completion work on the send. + envelope.setSendAsync(false); + envelope.setCompletionRequired(false); getParent().getProvider().pumpToProtonTransport(request); } @@ -108,10 +109,12 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer { private abstract class AnonymousRequest extends WrappedAsyncResult { protected final JmsOutboundMessageDispatch envelope; + private final boolean completionRequired; - public AnonymousRequest(AsyncResult sendResult, JmsOutboundMessageDispatch envelope) { + public AnonymousRequest(AsyncResult sendResult, JmsOutboundMessageDispatch envelope, boolean completionRequired) { super(sendResult); this.envelope = envelope; + this.completionRequired = completionRequired; } /** @@ -124,6 +127,10 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer { super.onFailure(result); } + public boolean isCompletionRequired() { + return completionRequired; + } + public abstract AmqpProducer getProducer(); } @@ -131,8 +138,8 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer { private final AmqpProducerBuilder producerBuilder; - public AnonymousSendRequest(AsyncResult sendResult, AmqpProducerBuilder producerBuilder, JmsOutboundMessageDispatch envelope) { - super(sendResult, envelope); + public AnonymousSendRequest(AsyncResult sendResult, AmqpProducerBuilder producerBuilder, JmsOutboundMessageDispatch envelope, boolean completionRequired) { + super(sendResult, envelope, completionRequired); this.producerBuilder = producerBuilder; } @@ -159,7 +166,7 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer { private 
final AmqpProducer producer; public AnonymousSendCompleteRequest(AnonymousSendRequest open) { - super(open.getWrappedRequest(), open.envelope); + super(open.getWrappedRequest(), open.envelope, open.isCompletionRequired()); this.producer = open.getProducer(); } @@ -190,7 +197,7 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer { private final AmqpProducer producer; public AnonymousCloseRequest(AnonymousSendCompleteRequest sendComplete) { - super(sendComplete.getWrappedRequest(), sendComplete.envelope); + super(sendComplete.getWrappedRequest(), sendComplete.envelope, sendComplete.isCompletionRequired()); this.producer = sendComplete.getProducer(); } @@ -199,6 +206,9 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer { public void onSuccess() { LOG.trace("Close phase of anonymous send complete: {} ", getProducerId()); super.onSuccess(); + if (isCompletionRequired()) { + getParent().getProvider().getProviderListener().onCompletedMessageSend(envelope); + } } @Override diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java index 3c36512..0e5ce7d 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java @@ -2841,6 +2841,158 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 20000) + public void testAnonymousProducerAsyncCompletionListenerSendFailureHandledWhenAnonymousRelayNodeIsNotSupported() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + + // DO NOT add capability to indicate server support for ANONYMOUS-RELAY + + Connection connection = testFixture.establishConnecton(testPeer); + + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = session.createTopic(topicName); + + // Expect no AMQP traffic when we create the anonymous producer, as it will wait + // for an actual send to occur on the producer before anything occurs on the wire + + //Create an anonymous producer + MessageProducer producer = session.createProducer(null); + assertNotNull("Producer object was null", producer); + + // Expect a new message sent by the above producer to cause creation of a new + // sender link to the given destination, then closing the link after the message is sent. + TargetMatcher targetMatcher = new TargetMatcher(); + targetMatcher.withAddress(equalTo(topicName)); + targetMatcher.withDynamic(equalTo(false)); + targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); + + String content = "testContent"; + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); + messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true)); + messageMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true)); + messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(content)); + + TestJmsCompletionListener completionListener = new TestJmsCompletionListener(); + Message message = session.createTextMessage(content); + + testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher, nullValue(), new Rejected(), true); + testPeer.expectDetach(true, true, true); + + // The fallback producer acts as synchronous regardless of the completion listener, + // so exceptions are thrown from send. Only onComplete uses the listener. 
+ try { + producer.send(dest, message, completionListener); + fail("Send should fail"); + } catch (JMSException jmsEx) { + LOG.debug("Caught expected error from failed send."); + } + + //Repeat the send (but accept this time) and observe another attach->transfer->detach. + testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher); + testPeer.expectDetach(true, true, true); + + TestJmsCompletionListener completionListener2 = new TestJmsCompletionListener(); + + producer.send(dest, message, completionListener2); + + assertTrue("Did not get completion callback", completionListener2.awaitCompletion(5, TimeUnit.SECONDS)); + assertNull(completionListener2.exception); + Message receivedMessage2 = completionListener2.message; + assertNotNull(receivedMessage2); + assertTrue(receivedMessage2 instanceof TextMessage); + assertEquals(content, ((TextMessage) receivedMessage2).getText()); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) + public void testAnonymousProducerAsyncCompletionListenerSendWhenAnonymousRelayNodeIsNotSupported() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + + // DO NOT add capability to indicate server support for ANONYMOUS-RELAY + + Connection connection = testFixture.establishConnecton(testPeer); + + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = session.createTopic(topicName); + + // Expect no AMQP traffic when we create the anonymous producer, as it will wait + // for an actual send to occur on the producer before anything occurs on the wire + + //Create an anonymous producer + MessageProducer producer = session.createProducer(null); + assertNotNull("Producer object was null", producer); + + // Expect a new message sent by the above producer to cause creation of a new + // 
sender link to the given destination, then closing the link after the message is sent. + TargetMatcher targetMatcher = new TargetMatcher(); + targetMatcher.withAddress(equalTo(topicName)); + targetMatcher.withDynamic(equalTo(false)); + targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); + + String content = "testContent"; + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true)); + messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true)); + messageMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true)); + messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(content)); + + testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher); + testPeer.expectDetach(true, true, true); + + TestJmsCompletionListener completionListener = new TestJmsCompletionListener(); + Message message = session.createTextMessage(content); + + producer.send(dest, message, completionListener); + + assertTrue("Did not get completion callback", completionListener.awaitCompletion(5, TimeUnit.SECONDS)); + assertNull(completionListener.exception); + Message receivedMessage = completionListener.message; + assertNotNull(receivedMessage); + assertTrue(receivedMessage instanceof TextMessage); + assertEquals(content, ((TextMessage) receivedMessage).getText()); + + //Repeat the send and observe another attach->transfer->detach. 
+ testPeer.expectSenderAttach(targetMatcher, false, false); + testPeer.expectTransfer(messageMatcher); + testPeer.expectDetach(true, true, true); + + TestJmsCompletionListener completionListener2 = new TestJmsCompletionListener(); + + producer.send(dest, message, completionListener2); + + assertTrue("Did not get completion callback", completionListener2.awaitCompletion(5, TimeUnit.SECONDS)); + assertNull(completionListener2.exception); + Message receivedMessage2 = completionListener2.message; + assertNotNull(receivedMessage2); + assertTrue(receivedMessage2 instanceof TextMessage); + assertEquals(content, ((TextMessage) receivedMessage2).getText()); + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) public void testSendingMessageSetsJMSDeliveryTimeWithDelay() throws Exception { doSendingMessageSetsJMSDeliveryTimeTestImpl(true); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org