This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git


The following commit(s) were added to refs/heads/master by this push:
     new 44b73fc  QPIDJMS-526: fire the ExceptionListener when handling remote 
Session closure if it has a consumer with a MessageListener
44b73fc is described below

commit 44b73fc591cc2b3907d6b5ee3416f9eec9b56e28
Author: Robbie Gemmell <rob...@apache.org>
AuthorDate: Wed Mar 10 16:56:54 2021 +0000

    QPIDJMS-526: fire the ExceptionListener when handling remote Session 
closure if it has a consumer with a MessageListener
---
 .../main/java/org/apache/qpid/jms/JmsSession.java  |  15 ++-
 .../jms/integration/SessionIntegrationTest.java    | 103 +++++++++++++++++++++
 2 files changed, 116 insertions(+), 2 deletions(-)

diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index dc4b303..274d99c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -327,7 +327,9 @@ public class JmsSession implements AutoCloseable, Session, 
QueueSession, TopicSe
         shutdown(null);
     }
 
-    protected void shutdown(Throwable cause) throws JMSException {
+    protected boolean shutdown(Throwable cause) throws JMSException {
+        boolean listenerPresent = false;
+
         if (closed.compareAndSet(false, true)) {
             JMSException shutdownError = null;
 
@@ -339,6 +341,10 @@ public class JmsSession implements AutoCloseable, Session, 
QueueSession, TopicSe
                     stop();
 
                     for (JmsMessageConsumer consumer : new 
ArrayList<JmsMessageConsumer>(this.consumers.values())) {
+                        if(consumer.hasMessageListener()) {
+                            listenerPresent = true;
+                        }
+
                         consumer.shutdown(cause);
                     }
 
@@ -398,13 +404,18 @@ public class JmsSession implements AutoCloseable, 
Session, QueueSession, TopicSe
                 connection.removeSession(sessionInfo);
             }
         }
+
+        return listenerPresent;
     }
 
     //----- Events fired when resource remotely closed due to some error 
-----//
 
     void sessionClosed(Throwable cause) {
         try {
-            shutdown(cause);
+            boolean listenerPresent = shutdown(cause);
+            if (listenerPresent) {
+                connection.onAsyncException(JmsExceptionSupport.create(cause));
+            }
         } catch (Throwable error) {
             LOG.trace("Ignoring exception thrown during cleanup of closed 
session", error);
         }
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index a69624e..0bab49e 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -39,6 +39,7 @@ import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.CompletionListener;
 import javax.jms.Connection;
@@ -2689,4 +2690,106 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
             connection.close();
         }
     }
+
+    @Test(timeout = 20000)
+    public void testRemotelyEndSessionWithMessageListener() throws Exception {
+        final String BREAD_CRUMB = "ErrorDescriptionBreadCrumb";
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0");
+
+            final CountDownLatch exceptionListenerFired = new 
CountDownLatch(1);
+            final AtomicReference<JMSException> asyncError = new 
AtomicReference<JMSException>();
+            connection.setExceptionListener(ex -> {
+                asyncError.compareAndSet(null, ex);
+                exceptionListenerFired.countDown();
+            });
+
+            final CountDownLatch sessionClosed = new CountDownLatch(1);
+            connection.addConnectionListener(new 
JmsDefaultConnectionListener() {
+                @Override
+                public void onSessionClosed(Session session, Throwable 
exception) {
+                    sessionClosed.countDown();
+                }
+            });
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // Create a consumer
+            testPeer.expectReceiverAttach();
+            final MessageConsumer consumer = session.createConsumer(queue);
+
+            // Expect credit to be sent when listener is added, then remotely 
close the session.
+            testPeer.expectLinkFlow();
+            testPeer.remotelyEndLastOpenedSession(true, 0, 
AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB);
+
+            consumer.setMessageListener(m -> {
+                // No-op
+            });
+
+            // Verify ExceptionListener fired
+            assertTrue("ExceptionListener did not fire", 
exceptionListenerFired.await(5, TimeUnit.SECONDS));
+
+            JMSException jmsException = asyncError.get();
+            assertNotNull("Exception from listener was not set", jmsException);
+            String message = jmsException.getMessage();
+            
assertTrue(message.contains(AmqpError.RESOURCE_LIMIT_EXCEEDED.toString()) && 
message.contains(BREAD_CRUMB));
+
+            // Verify the session (and  consumer) got marked closed and 
listener fired
+            testPeer.waitForAllHandlersToComplete(1000);
+            assertTrue("Session closed callback did not fire", 
sessionClosed.await(5, TimeUnit.SECONDS));
+            assertTrue("consumer never closed.", 
verifyConsumerClosure(BREAD_CRUMB, consumer));
+            assertTrue("session never closed.", 
verifySessionClosure(BREAD_CRUMB, session));
+
+            // Try closing consumer and session explicitly, should effectively 
no-op in client.
+            // The test peer will throw during close if it sends anything 
unexpected.
+            consumer.close();
+            session.close();
+
+            testPeer.expectClose();
+            connection.close();
+        }
+    }
+
+    private boolean verifyConsumerClosure(final String BREAD_CRUMB, final 
MessageConsumer consumer) throws Exception {
+        return Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisfied() throws Exception {
+                try {
+                    consumer.getMessageListener();
+                } catch (IllegalStateException jmsise) {
+                    if (jmsise.getCause() != null) {
+                        String message = jmsise.getCause().getMessage();
+                        return 
message.contains(AmqpError.RESOURCE_LIMIT_EXCEEDED.toString()) &&
+                                message.contains(BREAD_CRUMB);
+                    } else {
+                        return false;
+                    }
+                }
+                return false;
+            }
+        }, 5000, 10);
+    }
+
+    private boolean verifySessionClosure(final String BREAD_CRUMB, final 
Session session) throws Exception {
+        return Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisfied() throws Exception {
+                try {
+                    session.getTransacted();
+                } catch (IllegalStateException jmsise) {
+                    if (jmsise.getCause() != null) {
+                        String message = jmsise.getCause().getMessage();
+                        return 
message.contains(AmqpError.RESOURCE_LIMIT_EXCEEDED.toString()) &&
+                                message.contains(BREAD_CRUMB);
+                    } else {
+                        return false;
+                    }
+                }
+                return false;
+            }
+        }, 5000, 10);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to