This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git


The following commit(s) were added to refs/heads/main by this push:
     new 90eb60f5 QPIDJMS-600 Ensure session and connection close await async 
sends
90eb60f5 is described below

commit 90eb60f59cb59b7b9ad8363ee8a843d6903b8e77
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Apr 19 18:58:03 2024 -0400

    QPIDJMS-600 Ensure session and connection close await async sends
    
    Session and Connection close should await the outcome of async send
    completions before returning. This change allows them to wait up to the
    close timeout value before moving on and failing any completions that have
    not completed by that point. Several tests were added to cover this behavior.
---
 .../java/org/apache/qpid/jms/JmsConnection.java    |  50 +++---
 .../main/java/org/apache/qpid/jms/JmsSession.java  | 125 ++++++++-----
 .../integration/BytesMessageIntegrationTest.java   |  28 +--
 .../jms/integration/MapMessageIntegrationTest.java |  26 +--
 .../jms/integration/MessageIntegrationTest.java    |  32 ++--
 .../integration/ObjectMessageIntegrationTest.java  |  27 +--
 .../jms/integration/ProducerIntegrationTest.java   | 193 +++++++++++++++++++--
 .../jms/integration/SessionIntegrationTest.java    | 122 ++++++++++---
 .../integration/StreamMessageIntegrationTest.java  |  25 +--
 .../integration/TextMessageIntegrationTest.java    |  27 +--
 .../provider/failover/FailoverIntegrationTest.java | 107 +++++++++---
 11 files changed, 552 insertions(+), 210 deletions(-)

diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 2f4e53f9..ce83ed4e 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -29,27 +29,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-import jakarta.jms.Connection;
-import jakarta.jms.ConnectionConsumer;
-import jakarta.jms.ConnectionMetaData;
-import jakarta.jms.Destination;
-import jakarta.jms.ExceptionListener;
-import jakarta.jms.IllegalStateException;
-import jakarta.jms.InvalidClientIDException;
-import jakarta.jms.InvalidDestinationException;
-import jakarta.jms.JMSException;
-import jakarta.jms.JMSRuntimeException;
-import jakarta.jms.Queue;
-import jakarta.jms.QueueConnection;
-import jakarta.jms.QueueSession;
-import jakarta.jms.ServerSessionPool;
-import jakarta.jms.Session;
-import jakarta.jms.TemporaryQueue;
-import jakarta.jms.TemporaryTopic;
-import jakarta.jms.Topic;
-import jakarta.jms.TopicConnection;
-import jakarta.jms.TopicSession;
-
 import org.apache.qpid.jms.exceptions.JmsConnectionFailedException;
 import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
@@ -91,6 +70,27 @@ import org.apache.qpid.jms.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import jakarta.jms.Connection;
+import jakarta.jms.ConnectionConsumer;
+import jakarta.jms.ConnectionMetaData;
+import jakarta.jms.Destination;
+import jakarta.jms.ExceptionListener;
+import jakarta.jms.IllegalStateException;
+import jakarta.jms.InvalidClientIDException;
+import jakarta.jms.InvalidDestinationException;
+import jakarta.jms.JMSException;
+import jakarta.jms.JMSRuntimeException;
+import jakarta.jms.Queue;
+import jakarta.jms.QueueConnection;
+import jakarta.jms.QueueSession;
+import jakarta.jms.ServerSessionPool;
+import jakarta.jms.Session;
+import jakarta.jms.TemporaryQueue;
+import jakarta.jms.TemporaryTopic;
+import jakarta.jms.Topic;
+import jakarta.jms.TopicConnection;
+import jakarta.jms.TopicSession;
+
 /**
  * Implementation of a JMS Connection
  */
@@ -916,6 +916,14 @@ public class JmsConnection implements AutoCloseable, 
Connection, TopicConnection
         }
     }
 
+    ProviderFuture newProviderFuture() {
+       return newProviderFuture(null);
+    }
+
+    ProviderFuture newProviderFuture(ProviderSynchronization synchronization) {
+        return provider.newProviderFuture(synchronization);
+    }
+
     //----- Property setters and getters 
