This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git


The following commit(s) were added to refs/heads/main by this push:
     new 8ab821d  QPIDJMS-534 Fail pending resource creation calls on connection drop
8ab821d is described below

commit 8ab821db90506d5900c313260d6666fb8b6c0fe1
Author: Timothy Bish <tabish...@gmail.com>
AuthorDate: Tue Apr 27 12:22:47 2021 -0400

    QPIDJMS-534 Fail pending resource creation calls on connection drop
    
    Respond quickly to resource creation failures on connection drop instead
    of waiting for a configured request timeout to avoid stuck calls to
    create during reconnection processing.
---
 .../qpid/jms/provider/amqp/AmqpProvider.java       |  22 +-
 .../amqp/builders/AmqpResourceBuilder.java         |  10 +-
 .../provider/failover/FailoverIntegrationTest.java | 267 +++++++++++++++++++++
 .../qpid/jms/test/testpeer/TestAmqpPeer.java       |  33 +++
 4 files changed, 328 insertions(+), 4 deletions(-)

diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 59aa383..8058678 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -160,6 +160,7 @@ public class AmqpProvider implements Provider, 
TransportListener , AmqpResourceP
     private final ProviderFutureFactory futureFactory;
     private AsyncResult connectionRequest;
     private ScheduledFuture<?> nextIdleTimeoutCheck;
