This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git
The following commit(s) were added to refs/heads/master by this push:
new 44b73fc QPIDJMS-526: fire the ExceptionListener when handling remote
Session closure if it has a consumer with a MessageListener
44b73fc is described below
commit 44b73fc591cc2b3907d6b5ee3416f9eec9b56e28
Author: Robbie Gemmell
AuthorDate: Wed Mar 10 16:56:54 2021 +
QPIDJMS-526: fire the ExceptionListener when handling remote Session
closure if it has a consumer with a MessageListener
---
.../main/java/org/apache/qpid/jms/JmsSession.java | 15 ++-
.../jms/integration/SessionIntegrationTest.java| 103 +
2 files changed, 116 insertions(+), 2 deletions(-)
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index dc4b303..274d99c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -327,7 +327,9 @@ public class JmsSession implements AutoCloseable, Session,
QueueSession, TopicSe
shutdown(null);
}
-protected void shutdown(Throwable cause) throws JMSException {
+protected boolean shutdown(Throwable cause) throws JMSException {
+boolean listenerPresent = false;
+
if (closed.compareAndSet(false, true)) {
JMSException shutdownError = null;
@@ -339,6 +341,10 @@ public class JmsSession implements AutoCloseable, Session,
QueueSession, TopicSe
stop();
for (JmsMessageConsumer consumer : new
ArrayList(this.consumers.values())) {
+if(consumer.hasMessageListener()) {
+listenerPresent = true;
+}
+
consumer.shutdown(cause);
}
@@ -398,13 +404,18 @@ public class JmsSession implements AutoCloseable,
Session, QueueSession, TopicSe
connection.removeSession(sessionInfo);
}
}
+
+return listenerPresent;
}
//- Events fired when resource remotely closed due to some error
-//
void sessionClosed(Throwable cause) {
try {
-shutdown(cause);
+boolean listenerPresent = shutdown(cause);
+if (listenerPresent) {
+connection.onAsyncException(JmsExceptionSupport.create(cause));
+}
} catch (Throwable error) {
LOG.trace("Ignoring exception thrown during cleanup of closed
session", error);
}
diff --git
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index a69624e..0bab49e 100644
---
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -39,6 +39,7 @@ import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import javax.jms.CompletionListener;
import javax.jms.Connection;
@@ -2689,4 +2690,106 @@ public class SessionIntegrationTest extends
QpidJmsTestCase {
connection.close();
}
}
+
+@Test(timeout = 2)
+public void testRemotelyEndSessionWithMessageListener() throws Exception {
+final String BREAD_CRUMB = "ErrorDescriptionBreadCrumb";
+
+try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0");
+
+final CountDownLatch exceptionListenerFired = new
CountDownLatch(1);
+final AtomicReference asyncError = new
AtomicReference();
+connection.setExceptionListener(ex -> {
+asyncError.compareAndSet(null, ex);
+exceptionListenerFired.countDown();
+});
+
+final CountDownLatch sessionClosed = new CountDownLatch(1);
+connection.addConnectionListener(new
JmsDefaultConnectionListener() {
+@Override
+public void onSessionClosed(Session session, Throwable
exception) {
+sessionClosed.countDown();
+}
+});
+
+testPeer.expectBegin();
+Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+Queue queue = session.createQueue("myQueue");
+
+// Create a consumer
+testPeer.expectReceiverAttach();
+final MessageConsum