This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git
The following commit(s) were added to refs/heads/main by this push:
new 90eb60f5 QPIDJMS-600 Ensure session and connection close await async
sends
90eb60f5 is described below
commit 90eb60f59cb59b7b9ad8363ee8a843d6903b8e77
Author: Timothy Bish <[email protected]>
AuthorDate: Fri Apr 19 18:58:03 2024 -0400
QPIDJMS-600 Ensure session and connection close await async sends
Session and Connection close should be awaiting the outcome of async send
completions before returning. This change allows them to await up to the
close timeout value before moving on and failing any completions that are
not completed after that point. Several tests added to cover this behavior.
---
.../java/org/apache/qpid/jms/JmsConnection.java | 50 +++---
.../main/java/org/apache/qpid/jms/JmsSession.java | 125 ++++++++-----
.../integration/BytesMessageIntegrationTest.java | 28 +--
.../jms/integration/MapMessageIntegrationTest.java | 26 +--
.../jms/integration/MessageIntegrationTest.java | 32 ++--
.../integration/ObjectMessageIntegrationTest.java | 27 +--
.../jms/integration/ProducerIntegrationTest.java | 193 +++++++++++++++++++--
.../jms/integration/SessionIntegrationTest.java | 122 ++++++++++---
.../integration/StreamMessageIntegrationTest.java | 25 +--
.../integration/TextMessageIntegrationTest.java | 27 +--
.../provider/failover/FailoverIntegrationTest.java | 107 +++++++++---
11 files changed, 552 insertions(+), 210 deletions(-)
diff --git
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 2f4e53f9..ce83ed4e 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -29,27 +29,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import jakarta.jms.Connection;
-import jakarta.jms.ConnectionConsumer;
-import jakarta.jms.ConnectionMetaData;
-import jakarta.jms.Destination;
-import jakarta.jms.ExceptionListener;
-import jakarta.jms.IllegalStateException;
-import jakarta.jms.InvalidClientIDException;
-import jakarta.jms.InvalidDestinationException;
-import jakarta.jms.JMSException;
-import jakarta.jms.JMSRuntimeException;
-import jakarta.jms.Queue;
-import jakarta.jms.QueueConnection;
-import jakarta.jms.QueueSession;
-import jakarta.jms.ServerSessionPool;
-import jakarta.jms.Session;
-import jakarta.jms.TemporaryQueue;
-import jakarta.jms.TemporaryTopic;
-import jakarta.jms.Topic;
-import jakarta.jms.TopicConnection;
-import jakarta.jms.TopicSession;
-
import org.apache.qpid.jms.exceptions.JmsConnectionFailedException;
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
@@ -91,6 +70,27 @@ import org.apache.qpid.jms.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import jakarta.jms.Connection;
+import jakarta.jms.ConnectionConsumer;
+import jakarta.jms.ConnectionMetaData;
+import jakarta.jms.Destination;
+import jakarta.jms.ExceptionListener;
+import jakarta.jms.IllegalStateException;
+import jakarta.jms.InvalidClientIDException;
+import jakarta.jms.InvalidDestinationException;
+import jakarta.jms.JMSException;
+import jakarta.jms.JMSRuntimeException;
+import jakarta.jms.Queue;
+import jakarta.jms.QueueConnection;
+import jakarta.jms.QueueSession;
+import jakarta.jms.ServerSessionPool;
+import jakarta.jms.Session;
+import jakarta.jms.TemporaryQueue;
+import jakarta.jms.TemporaryTopic;
+import jakarta.jms.Topic;
+import jakarta.jms.TopicConnection;
+import jakarta.jms.TopicSession;
+
/**
* Implementation of a JMS Connection
*/
@@ -916,6 +916,14 @@ public class JmsConnection implements AutoCloseable,
Connection, TopicConnection
}
}
+ ProviderFuture newProviderFuture() {
+ return newProviderFuture(null);
+ }
+
+ ProviderFuture newProviderFuture(ProviderSynchronization synchronization) {
+ return provider.newProviderFuture(synchronization);
+ }
+
//----- Property setters and getters
-------------------------------------//
@Override
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 19c5fa89..65036828 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -39,6 +39,36 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
+import org.apache.qpid.jms.exceptions.JmsConnectionFailedException;
+import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.message.JmsMessage;
+import org.apache.qpid.jms.message.JmsMessageTransformation;
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsConsumerId;
+import org.apache.qpid.jms.meta.JmsConsumerInfo;
+import org.apache.qpid.jms.meta.JmsProducerId;
+import org.apache.qpid.jms.meta.JmsProducerInfo;
+import org.apache.qpid.jms.meta.JmsResource.ResourceState;
+import org.apache.qpid.jms.meta.JmsSessionId;
+import org.apache.qpid.jms.meta.JmsSessionInfo;
+import org.apache.qpid.jms.policy.JmsDeserializationPolicy;
+import org.apache.qpid.jms.policy.JmsMessageIDPolicy;
+import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
+import org.apache.qpid.jms.policy.JmsPresettlePolicy;
+import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+import org.apache.qpid.jms.provider.ProviderException;
+import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderSynchronization;
+import org.apache.qpid.jms.selector.SelectorParser;
+import org.apache.qpid.jms.selector.filter.FilterException;
+import org.apache.qpid.jms.util.NoOpExecutor;
+import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import jakarta.jms.BytesMessage;
import jakarta.jms.CompletionListener;
import jakarta.jms.DeliveryMode;
@@ -71,36 +101,6 @@ import jakarta.jms.TopicPublisher;
import jakarta.jms.TopicSession;
import jakarta.jms.TopicSubscriber;
-import org.apache.qpid.jms.exceptions.JmsConnectionFailedException;
-import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
-import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
-import org.apache.qpid.jms.message.JmsMessage;
-import org.apache.qpid.jms.message.JmsMessageTransformation;
-import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
-import org.apache.qpid.jms.meta.JmsConsumerId;
-import org.apache.qpid.jms.meta.JmsConsumerInfo;
-import org.apache.qpid.jms.meta.JmsProducerId;
-import org.apache.qpid.jms.meta.JmsProducerInfo;
-import org.apache.qpid.jms.meta.JmsResource.ResourceState;
-import org.apache.qpid.jms.meta.JmsSessionId;
-import org.apache.qpid.jms.meta.JmsSessionInfo;
-import org.apache.qpid.jms.policy.JmsDeserializationPolicy;
-import org.apache.qpid.jms.policy.JmsMessageIDPolicy;
-import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
-import org.apache.qpid.jms.policy.JmsPresettlePolicy;
-import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
-import org.apache.qpid.jms.provider.Provider;
-import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
-import org.apache.qpid.jms.provider.ProviderException;
-import org.apache.qpid.jms.provider.ProviderFuture;
-import org.apache.qpid.jms.provider.ProviderSynchronization;
-import org.apache.qpid.jms.selector.SelectorParser;
-import org.apache.qpid.jms.selector.filter.FilterException;
-import org.apache.qpid.jms.util.NoOpExecutor;
-import org.apache.qpid.jms.util.QpidJMSThreadFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* JMS Session implementation
*/
@@ -126,6 +126,7 @@ public class JmsSession implements AutoCloseable, Session,
QueueSession, TopicSe
private final ReentrantLock sendLock = new ReentrantLock();
private volatile ThreadPoolExecutor deliveryExecutor;
private volatile ThreadPoolExecutor completionExcecutor;
+ private volatile ProviderFuture asyncSendsCompletion;
private AtomicReference<Thread> deliveryThread = new
AtomicReference<Thread>();
private boolean deliveryThreadCheckEnabled = true;
private AtomicReference<Thread> completionThread = new
AtomicReference<Thread>();
@@ -351,6 +352,7 @@ public class JmsSession implements AutoCloseable, Session,
QueueSession, TopicSe
for (JmsMessageProducer producer : new
ArrayList<JmsMessageProducer>(this.producers.values())) {
producer.shutdown(cause);
}
+
} catch (JMSException jmsEx) {
shutdownError = jmsEx;
}
@@ -367,30 +369,52 @@ public class JmsSession implements AutoCloseable,
Session, QueueSession, TopicSe
}
}
- // Ensure that no asynchronous completion sends remain blocked
after close.
+ try {
+ if (getSessionMode() == Session.CLIENT_ACKNOWLEDGE) {
+ acknowledge(ACK_TYPE.SESSION_SHUTDOWN);
+ }
+ } catch (Exception e) {
+ LOG.trace("Exception during session shutdown cleanup
acknowledgement", e);
+ }
+
+ // Ensure that no asynchronous completion sends remain blocked
after close but wait
+ // using the close timeout for the asynchronous sends to
complete normally.
+ final ExecutorService completionExecutor =
getCompletionExecutor();
+
synchronized (sessionInfo) {
+ // Producers are now quiesced and we can await completion
of asynchronous sends
+ // that are still pending a result or timeout once we've
done a quick check to
+ // see if any are actually pending or have completed
already.
+ asyncSendsCompletion = connection.newProviderFuture();
+
+ completionExecutor.execute(() -> {
+ if (asyncSendQueue.isEmpty()) {
+ asyncSendsCompletion.onSuccess();
+ }
+ });
+ }
+
+ try {
+ asyncSendsCompletion.sync(connection.getCloseTimeout(),
TimeUnit.MILLISECONDS);
+ } catch (Exception ex) {
+ LOG.trace("Exception during wait for asynchronous sends to
complete", ex);
+ } finally {
if (cause == null) {
cause = new JMSException("Session closed remotely
before message transfer result was notified");
}
- getCompletionExecutor().execute(new
FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
- getCompletionExecutor().shutdown();
+ // as a last task we want to fail any stragglers in the
asynchronous send queue and then
+ // shutdown the queue to prevent any more submissions
while the cleanup goes on.
+ completionExecutor.execute(new
FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
+ completionExecutor.shutdown();
}
try {
-
getCompletionExecutor().awaitTermination(connection.getCloseTimeout(),
TimeUnit.MILLISECONDS);
+
completionExecutor.awaitTermination(connection.getCloseTimeout(),
TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.trace("Session close awaiting send completions was
interrupted");
}
- try {
- if (getSessionMode() == Session.CLIENT_ACKNOWLEDGE) {
- acknowledge(ACK_TYPE.SESSION_SHUTDOWN);
- }
- } catch (Exception e) {
- LOG.trace("Exception during session shutdown cleanup
acknowledgement", e);
- }
-
if (shutdownError != null) {
throw shutdownError;
}
@@ -856,11 +880,12 @@ public class JmsSession implements AutoCloseable,
Session, QueueSession, TopicSe
}
private void send(JmsMessageProducer producer, JmsDestination destination,
Message original, int deliveryMode, int priority, long timeToLive, boolean
disableMsgId, boolean disableTimestamp, long deliveryDelay, CompletionListener
listener) throws JMSException {
- sendLock.lock();
-
JmsMessage outbound = null;
+ sendLock.lock();
try {
+ checkClosed();
+
original.setJMSDeliveryMode(deliveryMode);
original.setJMSPriority(priority);
original.setJMSRedelivered(false);
@@ -909,7 +934,7 @@ public class JmsSession implements AutoCloseable, Session,
QueueSession, TopicSe
}
outbound.getFacade().setDeliveryTime(deliveryTime, hasDelay);
- if(!isJmsMessage) {
+ if (!isJmsMessage) {
// If the original was a foreign message, we still need to
update it too.
setForeignMessageDeliveryTime(original, deliveryTime);
}
@@ -977,7 +1002,7 @@ public class JmsSession implements AutoCloseable, Session,
QueueSession, TopicSe
}
} catch (JMSException jmsEx) {
// Ensure that on failure case the message is returned to usable
state for another send attempt.
- if(outbound != null) {
+ if (outbound != null) {
outbound.onSendComplete();
}
throw jmsEx;
@@ -1511,6 +1536,10 @@ public class JmsSession implements AutoCloseable,
Session, QueueSession, TopicSe
if (producerId == null) {
asyncSendQueue.clear();
}
+
+ if (closed.get() && asyncSendsCompletion != null &&
asyncSendQueue.isEmpty()) {
+ asyncSendsCompletion.onSuccess();
+ }
}
}
@@ -1577,6 +1606,10 @@ public class JmsSession implements AutoCloseable,
Session, QueueSession, TopicSe
}
}
}
+
+ if (closed.get() && asyncSendsCompletion != null &&
asyncSendQueue.isEmpty()) {
+ asyncSendsCompletion.onSuccess();
+ }
} catch (Exception ex) {
LOG.error("Async completion task encountered unexpected
failure", ex);
}
diff --git
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
index 9fabb115..d30955b7 100644
---
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
+++
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java
@@ -30,19 +30,6 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
-import jakarta.jms.BytesMessage;
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.DeliveryMode;
-import jakarta.jms.JMSException;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageFormatException;
-import jakarta.jms.MessageNotWriteableException;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.Queue;
-import jakarta.jms.Session;
-
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -62,6 +49,19 @@ import org.apache.qpid.proton.amqp.Symbol;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import jakarta.jms.BytesMessage;
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.JMSException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageFormatException;
+import jakarta.jms.MessageNotWriteableException;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.Session;
+
public class BytesMessageIntegrationTest extends QpidJmsTestCase {
private final IntegrationTestFixture testFixture = new
IntegrationTestFixture();
@@ -581,7 +581,7 @@ public class BytesMessageIntegrationTest extends
QpidJmsTestCase {
@Timeout(20)
public void testAsyncCompletionSendMarksBytesMessageReadOnly() throws
Exception {
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
- JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer);
+ JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer, "jms.closeTimeout=50");
connection.setSendTimeout(15000);
testPeer.expectBegin();
diff --git
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
index e67c9e98..039ac874 100644
---
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
+++
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
@@ -30,18 +30,6 @@ import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.DeliveryMode;
-import jakarta.jms.MapMessage;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageFormatException;
-import jakarta.jms.MessageNotWriteableException;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.Queue;
-import jakarta.jms.Session;
-
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -58,6 +46,18 @@ import org.apache.qpid.proton.amqp.DescribedType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.MapMessage;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageFormatException;
+import jakarta.jms.MessageNotWriteableException;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.Session;
+
public class MapMessageIntegrationTest extends QpidJmsTestCase {
private final IntegrationTestFixture testFixture = new
IntegrationTestFixture();
@@ -440,7 +440,7 @@ public class MapMessageIntegrationTest extends
QpidJmsTestCase {
@Timeout(20)
public void testAsyncCompletionSendMarksMapMessageReadOnly() throws
Exception {
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
- JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer);
+ JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer, "jms.closeTimeout=50");
connection.setSendTimeout(15000);
testPeer.expectBegin();
diff --git
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
index 5698cf5d..5dd03d4b 100644
---
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
+++
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
@@ -38,21 +38,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.DeliveryMode;
-import jakarta.jms.Destination;
-import jakarta.jms.JMSException;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageNotWriteableException;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.Queue;
-import jakarta.jms.Session;
-import jakarta.jms.TemporaryQueue;
-import jakarta.jms.TemporaryTopic;
-import jakarta.jms.Topic;
-
import org.apache.qpid.jms.JmsClientProperties;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
@@ -80,6 +65,21 @@ import org.apache.qpid.proton.amqp.UnsignedLong;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.Destination;
+import jakarta.jms.JMSException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageNotWriteableException;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.Session;
+import jakarta.jms.TemporaryQueue;
+import jakarta.jms.TemporaryTopic;
+import jakarta.jms.Topic;
+
public class MessageIntegrationTest extends QpidJmsTestCase
{
private static final String NULL_STRING_PROP = "nullStringProperty";
@@ -2231,7 +2231,7 @@ public class MessageIntegrationTest extends
QpidJmsTestCase
@Timeout(20)
public void testAsyncCompletionSendMarksMessageReadOnly() throws Exception
{
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
- JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer);
+ JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer, "jms.closeTimeout=50");
connection.setSendTimeout(15000);
testPeer.expectBegin();
diff --git
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
index 0585b87b..b5036dc3 100644
---
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
+++
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ObjectMessageIntegrationTest.java
@@ -35,19 +35,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.DeliveryMode;
-import jakarta.jms.JMSException;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageFormatException;
-import jakarta.jms.MessageNotWriteableException;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.ObjectMessage;
-import jakarta.jms.Queue;
-import jakarta.jms.Session;
-
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -70,6 +57,19 @@ import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.JMSException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageFormatException;
+import jakarta.jms.MessageNotWriteableException;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.ObjectMessage;
+import jakarta.jms.Queue;
+import jakarta.jms.Session;
+
public class ObjectMessageIntegrationTest extends QpidJmsTestCase {
private static final Logger LOG =
LoggerFactory.getLogger(ObjectMessageIntegrationTest.class);
@@ -660,6 +660,7 @@ public class ObjectMessageIntegrationTest extends
QpidJmsTestCase {
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer);
connection.setSendTimeout(15000);
+ connection.setCloseTimeout(10);
testPeer.expectBegin();
diff --git
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index df9087cb..6a5e420e 100644
---
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -51,21 +51,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
-import jakarta.jms.BytesMessage;
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.DeliveryMode;
-import jakarta.jms.ExceptionListener;
-import jakarta.jms.IllegalStateException;
-import jakarta.jms.JMSException;
-import jakarta.jms.Message;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.Queue;
-import jakarta.jms.ResourceAllocationException;
-import jakarta.jms.Session;
-import jakarta.jms.TextMessage;
-import jakarta.jms.Topic;
-
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsDefaultConnectionListener;
@@ -103,6 +88,21 @@ import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import jakarta.jms.BytesMessage;
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.ExceptionListener;
+import jakarta.jms.IllegalStateException;
+import jakarta.jms.JMSException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.ResourceAllocationException;
+import jakarta.jms.Session;
+import jakarta.jms.TextMessage;
+import jakarta.jms.Topic;
+
public class ProducerIntegrationTest extends QpidJmsTestCase {
private static final Logger LOG =
LoggerFactory.getLogger(ProducerIntegrationTest.class);
@@ -2322,7 +2322,7 @@ public class ProducerIntegrationTest extends
QpidJmsTestCase {
@Timeout(20)
public void testAsyncCompletionGetsNotifiedWhenSessionClosed() throws
Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer);
+ JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer, "jms.closeTimeout=100");
testPeer.expectBegin();
testPeer.expectSenderAttach();
@@ -2343,6 +2343,8 @@ public class ProducerIntegrationTest extends
QpidJmsTestCase {
producer.send(message, listener);
+ assertFalse(listener.hasCompleted()); // Close should complete it
as failed on timeout
+
session.close();
assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS), "Did not
get async callback");
@@ -2356,11 +2358,51 @@ public class ProducerIntegrationTest extends
QpidJmsTestCase {
}
}
+ @Test
+ @Timeout(20)
+ public void
testAsyncCompletionGetsNotifiedWhenSessionClosedAndWaitForCompletion() throws
Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer, "jms.closeTimeout=1000");
+
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ MessageProducer producer = session.createProducer(queue);
+
+ // Create and transfer a new message
+ String text = "myMessage";
+ testPeer.expectTransfer(new TransferPayloadCompositeMatcher(),
nullValue(), false, true, new Accepted(), true, 0, 100);
+ testPeer.expectEnd();
+ testPeer.expectClose();
+
+ TextMessage message = session.createTextMessage(text);
+ TestJmsCompletionListener listener = new
TestJmsCompletionListener();
+
+ producer.send(message, listener);
+
+ assertFalse(listener.hasCompleted()); // Close should complete it
as accepted after the delay
+
+ session.close();
+
+ assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS), "Did not
get async callback");
+ assertNull(listener.exception);
+ assertNotNull(listener.message);
+ assertTrue(listener.message instanceof TextMessage);
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(2000);
+ }
+ }
+
@Test
@Timeout(20)
public void testAsyncCompletionGetsNotifiedWhenConnectionClosed() throws
Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer);
+ JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer, "jms.closeTimeout=150");
testPeer.expectBegin();
testPeer.expectSenderAttach();
@@ -2380,6 +2422,8 @@ public class ProducerIntegrationTest extends
QpidJmsTestCase {
producer.send(message, listener);
+ assertFalse(listener.hasCompleted());
+
connection.close();
assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS), "Did not
get async callback");
@@ -2391,6 +2435,43 @@ public class ProducerIntegrationTest extends
QpidJmsTestCase {
}
}
+ @Test
+ @Timeout(20)
+ public void
testAsyncCompletionAllowedToCompleteNormallyWhenConnectionClosed() throws
Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer, "jms.closeTimeout=1000");
+
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ MessageProducer producer = session.createProducer(queue);
+
+ // Create and transfer a new message
+ String text = "myMessage";
+ testPeer.expectTransfer(new TransferPayloadCompositeMatcher(),
nullValue(), false, true, new Accepted(), true, 0, 100);
+ testPeer.expectClose();
+
+ TextMessage message = session.createTextMessage(text);
+ TestJmsCompletionListener listener = new
TestJmsCompletionListener();
+
+ producer.send(message, listener);
+
+ assertFalse(listener.hasCompleted());
+
+ connection.close();
+
+ assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS), "Did not
get async callback");
+ assertNull(listener.exception);
+ assertNotNull(listener.message);
+ assertTrue(listener.message instanceof TextMessage);
+
+ testPeer.waitForAllHandlersToComplete(2000);
+ }
+ }
+
@Test
@Timeout(20)
public void testAsyncCompletionResetsBytesMessage() throws Exception {
@@ -2856,7 +2937,7 @@ public class ProducerIntegrationTest extends
QpidJmsTestCase {
@Timeout(20)
public void
testRemotelyCloseOneProducerDoesNotCompleteAsyncSendFromAnotherProducer()
throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
- JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer);
+ JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer, "jms.closeTimeout=150");
testPeer.expectBegin();
testPeer.expectSenderAttach();
@@ -2895,7 +2976,7 @@ public class ProducerIntegrationTest extends
QpidJmsTestCase {
assertFalse(listener.awaitCompletion(10, TimeUnit.MILLISECONDS),
"Should not get async callback");
- // Closing the session should complete the send with an exception
+ // Closing the session should complete the send with an exception
after timeout
testPeer.expectEnd();
session.close();
@@ -3044,6 +3125,10 @@ public class ProducerIntegrationTest extends
QpidJmsTestCase {
this.completed = new CountDownLatch(expected);
}
+ public boolean hasCompleted() {
+ return completed.getCount() == 0;
+ }
+
public boolean awaitCompletion(long timeout, TimeUnit units) throws
InterruptedException {
return completed.await(timeout, units);
}
@@ -3239,4 +3324,74 @@ public class ProducerIntegrationTest extends
QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(1000);
}
}
+
+ @Test
+ @Timeout(20)
+ public void testRemotelyEndConnectionCompletesAsyncSends() throws
Exception {
+ final String BREAD_CRUMB = "ErrorMessage";
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ final CountDownLatch connectionClosed = new CountDownLatch(1);
+ JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer);
+ connection.addConnectionListener(new
JmsDefaultConnectionListener() {
+ @Override
+ public void onConnectionFailure(Throwable exception) {
+ connectionClosed.countDown();
+ }
+ });
+
+ testPeer.expectBegin();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ // Create a producer, then remotely end the session afterwards.
+ testPeer.expectSenderAttach();
+
+ Queue queue = session.createQueue("myQueue");
+ final MessageProducer producer = session.createProducer(queue);
+
+ final int MSG_COUNT = 3;
+
+ for (int i = 0; i < MSG_COUNT; ++i) {
+ testPeer.expectTransferButDoNotRespond(new
TransferPayloadCompositeMatcher());
+ }
+
+ TestJmsCompletionListener listener = new
TestJmsCompletionListener(MSG_COUNT);
+ try {
+ for (int i = 0; i < MSG_COUNT; ++i) {
+ Message message = session.createTextMessage("content");
+ producer.send(message, listener);
+ }
+ } catch (JMSException e) {
+ LOG.warn("Caught unexpected error: {}", e.getMessage());
+ fail("No expected exception for this send.");
+ }
+
+ testPeer.waitForAllHandlersToComplete(2000);
+ testPeer.expectSenderAttach();
+ testPeer.remotelyCloseConnection(true, AmqpError.RESOURCE_DELETED,
BREAD_CRUMB, 50);
+
+ session.createProducer(queue);
+
+ // Verify the session gets marked closed
+ assertTrue(connectionClosed.await(5, TimeUnit.SECONDS), "Session
closed callback didn't trigger");
+
+ try {
+ producer.getDeliveryMode();
+ fail("Expected ISE to be thrown due to being closed");
+ } catch (IllegalStateException jmsise) {
+ String errorMessage = jmsise.getCause().getMessage();
+
assertTrue(errorMessage.contains(AmqpError.RESOURCE_DELETED.toString()));
+ assertTrue(errorMessage.contains(BREAD_CRUMB));
+ }
+
+ assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS));
+ assertEquals(MSG_COUNT, listener.errorCount); // All sends should
have been failed
+
+ // Try closing it explicitly, should effectively no-op in client.
+ // The test peer will throw during close if it sends anything.
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
}
diff --git
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index c8c13df4..472bcd39 100644
---
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -41,27 +41,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.Destination;
-import jakarta.jms.IllegalStateException;
-import jakarta.jms.InvalidDestinationException;
-import jakarta.jms.InvalidSelectorException;
-import jakarta.jms.JMSException;
-import jakarta.jms.JMSSecurityException;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageListener;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.Queue;
-import jakarta.jms.QueueBrowser;
-import jakarta.jms.Session;
-import jakarta.jms.TemporaryQueue;
-import jakarta.jms.TemporaryTopic;
-import jakarta.jms.TextMessage;
-import jakarta.jms.Topic;
-import jakarta.jms.TopicSubscriber;
-
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsDefaultConnectionListener;
import org.apache.qpid.jms.JmsOperationTimedOutException;
@@ -101,6 +80,27 @@ import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.Destination;
+import jakarta.jms.IllegalStateException;
+import jakarta.jms.InvalidDestinationException;
+import jakarta.jms.InvalidSelectorException;
+import jakarta.jms.JMSException;
+import jakarta.jms.JMSSecurityException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageListener;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.QueueBrowser;
+import jakarta.jms.Session;
+import jakarta.jms.TemporaryQueue;
+import jakarta.jms.TemporaryTopic;
+import jakarta.jms.TextMessage;
+import jakarta.jms.Topic;
+import jakarta.jms.TopicSubscriber;
+
public class SessionIntegrationTest extends QpidJmsTestCase {
private static final Logger LOG =
LoggerFactory.getLogger(SessionIntegrationTest.class);
@@ -2333,6 +2333,10 @@ public class SessionIntegrationTest extends
QpidJmsTestCase {
completed = new CountDownLatch(expected);
}
+ public boolean hasCompleted() {
+ return completed.getCount() == 0;
+ }
+
public boolean awaitCompletion(long timeout, TimeUnit units) throws
InterruptedException {
return completed.await(timeout, units);
}
@@ -2836,6 +2840,82 @@ public class SessionIntegrationTest extends
QpidJmsTestCase {
}
}
+ @Test
+ @Timeout(20)
+ public void testRemotelyEndSessionCompletesAsyncSends() throws Exception {
+ final String BREAD_CRUMB = "ErrorMessage";
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ final CountDownLatch sessionClosed = new CountDownLatch(1);
+ JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer);
+ connection.addConnectionListener(new
JmsDefaultConnectionListener() {
+ @Override
+ public void onSessionClosed(Session session, Throwable
exception) {
+ sessionClosed.countDown();
+ }
+ });
+
+ testPeer.expectBegin();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ // Create a producer, then remotely end the session afterwards.
+ testPeer.expectSenderAttach();
+
+ Queue queue = session.createQueue("myQueue");
+ final MessageProducer producer = session.createProducer(queue);
+
+ final int MSG_COUNT = 3;
+
+ for (int i = 0; i < MSG_COUNT; ++i) {
+ testPeer.expectTransferButDoNotRespond(new
TransferPayloadCompositeMatcher());
+ }
+
+ TestJmsCompletionListener listener = new
TestJmsCompletionListener(MSG_COUNT);
+ try {
+ for (int i = 0; i < MSG_COUNT; ++i) {
+ Message message = session.createTextMessage("content");
+ producer.send(message, listener);
+ }
+ } catch (JMSException e) {
+ LOG.warn("Caught unexpected error: {}", e.getMessage());
+ fail("No expected exception for this send.");
+ }
+
+ testPeer.waitForAllHandlersToComplete(2000);
+
+ assertFalse(listener.hasCompleted());
+
+ testPeer.expectSenderAttach();
+ testPeer.remotelyEndLastOpenedSession(true, 50,
AmqpError.RESOURCE_DELETED, BREAD_CRUMB);
+
+ session.createProducer(queue);
+
+ // Verify the session gets marked closed
+ assertTrue(sessionClosed.await(5, TimeUnit.SECONDS), "Session
closed callback didn't trigger");
+
+ try {
+ producer.getDeliveryMode();
+ fail("Expected ISE to be thrown due to being closed");
+ } catch (IllegalStateException jmsise) {
+ String errorMessage = jmsise.getCause().getMessage();
+
assertTrue(errorMessage.contains(AmqpError.RESOURCE_DELETED.toString()));
+ assertTrue(errorMessage.contains(BREAD_CRUMB));
+ }
+
+ assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS));
+ assertEquals(MSG_COUNT, listener.errorCount); // All sends should
have been failed
+
+ // Try closing it explicitly, should effectively no-op in client.
+ // The test peer will throw during close if it sends anything.
+ session.close();
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
private boolean verifyConsumerClosure(final String BREAD_CRUMB, final
MessageConsumer consumer) throws Exception {
return Wait.waitFor(new Wait.Condition() {
@Override
diff --git
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
index 3fb47dff..0589a268 100644
---
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
+++
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/StreamMessageIntegrationTest.java
@@ -29,18 +29,6 @@ import static org.junit.jupiter.api.Assertions.fail;
import java.util.ArrayList;
import java.util.List;
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.DeliveryMode;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageFormatException;
-import jakarta.jms.MessageNotWriteableException;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.Queue;
-import jakarta.jms.Session;
-import jakarta.jms.StreamMessage;
-
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -57,6 +45,18 @@ import org.apache.qpid.proton.amqp.DescribedType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageFormatException;
+import jakarta.jms.MessageNotWriteableException;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.Session;
+import jakarta.jms.StreamMessage;
+
public class StreamMessageIntegrationTest extends QpidJmsTestCase {
private final IntegrationTestFixture testFixture = new
IntegrationTestFixture();
@@ -439,6 +439,7 @@ public class StreamMessageIntegrationTest extends
QpidJmsTestCase {
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer);
connection.setSendTimeout(15000);
+ connection.setSendTimeout(15);
testPeer.expectBegin();
diff --git
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
index e4a55dbf..52f861f4 100644
---
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
+++
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/TextMessageIntegrationTest.java
@@ -28,19 +28,6 @@ import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.DeliveryMode;
-import jakarta.jms.JMSException;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageFormatException;
-import jakarta.jms.MessageNotWriteableException;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.Queue;
-import jakarta.jms.Session;
-import jakarta.jms.TextMessage;
-
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
@@ -58,6 +45,19 @@ import org.apache.qpid.proton.amqp.Symbol;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.DeliveryMode;
+import jakarta.jms.JMSException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageFormatException;
+import jakarta.jms.MessageNotWriteableException;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.Session;
+import jakarta.jms.TextMessage;
+
public class TextMessageIntegrationTest extends QpidJmsTestCase {
private final IntegrationTestFixture testFixture = new
IntegrationTestFixture();
@@ -447,6 +447,7 @@ public class TextMessageIntegrationTest extends
QpidJmsTestCase {
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection)
testFixture.establishConnecton(testPeer);
connection.setSendTimeout(15000);
+ connection.setCloseTimeout(15);
testPeer.expectBegin();
diff --git
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 53b58bb4..b7393b74 100644
---
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -45,28 +45,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import jakarta.jms.CompletionListener;
-import jakarta.jms.Connection;
-import jakarta.jms.ConnectionFactory;
-import jakarta.jms.ExceptionListener;
-import jakarta.jms.IllegalStateException;
-import jakarta.jms.InvalidDestinationException;
-import jakarta.jms.JMSException;
-import jakarta.jms.JMSSecurityException;
-import jakarta.jms.Message;
-import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageListener;
-import jakarta.jms.MessageProducer;
-import jakarta.jms.Queue;
-import jakarta.jms.QueueBrowser;
-import jakarta.jms.ResourceAllocationException;
-import jakarta.jms.ServerSessionPool;
-import jakarta.jms.Session;
-import jakarta.jms.TemporaryTopic;
-import jakarta.jms.TextMessage;
-import jakarta.jms.Topic;
-import jakarta.jms.TransactionRolledBackException;
-
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionExtensions;
import org.apache.qpid.jms.JmsConnectionFactory;
@@ -113,6 +91,28 @@ import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import jakarta.jms.CompletionListener;
+import jakarta.jms.Connection;
+import jakarta.jms.ConnectionFactory;
+import jakarta.jms.ExceptionListener;
+import jakarta.jms.IllegalStateException;
+import jakarta.jms.InvalidDestinationException;
+import jakarta.jms.JMSException;
+import jakarta.jms.JMSSecurityException;
+import jakarta.jms.Message;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.MessageListener;
+import jakarta.jms.MessageProducer;
+import jakarta.jms.Queue;
+import jakarta.jms.QueueBrowser;
+import jakarta.jms.ResourceAllocationException;
+import jakarta.jms.ServerSessionPool;
+import jakarta.jms.Session;
+import jakarta.jms.TemporaryTopic;
+import jakarta.jms.TextMessage;
+import jakarta.jms.Topic;
+import jakarta.jms.TransactionRolledBackException;
+
public class FailoverIntegrationTest extends QpidJmsTestCase {
private static final Logger LOG =
LoggerFactory.getLogger(FailoverIntegrationTest.class);
@@ -4558,6 +4558,69 @@ public class FailoverIntegrationTest extends
QpidJmsTestCase {
}
}
+ @Test
+ @Timeout(20)
+ public void testFailoverDoesFailPendingAsyncCompletionSend() throws
Exception {
+ try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+ TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+ // Create a peer to connect to, then one to reconnect to
+ final String originalURI = createPeerURI(originalPeer);
+ final String finalURI = createPeerURI(finalPeer);
+
+ LOG.info("Original peer is at: {}", originalURI);
+ LOG.info("Final peer is at: {}", finalURI);
+
+ originalPeer.expectSaslAnonymous();
+ originalPeer.expectOpen();
+ originalPeer.expectBegin();
+ originalPeer.expectBegin();
+ originalPeer.expectSenderAttach();
+ originalPeer.expectTransferButDoNotRespond(new
TransferPayloadCompositeMatcher());
+ originalPeer.dropAfterLastHandler(15); // Wait for sender to get
into wait state
+
+ // --- Post Failover Expectations of sender --- //
+ finalPeer.expectSaslAnonymous();
+ finalPeer.expectOpen();
+ finalPeer.expectBegin();
+ finalPeer.expectBegin();
+ finalPeer.expectSenderAttach();
+
+ final JmsConnection connection =
establishAnonymousConnecton("failover.initialReconnectDelay=25", originalPeer,
finalPeer);
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ MessageProducer producer = session.createProducer(queue);
+
+ // Create and transfer a new message
+ String text = "myMessage";
+
+ TextMessage message = session.createTextMessage(text);
+ TestJmsCompletionListener listener = new
TestJmsCompletionListener();
+
+ try {
+ producer.send(message, listener);
+ } catch (JMSException jmsEx) {
+ fail("Should not have failed the async completion send.");
+ }
+
+ // This should fire after reconnect without an error, if it fires
with an error at
+ // any time then something is wrong.
+ assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS), "Did not
get async callback");
+ assertNotNull(listener.exception, "Completion should have been due
to error");
+ assertNotNull(listener.message);
+ assertTrue(listener.message instanceof TextMessage);
+
+ finalPeer.waitForAllHandlersToComplete(5000);
+ finalPeer.expectClose();
+
+ connection.close();
+
+ finalPeer.waitForAllHandlersToComplete(5000);
+ }
+ }
+
@Test
@Timeout(20)
public void testFailoverHandlesAnonymousFallbackWaitingForClose() throws
Exception {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]