This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git

commit 458e6de93f89cad1e9a75df5607c8a77a234d787
Author: Robbie Gemmell <rob...@apache.org>
AuthorDate: Tue Sep 24 16:59:39 2019 +0100

    QPIDJMS-473: avoid passing conflicting sync + completion-required flags to 
the anonymous fallback producer
---
 .../amqp/AmqpAnonymousFallbackProducer.java        |  30 ++--
 .../jms/integration/ProducerIntegrationTest.java   | 152 +++++++++++++++++++++
 2 files changed, 172 insertions(+), 10 deletions(-)

diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
index 807ce4b..63e5cbb 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
@@ -59,10 +59,6 @@ public class AmqpAnonymousFallbackProducer extends 
AmqpProducer {
     public void send(JmsOutboundMessageDispatch envelope, AsyncResult request) 
throws ProviderException {
         LOG.trace("Started send chain for anonymous producer: {}", 
getProducerId());
 
-        // Force sends marked as asynchronous to be sent synchronous so that 
the temporary
-        // producer instance can handle failures and perform necessary 
completion work on
-        // the send.
-        envelope.setSendAsync(false);
 
         // Create a new ProducerInfo for the short lived producer that's 
created to perform the
         // send to the given AMQP target.
@@ -74,7 +70,12 @@ public class AmqpAnonymousFallbackProducer extends 
AmqpProducer {
         // it will trigger the open event which will in turn trigger the send 
event.
         // The created producer will be closed immediately after the delivery 
has been acknowledged.
         AmqpProducerBuilder builder = new AmqpProducerBuilder(session, info);
-        builder.buildResource(new AnonymousSendRequest(request, builder, 
envelope));
+        builder.buildResource(new AnonymousSendRequest(request, builder, 
envelope, envelope.isCompletionRequired()));
+
+        // Force sends to be sent synchronous so that the temporary producer 
instance can handle
+        // the failures and perform necessary completion work on the send.
+        envelope.setSendAsync(false);
+        envelope.setCompletionRequired(false);
 
         getParent().getProvider().pumpToProtonTransport(request);
     }
@@ -108,10 +109,12 @@ public class AmqpAnonymousFallbackProducer extends 
AmqpProducer {
     private abstract class AnonymousRequest extends WrappedAsyncResult {
 
         protected final JmsOutboundMessageDispatch envelope;
+        private final boolean completionRequired;
 
-        public AnonymousRequest(AsyncResult sendResult, 
JmsOutboundMessageDispatch envelope) {
+        public AnonymousRequest(AsyncResult sendResult, 
JmsOutboundMessageDispatch envelope, boolean completionRequired) {
             super(sendResult);
             this.envelope = envelope;
+            this.completionRequired = completionRequired;
         }
 
         /**
@@ -124,6 +127,10 @@ public class AmqpAnonymousFallbackProducer extends 
AmqpProducer {
             super.onFailure(result);
         }
 
+        public boolean isCompletionRequired() {
+            return completionRequired;
+        }
+
         public abstract AmqpProducer getProducer();
     }
 
@@ -131,8 +138,8 @@ public class AmqpAnonymousFallbackProducer extends 
AmqpProducer {
 
         private final AmqpProducerBuilder producerBuilder;
 
-        public AnonymousSendRequest(AsyncResult sendResult, 
AmqpProducerBuilder producerBuilder, JmsOutboundMessageDispatch envelope) {
-            super(sendResult, envelope);
+        public AnonymousSendRequest(AsyncResult sendResult, 
AmqpProducerBuilder producerBuilder, JmsOutboundMessageDispatch envelope, 
boolean completionRequired) {
+            super(sendResult, envelope, completionRequired);
 
             this.producerBuilder = producerBuilder;
         }
@@ -159,7 +166,7 @@ public class AmqpAnonymousFallbackProducer extends 
AmqpProducer {
         private final AmqpProducer producer;
 
         public AnonymousSendCompleteRequest(AnonymousSendRequest open) {
-            super(open.getWrappedRequest(), open.envelope);
+            super(open.getWrappedRequest(), open.envelope, 
open.isCompletionRequired());
 
             this.producer = open.getProducer();
         }
@@ -190,7 +197,7 @@ public class AmqpAnonymousFallbackProducer extends 
AmqpProducer {
         private final AmqpProducer producer;
 
         public AnonymousCloseRequest(AnonymousSendCompleteRequest 
sendComplete) {
-            super(sendComplete.getWrappedRequest(), sendComplete.envelope);
+            super(sendComplete.getWrappedRequest(), sendComplete.envelope, 
sendComplete.isCompletionRequired());
 
             this.producer = sendComplete.getProducer();
         }
@@ -199,6 +206,9 @@ public class AmqpAnonymousFallbackProducer extends 
AmqpProducer {
         public void onSuccess() {
             LOG.trace("Close phase of anonymous send complete: {} ", 
getProducerId());
             super.onSuccess();
+            if (isCompletionRequired()) {
+                
getParent().getProvider().getProviderListener().onCompletedMessageSend(envelope);
+            }
         }
 
         @Override
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 3c36512..0e5ce7d 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -2841,6 +2841,158 @@ public class ProducerIntegrationTest extends 
QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void 
testAnonymousProducerAsyncCompletionListenerSendFailureHandledWhenAnonymousRelayNodeIsNotSupported()
 throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            // DO NOT add capability to indicate server support for 
ANONYMOUS-RELAY
+
+            Connection connection = testFixture.establishConnecton(testPeer);
+
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+
+            // Expect no AMQP traffic when we create the anonymous producer, 
as it will wait
+            // for an actual send to occur on the producer before anything 
occurs on the wire
+
+            //Create an anonymous producer
+            MessageProducer producer = session.createProducer(null);
+            assertNotNull("Producer object was null", producer);
+
+            // Expect a new message sent by the above producer to cause 
creation of a new
+            // sender link to the given destination, then closing the link 
after the message is sent.
+            TargetMatcher targetMatcher = new TargetMatcher();
+            targetMatcher.withAddress(equalTo(topicName));
+            targetMatcher.withDynamic(equalTo(false));
+            targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
+
+            String content = "testContent";
+            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(new 
MessageHeaderSectionMatcher(true));
+            messageMatcher.setMessageAnnotationsMatcher(new 
MessageAnnotationsSectionMatcher(true));
+            messageMatcher.setPropertiesMatcher(new 
MessagePropertiesSectionMatcher(true));
+            messageMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(content));
+
+            TestJmsCompletionListener completionListener = new 
TestJmsCompletionListener();
+            Message message = session.createTextMessage(content);
+
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher, nullValue(), new 
Rejected(), true);
+            testPeer.expectDetach(true, true, true);
+
+            // The fallback producer acts as synchronous regardless of the 
completion listener,
+            // so exceptions are thrown from send. Only onComplete uses the 
listener.
+            try {
+                producer.send(dest, message, completionListener);
+                fail("Send should fail");
+            } catch (JMSException jmsEx) {
+                LOG.debug("Caught expected error from failed send.");
+            }
+
+            //Repeat the send (but accept this time) and observe another 
attach->transfer->detach.
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher);
+            testPeer.expectDetach(true, true, true);
+
+            TestJmsCompletionListener completionListener2 = new 
TestJmsCompletionListener();
+
+            producer.send(dest, message, completionListener2);
+
+            assertTrue("Did not get completion callback", 
completionListener2.awaitCompletion(5, TimeUnit.SECONDS));
+            assertNull(completionListener2.exception);
+            Message receivedMessage2 = completionListener2.message;
+            assertNotNull(receivedMessage2);
+            assertTrue(receivedMessage2 instanceof TextMessage);
+            assertEquals(content, ((TextMessage) receivedMessage2).getText());
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void 
testAnonymousProducerAsyncCompletionListenerSendWhenAnonymousRelayNodeIsNotSupported()
 throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            // DO NOT add capability to indicate server support for 
ANONYMOUS-RELAY
+
+            Connection connection = testFixture.establishConnecton(testPeer);
+
+            connection.start();
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+
+            // Expect no AMQP traffic when we create the anonymous producer, 
as it will wait
+            // for an actual send to occur on the producer before anything 
occurs on the wire
+
+            //Create an anonymous producer
+            MessageProducer producer = session.createProducer(null);
+            assertNotNull("Producer object was null", producer);
+
+            // Expect a new message sent by the above producer to cause 
creation of a new
+            // sender link to the given destination, then closing the link 
after the message is sent.
+            TargetMatcher targetMatcher = new TargetMatcher();
+            targetMatcher.withAddress(equalTo(topicName));
+            targetMatcher.withDynamic(equalTo(false));
+            targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
+
+            String content = "testContent";
+            TransferPayloadCompositeMatcher messageMatcher = new 
TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(new 
MessageHeaderSectionMatcher(true));
+            messageMatcher.setMessageAnnotationsMatcher(new 
MessageAnnotationsSectionMatcher(true));
+            messageMatcher.setPropertiesMatcher(new 
MessagePropertiesSectionMatcher(true));
+            messageMatcher.setMessageContentMatcher(new 
EncodedAmqpValueMatcher(content));
+
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher);
+            testPeer.expectDetach(true, true, true);
+
+            TestJmsCompletionListener completionListener = new 
TestJmsCompletionListener();
+            Message message = session.createTextMessage(content);
+
+            producer.send(dest, message, completionListener);
+
+            assertTrue("Did not get completion callback", 
completionListener.awaitCompletion(5, TimeUnit.SECONDS));
+            assertNull(completionListener.exception);
+            Message receivedMessage = completionListener.message;
+            assertNotNull(receivedMessage);
+            assertTrue(receivedMessage instanceof TextMessage);
+            assertEquals(content, ((TextMessage) receivedMessage).getText());
+
+            //Repeat the send and observe another attach->transfer->detach.
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+            testPeer.expectTransfer(messageMatcher);
+            testPeer.expectDetach(true, true, true);
+
+            TestJmsCompletionListener completionListener2 = new 
TestJmsCompletionListener();
+
+            producer.send(dest, message, completionListener2);
+
+            assertTrue("Did not get completion callback", 
completionListener2.awaitCompletion(5, TimeUnit.SECONDS));
+            assertNull(completionListener2.exception);
+            Message receivedMessage2 = completionListener2.message;
+            assertNotNull(receivedMessage2);
+            assertTrue(receivedMessage2 instanceof TextMessage);
+            assertEquals(content, ((TextMessage) receivedMessage2).getText());
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
     public void testSendingMessageSetsJMSDeliveryTimeWithDelay() throws 
Exception {
         doSendingMessageSetsJMSDeliveryTimeTestImpl(true);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to