-------------------------------------//
 
     @Override
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 19c5fa89..65036828 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -39,6 +39,36 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 
+import org.apache.qpid.jms.exceptions.JmsConnectionFailedException;
+import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.message.JmsMessage;
+import org.apache.qpid.jms.message.JmsMessageTransformation;
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsConsumerId;
+import org.apache.qpid.jms.meta.JmsConsumerInfo;
+import org.apache.qpid.jms.meta.JmsProducerId;
+import org.apache.qpid.jms.meta.JmsProducerInfo;
+import org.apache.qpid.jms.meta.JmsResource.ResourceState;
+import org.apache.qpid.jms.meta.JmsSessionId;
+import org.apache.qpid.jms.meta.JmsSessionInfo;
+import org.apache.qpid.jms.policy.JmsDeserializationPolicy;
+import org.apache.qpid.jms.policy.JmsMessageIDPolicy;
+import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
+import org.apache.qpid.jms.policy.JmsPresettlePolicy;
+import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+import org.apache.qpid.jms.provider.ProviderException;
+import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderSynchronization;
+import org.apache.qpid.jms.selector.SelectorParser;
+import org.apache.qpid.jms.selector.filter.FilterException;
+import org.apache.qpid.jms.util.NoOpExecutor;
+import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import jakarta.jms.BytesMessage;
 import jakarta.jms.CompletionListener;
 import jakarta.jms.DeliveryMode;
@@ -71,36 +101,6 @@ import jakarta.jms.TopicPublisher;
 import jakarta.jms.TopicSession;
 import jakarta.jms.TopicSubscriber;
 
-import org.apache.qpid.jms.exceptions.JmsConnectionFailedException;
-import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
-import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
-import org.apache.qpid.jms.message.JmsMessage;
-import org.apache.qpid.jms.message.JmsMessageTransformation;
-import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
-import org.apache.qpid.jms.meta.JmsConsumerId;
-import org.apache.qpid.jms.meta.JmsConsumerInfo;
-import org.apache.qpid.jms.meta.JmsProducerId;
-import org.apache.qpid.jms.meta.JmsProducerInfo;
-import org.apache.qpid.jms.meta.JmsResource.ResourceState;
-import org.apache.qpid.jms.meta.JmsSessionId;
-import org.apache.qpid.jms.meta.JmsSessionInfo;
-import org.apache.qpid.jms.policy.JmsDeserializationPolicy;
-import org.apache.qpid.jms.policy.JmsMessageIDPolicy;
-import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
-import org.apache.qpid.jms.policy.JmsPresettlePolicy;
-import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
-import org.apache.qpid.jms.provider.Provider;
-import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
-import org.apache.qpid.jms.provider.ProviderException;
-import org.apache.qpid.jms.provider.ProviderFuture;
-import org.apache.qpid.jms.provider.ProviderSynchronization;
-import org.apache.qpid.jms.selector.SelectorParser;
-import org.apache.qpid.jms.selector.filter.FilterException;
-import org.apache.qpid.jms.util.NoOpExecutor;
-import org.apache.qpid.jms.util.QpidJMSThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * JMS Session implementation
  */
@@ -126,6 +126,7 @@ public class JmsSession implements AutoCloseable, Session, 
QueueSession, TopicSe
     private final ReentrantLock sendLock = new ReentrantLock();
     private volatile ThreadPoolExecutor deliveryExecutor;
     private volatile ThreadPoolExecutor completionExcecutor;
+    private volatile ProviderFuture asyncSendsCompletion;
     private AtomicReference<Thread> deliveryThread = new 
AtomicReference<Thread>();
     private boolean deliveryThreadCheckEnabled = true;
     private AtomicReference<Thread> completionThread = new 
AtomicReference<Thread>();
@@ -351,6 +352,7 @@ public class JmsSession implements AutoCloseable, Session, 
QueueSession, TopicSe
                     for (JmsMessageProducer producer : new 
ArrayList<JmsMessageProducer>(this.producers.values())) {
                         producer.shutdown(cause);
                     }
+
                 } catch (JMSException jmsEx) {
                     shutdownError = jmsEx;
                 }
@@ -367,30 +369,52 @@ public class JmsSession implements AutoCloseable, 
Session, QueueSession, TopicSe
                     }
                 }
 
