This is an automated email from the ASF dual-hosted git repository. robbie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-jms.git
The following commit(s) were added to refs/heads/master by this push: new 4a9aab8 QPIDJMS-474: better handle connection failure mid-creation on transacted session 4a9aab8 is described below commit 4a9aab83bca11f7346215a827266e5039df1393d Author: Robbie Gemmell <rob...@apache.org> AuthorDate: Thu Sep 19 15:03:59 2019 +0100 QPIDJMS-474: better handle connection failure mid-creation on transacted session --- .../java/org/apache/qpid/jms/JmsConnection.java | 6 +++- .../qpid/jms/JmsLocalTransactionContext.java | 8 ++++- .../jms/integration/ConnectionIntegrationTest.java | 37 ++++++++++++++++++++++ .../provider/failover/FailoverIntegrationTest.java | 20 +++++++++--- 4 files changed, 65 insertions(+), 6 deletions(-) diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java index 5afcb3f..6df1d6e 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java @@ -1358,7 +1358,11 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection // Signal that connection dropped we need to mark transactions as // failed, deliver failure events to asynchronous send completions etc. for (JmsSession session : sessions.values()) { - session.onConnectionInterrupted(); + try { + session.onConnectionInterrupted(); + } catch (Throwable t) { + LOG.warn("Exception while marking session interrupted", t); + } } onProviderException(ex); diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java index 1452478..0b0b370 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java @@ -251,6 +251,10 @@ public class JmsLocalTransactionContext implements JmsTransactionContext { private void doRollback(boolean startNewTx) throws JMSException { lock.writeLock().lock(); try { + if(transactionInfo == null) { + return; + } + LOG.debug("Rollback: {}", transactionInfo.getId()); JmsTransactionId oldTransactionId = transactionInfo.getId(); final JmsTransactionInfo nextTx; @@ -331,7 +335,9 @@ public class JmsLocalTransactionContext implements JmsTransactionContext { public void onConnectionInterrupted() { lock.writeLock().tryLock(); try { - transactionInfo.setInDoubt(true); + if(transactionInfo != null) { + transactionInfo.setInDoubt(true); + } } finally { if (lock.writeLock().isHeldByCurrentThread()) { lock.writeLock().unlock(); diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java index 9a272e9..4d1a9b2 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java @@ -75,6 +75,8 @@ import org.apache.qpid.jms.test.testpeer.basictypes.ConnectionError; import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher; import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher; import org.apache.qpid.jms.util.MetaDataSupport; +import org.apache.qpid.jms.util.QpidJMSTestRunner; +import org.apache.qpid.jms.util.Repeat; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnsignedInteger; @@ -83,7 +85,9 @@ import org.apache.qpid.proton.engine.impl.AmqpHeader; import org.hamcrest.Matcher; import org.junit.Ignore; import org.junit.Test; +import org.junit.runner.RunWith; +@RunWith(QpidJMSTestRunner.class) public class ConnectionIntegrationTest extends QpidJmsTestCase { private final IntegrationTestFixture testFixture = new IntegrationTestFixture(); @@ -329,6 +333,39 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase { } } + @Repeat(repetitions = 1) + @Test(timeout = 20000) + public void testRemotelyDropConnectionDuringSessionCreationTransacted() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + testPeer.expectSaslAnonymous(); + testPeer.expectOpen(); + testPeer.expectBegin(); + + ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort() + "?jms.clientID=foo"); + Connection connection = factory.createConnection(); + + CountDownLatch exceptionListenerFired = new CountDownLatch(1); + connection.setExceptionListener(ex -> exceptionListenerFired.countDown()); + + // Expect the begin, then drop connection without without a close frame before the tx-coordinator setup. + testPeer.expectBegin(); + testPeer.dropAfterLastHandler(); + + try { + connection.createSession(true, Session.SESSION_TRANSACTED); + fail("Expected exception to be thrown"); + } catch (JMSException jmse) { + // Expected + } + + assertTrue("Exception listener did not fire", exceptionListenerFired.await(5, TimeUnit.SECONDS)); + + testPeer.waitForAllHandlersToComplete(3000); + + connection.close(); + } + } + @Test(timeout = 20000) public void testConnectionPropertiesContainExpectedMetaData() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java index 39a9a94..5e572b3 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java @@ -1642,6 +1642,15 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { @Test(timeout=20000) public void testTxRecreatedAfterConnectionFailsOver() throws Exception { + doTxRecreatedAfterConnectionFailsOver(true); + } + + @Test(timeout=20000) + public void testTxRecreatedAfterConnectionFailsOver2() throws Exception { + doTxRecreatedAfterConnectionFailsOver(false); + } + + private void doTxRecreatedAfterConnectionFailsOver(boolean dropAfterCoordinator) throws Exception { try (TestAmqpPeer originalPeer = new TestAmqpPeer(); TestAmqpPeer finalPeer = new TestAmqpPeer();) { @@ -1682,12 +1691,15 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS)); originalPeer.expectBegin(); - originalPeer.expectCoordinatorAttach(); - // First expect an unsettled 'declare' transfer to the txn coordinator, and - // reply with a Declared disposition state containing the txnId. Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8}); - originalPeer.expectDeclare(txnId); + if(dropAfterCoordinator) { + originalPeer.expectCoordinatorAttach(); + + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a Declared disposition state containing the txnId. + originalPeer.expectDeclare(txnId); + } originalPeer.dropAfterLastHandler(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org