+    private List<AsyncResult> failOnConnectionDropList = new ArrayList<>();
 
     /**
      * Create a new instance of an AmqpProvider bonded to the given remote URI.
@@ -1146,8 +1147,17 @@ public class AmqpProvider implements Provider, 
TransportListener , AmqpResourceP
         failureCause = ex;
 
         ProviderListener listener = this.listener;
-        if (listener != null) {
-            
listener.onConnectionFailure(ProviderExceptionSupport.createNonFatalOrPassthrough(ex));
+        try {
+            if (listener != null) {
+                
listener.onConnectionFailure(ProviderExceptionSupport.createNonFatalOrPassthrough(ex));
+            }
+        } finally {
+            // Alert the request to the failure and then afterwards clean up any stragglers that have not
+            // been alerted to the provider having failed to avoid any lingering blocked resource create
+            // calls and possibly others as needed.
+            for (AsyncResult request : failOnConnectionDropList) {
+                request.onFailure(ex);
+            }
         }
     }
 
@@ -1546,6 +1556,14 @@ public class AmqpProvider implements Provider, 
TransportListener , AmqpResourceP
         return null;
     }
 
+    public void addToFailOnConnectionDropTracking(AsyncResult result) {
+        failOnConnectionDropList.add(result);
+    }
+
+    public void removeFromFailOnConnectionDropTracking(AsyncResult result) {
+        failOnConnectionDropList.remove(result);
+    }
+
     //----- Internal implementation 
------------------------------------------//
 
     private void checkClosedOrFailed() throws ProviderException {
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
index 408bda7..5dee9c0 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
@@ -54,10 +54,12 @@ public abstract class AmqpResourceBuilder<TARGET extends 
AmqpResource, PARENT ex
     protected ENDPOINT endpoint;
     protected final PARENT parent;
     protected final INFO resourceInfo;
+    protected final AmqpProvider provider;
 
     public AmqpResourceBuilder(PARENT parent, INFO resourceInfo) {
         this.parent = parent;
         this.resourceInfo = resourceInfo;
+        this.provider = parent.getProvider();
     }
 
     /**
@@ -72,6 +74,9 @@ public abstract class AmqpResourceBuilder<TARGET extends 
AmqpResource, PARENT ex
     public void buildResource(final AsyncResult request) {
         this.request = request;
 
+        // Store the request with the provider for failure if connection drops
+        provider.addToFailOnConnectionDropTracking(request);
+
         // Create the local end of the manage resource.
         endpoint = createEndpoint(resourceInfo);
         endpoint.setContext(this);
@@ -80,8 +85,6 @@ public abstract class AmqpResourceBuilder<TARGET extends 
AmqpResource, PARENT ex
         // Create the resource object now
         resource = createResource(parent, resourceInfo, endpoint);
 
-        AmqpProvider provider = parent.getProvider();
-
         if (getRequestTimeout() > JmsConnectionInfo.INFINITE) {
 
             // Attempt to schedule a cancellation of the pending open request, 
can return
@@ -147,6 +150,7 @@ public abstract class AmqpResourceBuilder<TARGET extends 
AmqpResource, PARENT ex
     //----- Standard open and close handlers 
---------------------------------//
 
     protected final void handleOpened(AmqpProvider provider) {
+        provider.removeFromFailOnConnectionDropTracking(request);
 
         // perform any post open processing prior to opened state inspection.
         afterOpened();
@@ -172,6 +176,8 @@ public abstract class AmqpResourceBuilder<TARGET extends 
AmqpResource, PARENT ex
     }
 
     protected final void handleClosed(AmqpProvider provider, ProviderException 
cause) {
+        provider.removeFromFailOnConnectionDropTracking(request);
+
         // If the resource being built is closed during the creation process
         // then this is always an error.
 
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 805ae70..4e0733b 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -1877,6 +1877,42 @@ public class FailoverIntegrationTest extends 
QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void testFailoverEnforcesRequestTimeoutSessionWhenBeginSent() 
throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
+
+            // Create a peer to connect to so we can get to a state where we
+            // can try to send when offline.
+            final String peerURI = createPeerURI(testPeer);
+
+            LOG.info("Original peer is at: {}", peerURI);
+
+            // Connect to the test peer
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
+            testPeer.expectBegin();
+            testPeer.expectBegin(false);
+            testPeer.dropAfterLastHandler();
+
+            final JmsConnection connection = establishAnonymousConnecton(
+                
"jms.requestTimeout=1000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=30",
 testPeer);
+            connection.start();
+
+            try {
+                connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                fail("Should have thrown an exception");
+            } catch (JmsOperationTimedOutException jmsEx) {
+                LOG.info("Caught timed out exception from send:", jmsEx);
+            } catch (Exception ex) {
+                fail("Should have caught a timed out exception");
+            }
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
     public void testFailoverEnforcesSendTimeout() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
 
@@ -4914,6 +4950,237 @@ public class FailoverIntegrationTest extends 
QpidJmsTestCase {
             finalPeer.waitForAllHandlersToComplete(1000);        }
     }
 
+    @Test(timeout = 20000)
+    public void testSessionCreationRecoversAfterDropWithNoBeginResponse() 
throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            final String content = "myContent";
+            final DescribedType amqpValueNullContent = new 
AmqpValueDescribedType(content);
+
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin(false);
+            originalPeer.dropAfterLastHandler(20);
+
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectReceiverAttach();
+            finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, 
null, amqpValueNullContent);
+            finalPeer.expectDispositionThatIsAcceptedAndSettled();
+            finalPeer.expectClose();
+
+            final JmsConnection connection = 
establishAnonymousConnecton(originalPeer, finalPeer);
+
+            try {
+                connection.start();
+            } catch (Exception ex) {
+                fail("Should not have thrown an Exception: " + ex);
+            }
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+            MessageConsumer consumer = session.createConsumer(queue);
+            Message message = consumer.receive(2000);
+
+            connection.close();
+
+            originalPeer.waitForAllHandlersToCompleteNoAssert(1000);
+            finalPeer.waitForAllHandlersToComplete(1000);
+
+            assertNotNull(message);
+            assertTrue(message instanceof TextMessage);
+            assertEquals(content, ((TextMessage) message).getText());
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void 
testMultipleSessionCreationRecoversAfterDropWithNoBeginResponseAndFailedRecoveryAttempt()
 throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer intermediatePeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            final String content = "myContent";
+            final DescribedType amqpValueNullContent = new 
AmqpValueDescribedType(content);
+
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin(false);
+            originalPeer.dropAfterLastHandler(20);
+
+            intermediatePeer.expectSaslAnonymous();
+            intermediatePeer.expectOpen();
+            intermediatePeer.expectBegin();
+            intermediatePeer.expectBegin(false);
+            intermediatePeer.dropAfterLastHandler();
+
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectReceiverAttach();
+            finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, 
null, amqpValueNullContent);
+            finalPeer.expectDispositionThatIsAcceptedAndSettled();
+            finalPeer.expectReceiverAttach();
+            finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, 
null, amqpValueNullContent);
+            finalPeer.expectDispositionThatIsAcceptedAndSettled();
+            finalPeer.expectClose();
+
+            final JmsConnection connection = 
establishAnonymousConnecton(originalPeer, intermediatePeer, finalPeer);
+
+            try {
+                connection.start();
+            } catch (Exception ex) {
+                fail("Should not have thrown an Exception: " + ex);
+            }
+
+            Session session1 = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Session session2 = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            Queue queue1 = session1.createQueue("myQueue");
+            MessageConsumer consumer1 = session1.createConsumer(queue1);
+            Message message1 = consumer1.receive(2000);
+
+            Queue queue2 = session2.createQueue("myQueue");
+            MessageConsumer consumer2 = session2.createConsumer(queue2);
+            Message message2 = consumer2.receive(2000);
+
+            connection.close();
+
+            originalPeer.waitForAllHandlersToComplete(1000);
+            intermediatePeer.waitForAllHandlersToComplete(1000);
+            finalPeer.waitForAllHandlersToComplete(1000);
+
+            assertNotNull(message1);
+            assertTrue(message1 instanceof TextMessage);
+            assertEquals(content, ((TextMessage) message1).getText());
+            assertNotNull(message2);
+            assertTrue(message2 instanceof TextMessage);
+            assertEquals(content, ((TextMessage) message2).getText());
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void 
testMultipleSenderCreationRecoversAfterDropWithNoAttachResponseAndFailedRecoveryAttempt()
 throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer intermediatePeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin();
+            originalPeer.expectSenderAttach();
+            originalPeer.expectSenderAttachButDoNotRespond();
+            originalPeer.dropAfterLastHandler(20);
+
+            intermediatePeer.expectSaslAnonymous();
+            intermediatePeer.expectOpen();
+            intermediatePeer.expectBegin();
+            intermediatePeer.expectBegin();
+            intermediatePeer.expectSenderAttachButDoNotRespond();
+            intermediatePeer.dropAfterLastHandler();
+
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectSenderAttach();
+            finalPeer.expectSenderAttach();
+            finalPeer.expectClose();
+
+            final JmsConnection connection = 
establishAnonymousConnecton(originalPeer, intermediatePeer, finalPeer);
+
+            try {
+                connection.start();
+            } catch (Exception ex) {
+                fail("Should not have thrown an Exception: " + ex);
+            }
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            MessageProducer producer1 = session.createProducer(queue);
+            MessageProducer producer2 = session.createProducer(queue);
+
+            assertNotNull(producer1);
+            assertNotNull(producer2);
+
+            assertEquals(queue, producer1.getDestination());
+            assertEquals(queue, producer2.getDestination());
+
+            connection.close();
+
+            originalPeer.waitForAllHandlersToComplete(1000);
+            intermediatePeer.waitForAllHandlersToComplete(1000);
+            finalPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void 
testSenderAndReceiverCreationRecoversAfterDropWithNoAttachResponseAndFailedRecoveryAttempt()
 throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer intermediatePeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin();
+            originalPeer.expectSenderAttach();
+            originalPeer.expectReceiverAttachButDoNotRespond();
+            originalPeer.dropAfterLastHandler(20);
+
+            intermediatePeer.expectSaslAnonymous();
+            intermediatePeer.expectOpen();
+            intermediatePeer.expectBegin();
+            intermediatePeer.expectBegin();
+            intermediatePeer.expectSenderAttachButDoNotRespond();
+            intermediatePeer.dropAfterLastHandler(10);
+
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectSenderAttach();
+            finalPeer.expectReceiverAttach();
+            finalPeer.expectLinkFlow();
+            finalPeer.expectClose();
+
+            final JmsConnection connection = 
establishAnonymousConnecton(originalPeer, intermediatePeer, finalPeer);
+
+            try {
+                connection.start();
+            } catch (Exception ex) {
+                fail("Should not have thrown an Exception: " + ex);
+            }
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            MessageProducer producer = session.createProducer(queue);
+            MessageConsumer consumer = session.createConsumer(queue);
+
+            assertNotNull(producer);
+            assertNotNull(consumer);
+
+            assertEquals(queue, producer.getDestination());
+            assertNull(consumer.getMessageListener());
+
+            connection.close();
+
+            originalPeer.waitForAllHandlersToComplete(1000);
+            intermediatePeer.waitForAllHandlersToComplete(1000);
+            finalPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
     private JmsConnection establishAnonymousConnecton(TestAmqpPeer... peers) 
throws JMSException {
         return establishAnonymousConnecton(null, null, peers);
     }
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 6d54148..388323b 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -1019,6 +1019,11 @@ public class TestAmqpPeer implements AutoCloseable
         expectBegin(notNullValue(), true);
     }
 
+    public void expectBegin(boolean sendResponse)
+    {
+        expectBegin(notNullValue(), sendResponse);
+    }
+
     public void expectBegin(Matcher<?> outgoingWindowMatcher, boolean 
sendResponse)
     {
         final BeginMatcher beginMatcher = new BeginMatcher()
@@ -1240,6 +1245,20 @@ public class TestAmqpPeer implements AutoCloseable
         addHandler(attachMatcher);
     }
 
+    public void expectSenderAttachButDoNotRespond()
+    {
+        final AttachMatcher attachMatcher = new AttachMatcher()
+                .withName(notNullValue())
+                .withHandle(notNullValue())
+                .withRole(equalTo(Role.SENDER))
+                .withSndSettleMode(Matchers.oneOf(SenderSettleMode.SETTLED, 
SenderSettleMode.UNSETTLED))
+                .withRcvSettleMode(equalTo(ReceiverSettleMode.FIRST))
+                .withSource(notNullValue())
+                .withTarget(notNullValue());
+
+        addHandler(attachMatcher);
+    }
+
     public void expectSenderAttach()
     {
         expectSenderAttach(notNullValue(), false, false);
@@ -1433,6 +1452,20 @@ public class TestAmqpPeer implements AutoCloseable
         expectReceiverAttach(notNullValue(), notNullValue(), true);
     }
 
+    public void expectReceiverAttachButDoNotRespond()
+    {
+        final AttachMatcher attachMatcher = new AttachMatcher()
+                .withName(notNullValue())
+                .withHandle(notNullValue())
+                .withRole(equalTo(Role.RECEIVER))
+                .withSndSettleMode(Matchers.oneOf(SenderSettleMode.SETTLED, 
SenderSettleMode.UNSETTLED))
+                .withRcvSettleMode(equalTo(ReceiverSettleMode.FIRST))
+                .withSource(notNullValue())
+                .withTarget(notNullValue());
+
+        addHandler(attachMatcher);
+    }
+
     public void expectReceiverAttach()
     {
         expectReceiverAttach(notNullValue(), notNullValue());

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to