-                // Ensure that no asynchronous completion sends remain blocked 
after close.
+                try {
+                    if (getSessionMode() == Session.CLIENT_ACKNOWLEDGE) {
+                        acknowledge(ACK_TYPE.SESSION_SHUTDOWN);
+                    }
+                } catch (Exception e) {
+                    LOG.trace("Exception during session shutdown cleanup 
acknowledgement", e);
+                }
+
+                // Ensure that no asynchronous completion sends remain blocked 
after close but wait
+                // using the close timeout for the asynchronous sends to 
complete normally.
+                final ExecutorService completionExecutor = 
getCompletionExecutor();
+
                 synchronized (sessionInfo) {
+                    // Producers are now quiesced and we can await completion 
of asynchronous sends
+                    // that are still pending a result or timeout once we've 
done a quick check to
+                    // see if any are actually pending or have completed 
already.
+                    asyncSendsCompletion = connection.newProviderFuture();
+
+                    completionExecutor.execute(() -> {
+                        if (asyncSendQueue.isEmpty()) {
+                            asyncSendsCompletion.onSuccess();
+                        }
+                    });
+                }
+
+                try {
+                    asyncSendsCompletion.sync(connection.getCloseTimeout(), 
TimeUnit.MILLISECONDS);
+                } catch (Exception ex) {
+                    LOG.trace("Exception during wait for asynchronous sends to 
complete", ex);
+                } finally {
                     if (cause == null) {
                         cause = new JMSException("Session closed remotely 
before message transfer result was notified");
                     }
 
-                    getCompletionExecutor().execute(new 
FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
-                    getCompletionExecutor().shutdown();
+                    // as a last task we want to fail any stragglers in the 
asynchronous send queue and then
+                    // shutdown the queue to prevent any more submissions 
while the cleanup goes on.
+                    completionExecutor.execute(new 
FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
+                    completionExecutor.shutdown();
                 }
 
                 try {
-                    
getCompletionExecutor().awaitTermination(connection.getCloseTimeout(), 
TimeUnit.MILLISECONDS);
+                    
completionExecutor.awaitTermination(connection.getCloseTimeout(), 
TimeUnit.MILLISECONDS);
                 } catch (InterruptedException e) {
                     LOG.trace("Session close awaiting send completions was 
interrupted");
                 }
 
-                try {
-                    if (getSessionMode() == Session.CLIENT_ACKNOWLEDGE) {
-                        acknowledge(ACK_TYPE.SESSION_SHUTDOWN);
-                    }
-                } catch (Exception e) {
-                    LOG.trace("Exception during session shutdown cleanup 
acknowledgement", e);
-                }
-
                 if (shutdownError != null) {
                     throw shutdownError;
                 }
@@ -856,11 +880,12 @@ public class JmsSession implements AutoCloseable, 
Session, QueueSession, TopicSe
     }
 
     private void send(JmsMessageProducer producer, JmsDestination destination, 
Message original, int deliveryMode, int priority, long timeToLive, boolean 
disableMsgId, boolean disableTimestamp, long deliveryDelay, CompletionListener 
listener) throws JMSException {
-        sendLock.lock();
-
         JmsMessage outbound = null;
+        sendLock.lock();
 
         try {
+            checkClosed();
+
             original.setJMSDeliveryMode(deliveryMode);
             original.setJMSPriority(priority);
             original.setJMSRedelivered(false);
@@ -909,7 +934,7 @@ public class JmsSession implements AutoCloseable, Session, 
QueueSession, TopicSe
             }
 
             outbound.getFacade().setDeliveryTime(deliveryTime, hasDelay);
-            if(!isJmsMessage) {
+            if (!isJmsMessage) {
                 // If the original was a foreign message, we still need to 
update it too.
                 setForeignMessageDeliveryTime(original, deliveryTime);
             }
@@ -977,7 +1002,7 @@ public class JmsSession implements AutoCloseable, Session, 
QueueSession, TopicSe
             }
         } catch (JMSException jmsEx) {
             // Ensure that on failure case the message is returned to usable 
state for another send attempt.
-            if(outbound != null) {
+            if (outbound != null) {
                 outbound.onSendComplete();
             }
             throw jmsEx;
@@ -1511,6 +1536,10 @@ public class JmsSession implements AutoCloseable, 
Session, QueueSession, TopicSe
             if (producerId == null) {
                 asyncSendQueue.clear();
             }
+
+            if (closed.get() && asyncSendsCompletion != null && 
asyncSendQueue.isEmpty()) {
+                asyncSendsCompletion.onSuccess();
+            }
         }
     }
 
@@ -1577,6 +1606,10 @@ public class JmsSession implements AutoCloseable, 
Session, QueueSession, TopicSe
                         }
                     }
                 }
+
+                if (closed.get() && asyncSendsCompletion != null && 
asyncSendQueue.isEmpty()) {
+                    asyncSendsCompletion.onSuccess();
+                }
             } catch (Exception ex) {
                 LOG.error("Async completion task encountered unexpected 
failure", ex);
             }
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
index 9fabb115..d30955b7 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
@@ -30,19 +30,6 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Map;
 
-import jakarta.jms.BytesMessage;
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.DeliveryMode;
-import jakarta.jms.JMSException;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageFormatException;
-import jakarta.jms.MessageNotWriteableException;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.Queue;
-import jakarta.jms.Session;
-
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -62,6 +49,19 @@ import org.apache.qpid.proton.amqp.Symbol;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import jakarta.jms.BytesMessage;
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.JMSException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageFormatException;
+import jakarta.jms.MessageNotWriteableException;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.Session;
+
 public class BytesMessageIntegrationTest extends QpidJmsTestCase {
     private final IntegrationTestFixture testFixture = new 
IntegrationTestFixture();
 
@@ -581,7 +581,7 @@ public class BytesMessageIntegrationTest extends 
QpidJmsTestCase {
     @Timeout(20)
     public void testAsyncCompletionSendMarksBytesMessageReadOnly() throws 
Exception {
         try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer);
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer, "jms.closeTimeout=50");
             connection.setSendTimeout(15000);
 
             testPeer.expectBegin();
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
index e67c9e98..039ac874 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
@@ -30,18 +30,6 @@ import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.DeliveryMode;
-import jakarta.jms.MapMessage;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageFormatException;
-import jakarta.jms.MessageNotWriteableException;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.Queue;
-import jakarta.jms.Session;
-
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -58,6 +46,18 @@ import org.apache.qpid.proton.amqp.DescribedType;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.MapMessage;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageFormatException;
+import jakarta.jms.MessageNotWriteableException;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.Session;
+
 public class MapMessageIntegrationTest extends QpidJmsTestCase {
     private final IntegrationTestFixture testFixture = new 
IntegrationTestFixture();
 
@@ -440,7 +440,7 @@ public class MapMessageIntegrationTest extends 
QpidJmsTestCase {
     @Timeout(20)
     public void testAsyncCompletionSendMarksMapMessageReadOnly() throws 
Exception {
         try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer);
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer, "jms.closeTimeout=50");
             connection.setSendTimeout(15000);
 
             testPeer.expectBegin();
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
index 5698cf5d..5dd03d4b 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
@@ -38,21 +38,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.DeliveryMode;
-import jakarta.jms.Destination;
-import jakarta.jms.JMSException;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageNotWriteableException;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.Queue;
-import jakarta.jms.Session;
-import jakarta.jms.TemporaryQueue;
-import jakarta.jms.TemporaryTopic;
-import jakarta.jms.Topic;
-
 import org.apache.qpid.jms.JmsClientProperties;
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
@@ -80,6 +65,21 @@ import org.apache.qpid.proton.amqp.UnsignedLong;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.Destination;
+import jakarta.jms.JMSException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageNotWriteableException;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.Session;
+import jakarta.jms.TemporaryQueue;
+import jakarta.jms.TemporaryTopic;
+import jakarta.jms.Topic;
+
 public class MessageIntegrationTest extends QpidJmsTestCase
 {
     private static final String NULL_STRING_PROP = "nullStringProperty";
@@ -2231,7 +2231,7 @@ public class MessageIntegrationTest extends 
QpidJmsTestCase
     @Timeout(20)
     public void testAsyncCompletionSendMarksMessageReadOnly() throws Exception 
{
         try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer);
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer, "jms.closeTimeout=50");
             connection.setSendTimeout(15000);
 
             testPeer.expectBegin();
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
index 0585b87b..b5036dc3 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
@@ -35,19 +35,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.DeliveryMode;
-import jakarta.jms.JMSException;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageFormatException;
-import jakarta.jms.MessageNotWriteableException;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.ObjectMessage;
-import jakarta.jms.Queue;
-import jakarta.jms.Session;
-
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -70,6 +57,19 @@ import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.JMSException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageFormatException;
+import jakarta.jms.MessageNotWriteableException;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.ObjectMessage;
+import jakarta.jms.Queue;
+import jakarta.jms.Session;
+
 public class ObjectMessageIntegrationTest extends QpidJmsTestCase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ObjectMessageIntegrationTest.class);
@@ -660,6 +660,7 @@ public class ObjectMessageIntegrationTest extends 
QpidJmsTestCase {
         try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
             JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer);
             connection.setSendTimeout(15000);
+            connection.setCloseTimeout(10);
 
             testPeer.expectBegin();
 
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index df9087cb..6a5e420e 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -51,21 +51,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.LockSupport;
 
-import jakarta.jms.BytesMessage;
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.DeliveryMode;
-import jakarta.jms.ExceptionListener;
-import jakarta.jms.IllegalStateException;
-import jakarta.jms.JMSException;
-import jakarta.jms.Message;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.Queue;
-import jakarta.jms.ResourceAllocationException;
-import jakarta.jms.Session;
-import jakarta.jms.TextMessage;
-import jakarta.jms.Topic;
-
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.apache.qpid.jms.JmsDefaultConnectionListener;
@@ -103,6 +88,21 @@ import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import jakarta.jms.BytesMessage;
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.ExceptionListener;
+import jakarta.jms.IllegalStateException;
+import jakarta.jms.JMSException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.ResourceAllocationException;
+import jakarta.jms.Session;
+import jakarta.jms.TextMessage;
+import jakarta.jms.Topic;
+
 public class ProducerIntegrationTest extends QpidJmsTestCase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ProducerIntegrationTest.class);
@@ -2322,7 +2322,7 @@ public class ProducerIntegrationTest extends 
QpidJmsTestCase {
     @Timeout(20)
     public void testAsyncCompletionGetsNotifiedWhenSessionClosed() throws 
Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer);
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer, "jms.closeTimeout=100");
 
             testPeer.expectBegin();
             testPeer.expectSenderAttach();
@@ -2343,6 +2343,8 @@ public class ProducerIntegrationTest extends 
QpidJmsTestCase {
 
             producer.send(message, listener);
 
+            assertFalse(listener.hasCompleted()); // Close should complete it 
as failed on timeout
+
             session.close();
 
             assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS), "Did not 
get async callback");
@@ -2356,11 +2358,51 @@ public class ProducerIntegrationTest extends 
QpidJmsTestCase {
         }
     }
 
+    @Test
+    @Timeout(20)
+    public void 
testAsyncCompletionGetsNotifiedWhenSessionClosedAndWaitForCompletion() throws 
Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer, "jms.closeTimeout=1000");
+
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            MessageProducer producer = session.createProducer(queue);
+
+            // Create and transfer a new message
+            String text = "myMessage";
+            testPeer.expectTransfer(new TransferPayloadCompositeMatcher(), 
nullValue(), false, true, new Accepted(), true, 0, 100);
+            testPeer.expectEnd();
+            testPeer.expectClose();
+
+            TextMessage message = session.createTextMessage(text);
+            TestJmsCompletionListener listener = new 
TestJmsCompletionListener();
+
+            producer.send(message, listener);
+
+            assertFalse(listener.hasCompleted()); // Close should complete it 
as accepted after the delay
+
+            session.close();
+
+            assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS), "Did not 
get async callback");
+            assertNull(listener.exception);
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
     @Test
     @Timeout(20)
     public void testAsyncCompletionGetsNotifiedWhenConnectionClosed() throws 
Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer);
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer, "jms.closeTimeout=150");
 
             testPeer.expectBegin();
             testPeer.expectSenderAttach();
