This is an automated email from the ASF dual-hosted git repository. robbie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-jms.git
The following commit(s) were added to refs/heads/master by this push: new 44b73fc QPIDJMS-526: fire the ExceptionListener when handling remote Session closure if it has a consumer with a MessageListener 44b73fc is described below commit 44b73fc591cc2b3907d6b5ee3416f9eec9b56e28 Author: Robbie Gemmell <rob...@apache.org> AuthorDate: Wed Mar 10 16:56:54 2021 +0000 QPIDJMS-526: fire the ExceptionListener when handling remote Session closure if it has a consumer with a MessageListener --- .../main/java/org/apache/qpid/jms/JmsSession.java | 15 ++- .../jms/integration/SessionIntegrationTest.java | 103 +++++++++++++++++++++ 2 files changed, 116 insertions(+), 2 deletions(-) diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index dc4b303..274d99c 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -327,7 +327,9 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe shutdown(null); } - protected void shutdown(Throwable cause) throws JMSException { + protected boolean shutdown(Throwable cause) throws JMSException { + boolean listenerPresent = false; + if (closed.compareAndSet(false, true)) { JMSException shutdownError = null; @@ -339,6 +341,10 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe stop(); for (JmsMessageConsumer consumer : new ArrayList<JmsMessageConsumer>(this.consumers.values())) { + if(consumer.hasMessageListener()) { + listenerPresent = true; + } + consumer.shutdown(cause); } @@ -398,13 +404,18 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe connection.removeSession(sessionInfo); } } + + return listenerPresent; } //----- Events fired when resource remotely closed due to some error -----// void sessionClosed(Throwable cause) { try { - shutdown(cause); + boolean listenerPresent = shutdown(cause); + if (listenerPresent) { + connection.onAsyncException(JmsExceptionSupport.create(cause)); + } } catch (Throwable error) { LOG.trace("Ignoring exception thrown during cleanup of closed session", error); } diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java index a69624e..0bab49e 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java @@ -39,6 +39,7 @@ import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import javax.jms.CompletionListener; import javax.jms.Connection; @@ -2689,4 +2690,106 @@ public class SessionIntegrationTest extends QpidJmsTestCase { connection.close(); } } + + @Test(timeout = 20000) + public void testRemotelyEndSessionWithMessageListener() throws Exception { + final String BREAD_CRUMB = "ErrorDescriptionBreadCrumb"; + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0"); + + final CountDownLatch exceptionListenerFired = new CountDownLatch(1); + final AtomicReference<JMSException> asyncError = new AtomicReference<JMSException>(); + connection.setExceptionListener(ex -> { + asyncError.compareAndSet(null, ex); + exceptionListenerFired.countDown(); + }); + + final CountDownLatch sessionClosed = new CountDownLatch(1); + connection.addConnectionListener(new JmsDefaultConnectionListener() { + @Override + public void onSessionClosed(Session session, Throwable exception) { + sessionClosed.countDown(); + } + }); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + + // Create a consumer + testPeer.expectReceiverAttach(); + final MessageConsumer consumer = session.createConsumer(queue); + + // Expect credit to be sent when listener is added, then remotely close the session. + testPeer.expectLinkFlow(); + testPeer.remotelyEndLastOpenedSession(true, 0, AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB); + + consumer.setMessageListener(m -> { + // No-op + }); + + // Verify ExceptionListener fired + assertTrue("ExceptionListener did not fire", exceptionListenerFired.await(5, TimeUnit.SECONDS)); + + JMSException jmsException = asyncError.get(); + assertNotNull("Exception from listener was not set", jmsException); + String message = jmsException.getMessage(); + assertTrue(message.contains(AmqpError.RESOURCE_LIMIT_EXCEEDED.toString()) && message.contains(BREAD_CRUMB)); + + // Verify the session (and consumer) got marked closed and listener fired + testPeer.waitForAllHandlersToComplete(1000); + assertTrue("Session closed callback did not fire", sessionClosed.await(5, TimeUnit.SECONDS)); + assertTrue("consumer never closed.", verifyConsumerClosure(BREAD_CRUMB, consumer)); + assertTrue("session never closed.", verifySessionClosure(BREAD_CRUMB, session)); + + // Try closing consumer and session explicitly, should effectively no-op in client. + // The test peer will throw during close if it sends anything unexpected. + consumer.close(); + session.close(); + + testPeer.expectClose(); + connection.close(); + } + } + + private boolean verifyConsumerClosure(final String BREAD_CRUMB, final MessageConsumer consumer) throws Exception { + return Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + try { + consumer.getMessageListener(); + } catch (IllegalStateException jmsise) { + if (jmsise.getCause() != null) { + String message = jmsise.getCause().getMessage(); + return message.contains(AmqpError.RESOURCE_LIMIT_EXCEEDED.toString()) && + message.contains(BREAD_CRUMB); + } else { + return false; + } + } + return false; + } + }, 5000, 10); + } + + private boolean verifySessionClosure(final String BREAD_CRUMB, final Session session) throws Exception { + return Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + try { + session.getTransacted(); + } catch (IllegalStateException jmsise) { + if (jmsise.getCause() != null) { + String message = jmsise.getCause().getMessage(); + return message.contains(AmqpError.RESOURCE_LIMIT_EXCEEDED.toString()) && + message.contains(BREAD_CRUMB); + } else { + return false; + } + } + return false; + } + }, 5000, 10); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org