This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-jms.git
The following commit(s) were added to refs/heads/main by this push: new 8ab821d QPIDJMS-534 Fail pending resource creation calls on connection drop 8ab821d is described below commit 8ab821db90506d5900c313260d6666fb8b6c0fe1 Author: Timothy Bish <tabish...@gmail.com> AuthorDate: Tue Apr 27 12:22:47 2021 -0400 QPIDJMS-534 Fail pending resource creation calls on connection drop Respond quickly to resource creation failures on connection drop instead of waiting for a configured request timeout to avoid stuck calls to create during reconnection processing. --- .../qpid/jms/provider/amqp/AmqpProvider.java | 22 +- .../amqp/builders/AmqpResourceBuilder.java | 10 +- .../provider/failover/FailoverIntegrationTest.java | 267 +++++++++++++++++++++ .../qpid/jms/test/testpeer/TestAmqpPeer.java | 33 +++ 4 files changed, 328 insertions(+), 4 deletions(-) diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index 59aa383..8058678 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -160,6 +160,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP private final ProviderFutureFactory futureFactory; private AsyncResult connectionRequest; private ScheduledFuture<?> nextIdleTimeoutCheck; + private List<AsyncResult> failOnConnectionDropList = new ArrayList<>(); /** * Create a new instance of an AmqpProvider bonded to the given remote URI. @@ -1146,8 +1147,17 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP failureCause = ex; ProviderListener listener = this.listener; - if (listener != null) { - listener.onConnectionFailure(ProviderExceptionSupport.createNonFatalOrPassthrough(ex)); + try { + if (listener != null) { + listener.onConnectionFailure(ProviderExceptionSupport.createNonFatalOrPassthrough(ex)); + } + } finally { + // Alert the request to the failure and then afterwards clean up any stragglers that have not + // been altered to the provider having failed to avoid any lingering blocked resource create + // calls and possibly others as needed. + for (AsyncResult request : failOnConnectionDropList) { + request.onFailure(ex); + } } } @@ -1546,6 +1556,14 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP return null; } + public void addToFailOnConnectionDropTracking(AsyncResult result) { + failOnConnectionDropList.add(result); + } + + public void removeFromFailOnConnectionDropTracking(AsyncResult result) { + failOnConnectionDropList.remove(result); + } + //----- Internal implementation ------------------------------------------// private void checkClosedOrFailed() throws ProviderException { diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java index 408bda7..5dee9c0 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java @@ -54,10 +54,12 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex protected ENDPOINT endpoint; protected final PARENT parent; protected final INFO resourceInfo; + protected final AmqpProvider provider; public AmqpResourceBuilder(PARENT parent, INFO resourceInfo) { this.parent = parent; this.resourceInfo = resourceInfo; + this.provider = parent.getProvider(); } /** @@ -72,6 +74,9 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex public void buildResource(final AsyncResult request) { this.request = request; + // Store the request with the provider for failure if connection drops + provider.addToFailOnConnectionDropTracking(request); + // Create the local end of the manage resource. endpoint = createEndpoint(resourceInfo); endpoint.setContext(this); @@ -80,8 +85,6 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex // Create the resource object now resource = createResource(parent, resourceInfo, endpoint); - AmqpProvider provider = parent.getProvider(); - if (getRequestTimeout() > JmsConnectionInfo.INFINITE) { // Attempt to schedule a cancellation of the pending open request, can return @@ -147,6 +150,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex //----- Standard open and close handlers ---------------------------------// protected final void handleOpened(AmqpProvider provider) { + provider.removeFromFailOnConnectionDropTracking(request); // perform any post open processing prior to opened state inspection. afterOpened(); @@ -172,6 +176,8 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex } protected final void handleClosed(AmqpProvider provider, ProviderException cause) { + provider.removeFromFailOnConnectionDropTracking(request); + // If the resource being built is closed during the creation process // then this is always an error. diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java index 805ae70..4e0733b 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java @@ -1877,6 +1877,42 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 20000) + public void testFailoverEnforcesRequestTimeoutSessionWhenBeginSent() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer()) { + + // Create a peer to connect to so we can get to a state where we + // can try to send when offline. + final String peerURI = createPeerURI(testPeer); + + LOG.info("Original peer is at: {}", peerURI); + + // Connect to the test peer + testPeer.expectSaslAnonymous(); + testPeer.expectOpen(); + testPeer.expectBegin(); + testPeer.expectBegin(false); + testPeer.dropAfterLastHandler(); + + final JmsConnection connection = establishAnonymousConnecton( + "jms.requestTimeout=1000&failover.reconnectDelay=2000&failover.maxReconnectAttempts=30", testPeer); + connection.start(); + + try { + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + fail("Should have thrown an exception"); + } catch (JmsOperationTimedOutException jmsEx) { + LOG.info("Caught timed out exception from send:", jmsEx); + } catch (Exception ex) { + fail("Should have caught a timed out exception"); + } + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) public void testFailoverEnforcesSendTimeout() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer()) { @@ -4914,6 +4950,237 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { finalPeer.waitForAllHandlersToComplete(1000); } } + @Test(timeout = 20000) + public void testSessionCreationRecoversAfterDropWithNoBeginResponse() throws Exception { + try (TestAmqpPeer originalPeer = new TestAmqpPeer(); + TestAmqpPeer finalPeer = new TestAmqpPeer();) { + + final String content = "myContent"; + final DescribedType amqpValueNullContent = new AmqpValueDescribedType(content); + + originalPeer.expectSaslAnonymous(); + originalPeer.expectOpen(); + originalPeer.expectBegin(); + originalPeer.expectBegin(false); + originalPeer.dropAfterLastHandler(20); + + finalPeer.expectSaslAnonymous(); + finalPeer.expectOpen(); + finalPeer.expectBegin(); + finalPeer.expectBegin(); + finalPeer.expectReceiverAttach(); + finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent); + finalPeer.expectDispositionThatIsAcceptedAndSettled(); + finalPeer.expectClose(); + + final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); + + try { + connection.start(); + } catch (Exception ex) { + fail("Should not have thrown an Exception: " + ex); + } + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + MessageConsumer consumer = session.createConsumer(queue); + Message message = consumer.receive(2000); + + connection.close(); + + originalPeer.waitForAllHandlersToCompleteNoAssert(1000); + finalPeer.waitForAllHandlersToComplete(1000); + + assertNotNull(message); + assertTrue(message instanceof TextMessage); + assertEquals(content, ((TextMessage) message).getText()); + } + } + + @Test(timeout = 20000) + public void testMultipleSessionCreationRecoversAfterDropWithNoBeginResponseAndFailedRecoveryAttempt() throws Exception { + try (TestAmqpPeer originalPeer = new TestAmqpPeer(); + TestAmqpPeer intermediatePeer = new TestAmqpPeer(); + TestAmqpPeer finalPeer = new TestAmqpPeer();) { + + final String content = "myContent"; + final DescribedType amqpValueNullContent = new AmqpValueDescribedType(content); + + originalPeer.expectSaslAnonymous(); + originalPeer.expectOpen(); + originalPeer.expectBegin(); + originalPeer.expectBegin(); + originalPeer.expectBegin(false); + originalPeer.dropAfterLastHandler(20); + + intermediatePeer.expectSaslAnonymous(); + intermediatePeer.expectOpen(); + intermediatePeer.expectBegin(); + intermediatePeer.expectBegin(false); + intermediatePeer.dropAfterLastHandler(); + + finalPeer.expectSaslAnonymous(); + finalPeer.expectOpen(); + finalPeer.expectBegin(); + finalPeer.expectBegin(); + finalPeer.expectBegin(); + finalPeer.expectReceiverAttach(); + finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent); + finalPeer.expectDispositionThatIsAcceptedAndSettled(); + finalPeer.expectReceiverAttach(); + finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent); + finalPeer.expectDispositionThatIsAcceptedAndSettled(); + finalPeer.expectClose(); + + final JmsConnection connection = establishAnonymousConnecton(originalPeer, intermediatePeer, finalPeer); + + try { + connection.start(); + } catch (Exception ex) { + fail("Should not have thrown an Exception: " + ex); + } + + Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Queue queue1 = session1.createQueue("myQueue"); + MessageConsumer consumer1 = session1.createConsumer(queue1); + Message message1 = consumer1.receive(2000); + + Queue queue2 = session2.createQueue("myQueue"); + MessageConsumer consumer2 = session2.createConsumer(queue2); + Message message2 = consumer2.receive(2000); + + connection.close(); + + originalPeer.waitForAllHandlersToComplete(1000); + intermediatePeer.waitForAllHandlersToComplete(1000); + finalPeer.waitForAllHandlersToComplete(1000); + + assertNotNull(message1); + assertTrue(message1 instanceof TextMessage); + assertEquals(content, ((TextMessage) message1).getText()); + assertNotNull(message2); + assertTrue(message2 instanceof TextMessage); + assertEquals(content, ((TextMessage) message2).getText()); + } + } + + @Test(timeout = 20000) + public void testMultipleSenderCreationRecoversAfterDropWithNoAttachResponseAndFailedRecoveryAttempt() throws Exception { + try (TestAmqpPeer originalPeer = new TestAmqpPeer(); + TestAmqpPeer intermediatePeer = new TestAmqpPeer(); + TestAmqpPeer finalPeer = new TestAmqpPeer();) { + + originalPeer.expectSaslAnonymous(); + originalPeer.expectOpen(); + originalPeer.expectBegin(); + originalPeer.expectBegin(); + originalPeer.expectSenderAttach(); + originalPeer.expectSenderAttachButDoNotRespond(); + originalPeer.dropAfterLastHandler(20); + + intermediatePeer.expectSaslAnonymous(); + intermediatePeer.expectOpen(); + intermediatePeer.expectBegin(); + intermediatePeer.expectBegin(); + intermediatePeer.expectSenderAttachButDoNotRespond(); + intermediatePeer.dropAfterLastHandler(); + + finalPeer.expectSaslAnonymous(); + finalPeer.expectOpen(); + finalPeer.expectBegin(); + finalPeer.expectBegin(); + finalPeer.expectSenderAttach(); + finalPeer.expectSenderAttach(); + finalPeer.expectClose(); + + final JmsConnection connection = establishAnonymousConnecton(originalPeer, intermediatePeer, finalPeer); + + try { + connection.start(); + } catch (Exception ex) { + fail("Should not have thrown an Exception: " + ex); + } + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + + MessageProducer producer1 = session.createProducer(queue); + MessageProducer producer2 = session.createProducer(queue); + + assertNotNull(producer1); + assertNotNull(producer2); + + assertEquals(queue, producer1.getDestination()); + assertEquals(queue, producer2.getDestination()); + + connection.close(); + + originalPeer.waitForAllHandlersToComplete(1000); + intermediatePeer.waitForAllHandlersToComplete(1000); + finalPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) + public void testSenderAndReceiverCreationRecoversAfterDropWithNoAttachResponseAndFailedRecoveryAttempt() throws Exception { + try (TestAmqpPeer originalPeer = new TestAmqpPeer(); + TestAmqpPeer intermediatePeer = new TestAmqpPeer(); + TestAmqpPeer finalPeer = new TestAmqpPeer();) { + + originalPeer.expectSaslAnonymous(); + originalPeer.expectOpen(); + originalPeer.expectBegin(); + originalPeer.expectBegin(); + originalPeer.expectSenderAttach(); + originalPeer.expectReceiverAttachButDoNotRespond(); + originalPeer.dropAfterLastHandler(20); + + intermediatePeer.expectSaslAnonymous(); + intermediatePeer.expectOpen(); + intermediatePeer.expectBegin(); + intermediatePeer.expectBegin(); + intermediatePeer.expectSenderAttachButDoNotRespond(); + intermediatePeer.dropAfterLastHandler(10); + + finalPeer.expectSaslAnonymous(); + finalPeer.expectOpen(); + finalPeer.expectBegin(); + finalPeer.expectBegin(); + finalPeer.expectSenderAttach(); + finalPeer.expectReceiverAttach(); + finalPeer.expectLinkFlow(); + finalPeer.expectClose(); + + final JmsConnection connection = establishAnonymousConnecton(originalPeer, intermediatePeer, finalPeer); + + try { + connection.start(); + } catch (Exception ex) { + fail("Should not have thrown an Exception: " + ex); + } + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + + MessageProducer producer = session.createProducer(queue); + MessageConsumer consumer = session.createConsumer(queue); + + assertNotNull(producer); + assertNotNull(consumer); + + assertEquals(queue, producer.getDestination()); + assertNull(consumer.getMessageListener()); + + connection.close(); + + originalPeer.waitForAllHandlersToComplete(1000); + intermediatePeer.waitForAllHandlersToComplete(1000); + finalPeer.waitForAllHandlersToComplete(1000); + } + } + private JmsConnection establishAnonymousConnecton(TestAmqpPeer... peers) throws JMSException { return establishAnonymousConnecton(null, null, peers); } diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index 6d54148..388323b 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -1019,6 +1019,11 @@ public class TestAmqpPeer implements AutoCloseable expectBegin(notNullValue(), true); } + public void expectBegin(boolean sendResponse) + { + expectBegin(notNullValue(), sendResponse); + } + public void expectBegin(Matcher<?> outgoingWindowMatcher, boolean sendResponse) { final BeginMatcher beginMatcher = new BeginMatcher() @@ -1240,6 +1245,20 @@ public class TestAmqpPeer implements AutoCloseable addHandler(attachMatcher); } + public void expectSenderAttachButDoNotRespond() + { + final AttachMatcher attachMatcher = new AttachMatcher() + .withName(notNullValue()) + .withHandle(notNullValue()) + .withRole(equalTo(Role.SENDER)) + .withSndSettleMode(Matchers.oneOf(SenderSettleMode.SETTLED, SenderSettleMode.UNSETTLED)) + .withRcvSettleMode(equalTo(ReceiverSettleMode.FIRST)) + .withSource(notNullValue()) + .withTarget(notNullValue()); + + addHandler(attachMatcher); + } + public void expectSenderAttach() { expectSenderAttach(notNullValue(), false, false); @@ -1433,6 +1452,20 @@ public class TestAmqpPeer implements AutoCloseable expectReceiverAttach(notNullValue(), notNullValue(), true); } + public void expectReceiverAttachButDoNotRespond() + { + final AttachMatcher attachMatcher = new AttachMatcher() + .withName(notNullValue()) + .withHandle(notNullValue()) + .withRole(equalTo(Role.RECEIVER)) + .withSndSettleMode(Matchers.oneOf(SenderSettleMode.SETTLED, SenderSettleMode.UNSETTLED)) + .withRcvSettleMode(equalTo(ReceiverSettleMode.FIRST)) + .withSource(notNullValue()) + .withTarget(notNullValue()); + + addHandler(attachMatcher); + } + public void expectReceiverAttach() { expectReceiverAttach(notNullValue(), notNullValue()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org