@@ -2380,6 +2422,8 @@ public class ProducerIntegrationTest extends 
QpidJmsTestCase {
 
             producer.send(message, listener);
 
+            assertFalse(listener.hasCompleted());
+
             connection.close();
 
             assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS), "Did not 
get async callback");
@@ -2391,6 +2435,43 @@ public class ProducerIntegrationTest extends 
QpidJmsTestCase {
         }
     }
 
+    @Test
+    @Timeout(20)
+    public void 
testAsyncCompletionAllowedToCompleteNormallyWhenConnectionClosed() throws 
Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer, "jms.closeTimeout=1000");
+
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            MessageProducer producer = session.createProducer(queue);
+
+            // Create and transfer a new message
+            String text = "myMessage";
+            testPeer.expectTransfer(new TransferPayloadCompositeMatcher(), 
nullValue(), false, true, new Accepted(), true, 0, 100);
+            testPeer.expectClose();
+
+            TextMessage message = session.createTextMessage(text);
+            TestJmsCompletionListener listener = new 
TestJmsCompletionListener();
+
+            producer.send(message, listener);
+
+            assertFalse(listener.hasCompleted());
+
+            connection.close();
+
+            assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS), "Did not 
get async callback");
+            assertNull(listener.exception);
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
     @Test
     @Timeout(20)
     public void testAsyncCompletionResetsBytesMessage() throws Exception {
@@ -2856,7 +2937,7 @@ public class ProducerIntegrationTest extends 
QpidJmsTestCase {
     @Timeout(20)
     public void 
testRemotelyCloseOneProducerDoesNotCompleteAsyncSendFromAnotherProducer() 
throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer);
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer, "jms.closeTimeout=150");
 
             testPeer.expectBegin();
             testPeer.expectSenderAttach();
