This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-jms.git
The following commit(s) were added to refs/heads/master by this push: new 8fea843 QPIDJMS-521: ensure ExceptionListener called during unexpected async dispatch failure is allowed to close the Session/Connection 8fea843 is described below commit 8fea843e9b46457f4dda3b21b35c3c16558b723f Author: Robbie Gemmell <rob...@apache.org> AuthorDate: Fri Nov 27 16:03:08 2020 +0000 QPIDJMS-521: ensure ExceptionListener called during unexpected async dispatch failure is allowed to close the Session/Connection --- .../org/apache/qpid/jms/JmsMessageConsumer.java | 2 + .../main/java/org/apache/qpid/jms/JmsSession.java | 7 ++- .../jms/integration/ConsumerIntegrationTest.java | 67 ++++++++++++++++++++++ 3 files changed, 75 insertions(+), 1 deletion(-) diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java index 4afe91c..a8a6b62 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java @@ -741,6 +741,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe if (dispatchLock.isHeldByCurrentThread()) { reclaimLock = true; + session.setDeliveryThreadCheckEnabled(false); dispatchLock.unlock(); } @@ -749,6 +750,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe } finally { if (reclaimLock) { dispatchLock.lock(); + session.setDeliveryThreadCheckEnabled(true); } } } diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index bf6d5cd..d9c2c42 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -127,6 +127,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe private volatile ThreadPoolExecutor 
deliveryExecutor; private volatile ThreadPoolExecutor completionExcecutor; private AtomicReference<Thread> deliveryThread = new AtomicReference<Thread>(); + private boolean deliveryThreadCheckEnabled = true; private AtomicReference<Thread> completionThread = new AtomicReference<Thread>(); private final AtomicLong consumerIdGenerator = new AtomicLong(); @@ -1277,8 +1278,12 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe } } + void setDeliveryThreadCheckEnabled(boolean enabled) { + deliveryThreadCheckEnabled = enabled; + } + void checkIsDeliveryThread() throws JMSException { - if (Thread.currentThread().equals(deliveryThread.get())) { + if (deliveryThreadCheckEnabled && Thread.currentThread().equals(deliveryThread.get())) { throw new IllegalStateException("Illegal invocation from MessageListener callback"); } } diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java index e0e751c..ca4e973 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java @@ -2437,4 +2437,71 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase { assertEquals("Message payloads not as expected", expectedPayloads, receivedPayloads); } } + + @Test(timeout=20000) + public void testClosingSessionAndConnectionWithinExceptionListenerDueToAsyncConsumerDeliveryFailure() throws Exception { + final CountDownLatch exceptionListenerCalled = new CountDownLatch(1); + final CountDownLatch exceptionListenerCompleted = new CountDownLatch(1); + final AtomicReference<Throwable> asyncError = new AtomicReference<Throwable>(null); + final AtomicBoolean messageListenerCalled = new AtomicBoolean(); + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + final Connection connection = 
testFixture.establishConnecton(testPeer); + connection.start(); + + testPeer.expectBegin(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue destination = session.createQueue(getTestName()); + connection.start(); + + final PropertiesDescribedType properties = new PropertiesDescribedType(); + properties.setContentType(Symbol.valueOf("text/plain;charset=utf-8")); + + byte[] invalidPayload = new byte[2]; // Add two for malformed UTF8 + invalidPayload[0] = (byte) 0b11000111; // The prefix for a two-byte UTF8 encoding + invalidPayload[1] = (byte) 0b00110000; // An invalid next byte, as encoding must be 0b10xxxxxx + DescribedType invalidUTF8DataContent = new DataDescribedType(new Binary(invalidPayload)); + + testPeer.expectReceiverAttach(); + testPeer.expectLinkFlowRespondWithTransfer(null, null, properties, null, invalidUTF8DataContent, 1); + + final ModifiedMatcher modifiedFailedUndeliverableMatcher = new ModifiedMatcher(); + modifiedFailedUndeliverableMatcher.withDeliveryFailed(equalTo(true)); + modifiedFailedUndeliverableMatcher.withUndeliverableHere(equalTo(true)); + + MessageConsumer consumer = session.createConsumer(destination); + + testPeer.waitForAllHandlersToComplete(2000); + + testPeer.expectDisposition(true, modifiedFailedUndeliverableMatcher); + testPeer.expectEnd(); + testPeer.expectClose(); + + connection.setExceptionListener(exception -> { + try { + exceptionListenerCalled.countDown(); + session.close(); + connection.close(); + } catch (Exception ex) { + asyncError.set(ex); + } finally { + exceptionListenerCompleted.countDown(); + } + }); + + consumer.setMessageListener(m -> messageListenerCalled.set(true)); + + assertTrue("Exception listener was not fired within given timeout", + exceptionListenerCalled.await(4000, TimeUnit.MILLISECONDS)); + + assertTrue("Exception listener didnt complete within given timeout", + exceptionListenerCompleted.await(4000, TimeUnit.MILLISECONDS)); + + assertNull("Unexpected failure 
during exception listener handling", asyncError.get()); + assertFalse("Message listener should not have been called due to decoding error", messageListenerCalled.get()); + + testPeer.waitForAllHandlersToComplete(2000); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org