@@ -2895,7 +2976,7 @@ public class ProducerIntegrationTest extends 
QpidJmsTestCase {
 
             assertFalse(listener.awaitCompletion(10, TimeUnit.MILLISECONDS), 
"Should not get async callback");
 
-            // Closing the session should complete the send with an exception
+            // Closing the session should complete the send with an exception 
after timeout
             testPeer.expectEnd();
             session.close();
 
@@ -3044,6 +3125,10 @@ public class ProducerIntegrationTest extends 
QpidJmsTestCase {
             this.completed = new CountDownLatch(expected);
         }
 
+        public boolean hasCompleted() {
+               return completed.getCount() == 0;
+        }
+
         public boolean awaitCompletion(long timeout, TimeUnit units) throws 
InterruptedException {
             return completed.await(timeout, units);
         }
@@ -3239,4 +3324,74 @@ public class ProducerIntegrationTest extends 
QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
+
+    @Test
+    @Timeout(20)
+    public void testRemotelyEndConnectionCompletesAsyncSends() throws 
Exception {
+        final String BREAD_CRUMB = "ErrorMessage";
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final CountDownLatch connectionClosed = new CountDownLatch(1);
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer);
+            connection.addConnectionListener(new 
JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionFailure(Throwable exception) {
+                       connectionClosed.countDown();
+                }
+            });
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            // Create a producer, then remotely close the connection afterwards.
+            testPeer.expectSenderAttach();
+
+            Queue queue = session.createQueue("myQueue");
+            final MessageProducer producer = session.createProducer(queue);
+
+            final int MSG_COUNT = 3;
+
+            for (int i = 0; i < MSG_COUNT; ++i) {
+                testPeer.expectTransferButDoNotRespond(new 
TransferPayloadCompositeMatcher());
+            }
+
+            TestJmsCompletionListener listener = new 
TestJmsCompletionListener(MSG_COUNT);
+            try {
+                for (int i = 0; i < MSG_COUNT; ++i) {
+                    Message message = session.createTextMessage("content");
+                    producer.send(message, listener);
+                }
+            } catch (JMSException e) {
+                LOG.warn("Caught unexpected error: {}", e.getMessage());
+                fail("No expected exception for this send.");
+            }
+
+            testPeer.waitForAllHandlersToComplete(2000);
+            testPeer.expectSenderAttach();
+            testPeer.remotelyCloseConnection(true, AmqpError.RESOURCE_DELETED, 
BREAD_CRUMB, 50);
+
+            session.createProducer(queue);
+
+            // Verify the connection failure is reported to the listener
+            assertTrue(connectionClosed.await(5, TimeUnit.SECONDS), "Session 
closed callback didn't trigger");
+
+            try {
+                producer.getDeliveryMode();
+                fail("Expected ISE to be thrown due to being closed");
+            } catch (IllegalStateException jmsise) {
+                String errorMessage = jmsise.getCause().getMessage();
+                
assertTrue(errorMessage.contains(AmqpError.RESOURCE_DELETED.toString()));
+                assertTrue(errorMessage.contains(BREAD_CRUMB));
+            }
+
+            assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS));
+            assertEquals(MSG_COUNT, listener.errorCount); // All sends should 
have been failed
+
+            // Try closing it explicitly, should effectively no-op in client.
+            // The test peer will throw during close if it sends anything.
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
 }
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index c8c13df4..472bcd39 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -41,27 +41,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.Destination;
-import jakarta.jms.IllegalStateException;
-import jakarta.jms.InvalidDestinationException;
-import jakarta.jms.InvalidSelectorException;
-import jakarta.jms.JMSException;
-import jakarta.jms.JMSSecurityException;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageListener;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.Queue;
-import jakarta.jms.QueueBrowser;
-import jakarta.jms.Session;
-import jakarta.jms.TemporaryQueue;
-import jakarta.jms.TemporaryTopic;
-import jakarta.jms.TextMessage;
-import jakarta.jms.Topic;
-import jakarta.jms.TopicSubscriber;
-
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsDefaultConnectionListener;
 import org.apache.qpid.jms.JmsOperationTimedOutException;
@@ -101,6 +80,27 @@ import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.Destination;
+import jakarta.jms.IllegalStateException;
+import jakarta.jms.InvalidDestinationException;
+import jakarta.jms.InvalidSelectorException;
+import jakarta.jms.JMSException;
+import jakarta.jms.JMSSecurityException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageListener;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.QueueBrowser;
+import jakarta.jms.Session;
+import jakarta.jms.TemporaryQueue;
+import jakarta.jms.TemporaryTopic;
+import jakarta.jms.TextMessage;
+import jakarta.jms.Topic;
+import jakarta.jms.TopicSubscriber;
+
 public class SessionIntegrationTest extends QpidJmsTestCase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SessionIntegrationTest.class);
@@ -2333,6 +2333,10 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
             completed = new CountDownLatch(expected);
         }
 
+        public boolean hasCompleted() {
+               return completed.getCount() == 0;
+        }
+
         public boolean awaitCompletion(long timeout, TimeUnit units) throws 
InterruptedException {
             return completed.await(timeout, units);
         }
@@ -2836,6 +2840,82 @@ public class SessionIntegrationTest extends 
QpidJmsTestCase {
         }
     }
 
+    @Test
+    @Timeout(20)
+    public void testRemotelyEndSessionCompletesAsyncSends() throws Exception {
+        final String BREAD_CRUMB = "ErrorMessage";
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final CountDownLatch sessionClosed = new CountDownLatch(1);
+            JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer);
+            connection.addConnectionListener(new 
JmsDefaultConnectionListener() {
+                @Override
+                public void onSessionClosed(Session session, Throwable 
exception) {
+                       sessionClosed.countDown();
+                }
+            });
+
+            testPeer.expectBegin();
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+            // Create a producer, then remotely end the session afterwards.
+            testPeer.expectSenderAttach();
+
+            Queue queue = session.createQueue("myQueue");
+            final MessageProducer producer = session.createProducer(queue);
+
+            final int MSG_COUNT = 3;
+
+            for (int i = 0; i < MSG_COUNT; ++i) {
+                testPeer.expectTransferButDoNotRespond(new 
TransferPayloadCompositeMatcher());
+            }
+
+            TestJmsCompletionListener listener = new 
TestJmsCompletionListener(MSG_COUNT);
+            try {
+                for (int i = 0; i < MSG_COUNT; ++i) {
+                    Message message = session.createTextMessage("content");
+                    producer.send(message, listener);
+                }
+            } catch (JMSException e) {
+                LOG.warn("Caught unexpected error: {}", e.getMessage());
+                fail("No expected exception for this send.");
+            }
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            assertFalse(listener.hasCompleted());
+
+            testPeer.expectSenderAttach();
+            testPeer.remotelyEndLastOpenedSession(true, 50, 
AmqpError.RESOURCE_DELETED, BREAD_CRUMB);
+
+            session.createProducer(queue);
+
+            // Verify the session gets marked closed
+            assertTrue(sessionClosed.await(5, TimeUnit.SECONDS), "Session 
closed callback didn't trigger");
+
+            try {
+                producer.getDeliveryMode();
+                fail("Expected ISE to be thrown due to being closed");
+            } catch (IllegalStateException jmsise) {
+                String errorMessage = jmsise.getCause().getMessage();
+                
assertTrue(errorMessage.contains(AmqpError.RESOURCE_DELETED.toString()));
+                assertTrue(errorMessage.contains(BREAD_CRUMB));
+            }
+
+            assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS));
+            assertEquals(MSG_COUNT, listener.errorCount); // All sends should 
have been failed
+
+            // Try closing it explicitly, should effectively no-op in client.
+            // The test peer will throw during close if it sends anything.
+            session.close();
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
     private boolean verifyConsumerClosure(final String BREAD_CRUMB, final 
MessageConsumer consumer) throws Exception {
         return Wait.waitFor(new Wait.Condition() {
             @Override
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
index 3fb47dff..0589a268 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
@@ -29,18 +29,6 @@ import static org.junit.jupiter.api.Assertions.fail;
 import java.util.ArrayList;
 import java.util.List;
 
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.DeliveryMode;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageFormatException;
-import jakarta.jms.MessageNotWriteableException;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.Queue;
-import jakarta.jms.Session;
-import jakarta.jms.StreamMessage;
-
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -57,6 +45,18 @@ import org.apache.qpid.proton.amqp.DescribedType;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageFormatException;
+import jakarta.jms.MessageNotWriteableException;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.Session;
+import jakarta.jms.StreamMessage;
+
 public class StreamMessageIntegrationTest extends QpidJmsTestCase {
     private final IntegrationTestFixture testFixture = new 
IntegrationTestFixture();
 
@@ -439,6 +439,7 @@ public class StreamMessageIntegrationTest extends 
QpidJmsTestCase {
         try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
             JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer);
             connection.setSendTimeout(15000);
+            connection.setSendTimeout(15);
 
             testPeer.expectBegin();
 
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
index e4a55dbf..52f861f4 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
@@ -28,19 +28,6 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 import java.io.IOException;
 
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.DeliveryMode;
-import jakarta.jms.JMSException;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageFormatException;
-import jakarta.jms.MessageNotWriteableException;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.Queue;
-import jakarta.jms.Session;
-import jakarta.jms.TextMessage;
-
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
@@ -58,6 +45,19 @@ import org.apache.qpid.proton.amqp.Symbol;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.JMSException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageFormatException;
+import jakarta.jms.MessageNotWriteableException;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.Session;
+import jakarta.jms.TextMessage;
+
 public class TextMessageIntegrationTest extends QpidJmsTestCase {
     private final IntegrationTestFixture testFixture = new 
IntegrationTestFixture();
 
@@ -447,6 +447,7 @@ public class TextMessageIntegrationTest extends 
QpidJmsTestCase {
         try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
             JmsConnection connection = (JmsConnection) 
testFixture.establishConnecton(testPeer);
             connection.setSendTimeout(15000);
+            connection.setCloseTimeout(15);
 
             testPeer.expectBegin();
 
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 53b58bb4..b7393b74 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -45,28 +45,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.ConnectionFactory;
-import jakarta.jms.ExceptionListener;
-import jakarta.jms.IllegalStateException;
-import jakarta.jms.InvalidDestinationException;
-import jakarta.jms.JMSException;
-import jakarta.jms.JMSSecurityException;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageListener;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.Queue;
-import jakarta.jms.QueueBrowser;
-import jakarta.jms.ResourceAllocationException;
-import jakarta.jms.ServerSessionPool;
-import jakarta.jms.Session;
-import jakarta.jms.TemporaryTopic;
-import jakarta.jms.TextMessage;
-import jakarta.jms.Topic;
-import jakarta.jms.TransactionRolledBackException;
-
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsConnectionExtensions;
 import org.apache.qpid.jms.JmsConnectionFactory;
@@ -113,6 +91,28 @@ import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.ConnectionFactory;
+import jakarta.jms.ExceptionListener;
+import jakarta.jms.IllegalStateException;
+import jakarta.jms.InvalidDestinationException;
+import jakarta.jms.JMSException;
+import jakarta.jms.JMSSecurityException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageListener;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.QueueBrowser;
+import jakarta.jms.ResourceAllocationException;
+import jakarta.jms.ServerSessionPool;
+import jakarta.jms.Session;
+import jakarta.jms.TemporaryTopic;
+import jakarta.jms.TextMessage;
+import jakarta.jms.Topic;
+import jakarta.jms.TransactionRolledBackException;
+
 public class FailoverIntegrationTest extends QpidJmsTestCase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FailoverIntegrationTest.class);
@@ -4558,6 +4558,69 @@ public class FailoverIntegrationTest extends 
QpidJmsTestCase {
         }
     }
 
+    @Test
+    @Timeout(20)
+    public void testFailoverDoesFailPendingAsyncCompletionSend() throws 
Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin();
+            originalPeer.expectSenderAttach();
+            originalPeer.expectTransferButDoNotRespond(new 
TransferPayloadCompositeMatcher());
+            originalPeer.dropAfterLastHandler(15);  // Wait for sender to get 
into wait state
+
+            // --- Post Failover Expectations of sender --- //
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectSenderAttach();
+
+            final JmsConnection connection = 
establishAnonymousConnecton("failover.initialReconnectDelay=25", originalPeer, 
finalPeer);
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            MessageProducer producer = session.createProducer(queue);
+
+            // Create and transfer a new message
+            String text = "myMessage";
+
+            TextMessage message = session.createTextMessage(text);
+            TestJmsCompletionListener listener = new 
TestJmsCompletionListener();
+
+            try {
+                producer.send(message, listener);
+            } catch (JMSException jmsEx) {
+                fail("Should not have failed the async completion send.");
+            }
+
+            // The send is pending when the peer drops, so the completion must
+            // carry an error; completing without one means something is wrong.
+            assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS), "Did not 
get async callback");
+            assertNotNull(listener.exception, "Completion should have been due 
to error");
+            assertNotNull(listener.message);
+            assertTrue(listener.message instanceof TextMessage);
+
+            finalPeer.waitForAllHandlersToComplete(5000);
+            finalPeer.expectClose();
+
+            connection.close();
+
+            finalPeer.waitForAllHandlersToComplete(5000);
+        }
+    }
+
     @Test
     @Timeout(20)
     public void testFailoverHandlesAnonymousFallbackWaitingForClose() throws 
Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to