This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 1f3db074a7 feat(#1739): Add requestTimeout to ConnectionFactory to
prevent hanging threads in syncSendPacket() (#1740)
1f3db074a7 is described below
commit 1f3db074a7a64e52cfd7501f3403434888cc3e58
Author: JB Onofré <[email protected]>
AuthorDate: Fri Mar 13 15:53:27 2026 +0100
feat(#1739): Add requestTimeout to ConnectionFactory to prevent hanging
threads in syncSendPacket() (#1740)
* Add requestTimeout to ConnectionFactory to prevent hanging threads in
syncSendPacket()
When the broker becomes unreachable without the TCP connection being
properly closed (network partition, half-open connection),
FutureResponse.getResult() calls ArrayBlockingQueue.take() which blocks
indefinitely, causing threads to hang forever.
Introduce a configurable requestTimeout (default 0, no timeout) on
ActiveMQConnectionFactory and ActiveMQConnection, similar to the
existing sendTimeout. When set, syncSendPacket(Command) uses
poll(timeout) instead of take(), throwing RequestTimedOutIOException
when the timeout expires.
Can be configured programmatically via factory.setRequestTimeout(ms)
or via URL parameter jms.requestTimeout=ms.
* Add consumer creation in without timeout test
* Fix flaky testSyncSendPacketFailFromTimeout by using BrokerPlugin to
delay responses
Use a BrokerFilter that delays addConsumer for the test queue by 5s,
making the 500ms requestTimeout fire deterministically instead of
relying on a 1ms race condition.
---
.../org/apache/activemq/ActiveMQConnection.java | 17 +-
.../apache/activemq/ActiveMQConnectionFactory.java | 17 ++
.../org/apache/activemq/TransactionContext.java | 2 +-
.../apache/activemq/SyncSendPacketTimeoutTest.java | 194 +++++++++++++++++++++
4 files changed, 228 insertions(+), 2 deletions(-)
diff --git
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
index dde17b7fec..975df6f7a6 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
@@ -164,6 +164,7 @@ public class ActiveMQConnection implements Connection,
TopicConnection, QueueCon
private boolean watchTopicAdvisories = true;
private long warnAboutUnstartedConnectionTimeout = 500L;
private int sendTimeout =0;
+ private int requestTimeout =0;
private boolean sendAcksAsync=true;
private boolean checkForDuplicates = true;
private boolean queueOnlyConnection = false;
@@ -1526,7 +1527,7 @@ public class ActiveMQConnection implements Connection,
TopicConnection, QueueCon
* @throws JMSException
*/
public Response syncSendPacket(Command command) throws JMSException {
- return syncSendPacket(command, 0);
+ return syncSendPacket(command, requestTimeout);
}
/**
@@ -1855,6 +1856,20 @@ public class ActiveMQConnection implements Connection,
TopicConnection, QueueCon
this.sendTimeout = sendTimeout;
}
+ /**
+ * @return the requestTimeout (in milliseconds)
+ */
+ public int getRequestTimeout() {
+ return requestTimeout;
+ }
+
+ /**
+ * @param requestTimeout the requestTimeout to set (in milliseconds)
+ */
+ public void setRequestTimeout(int requestTimeout) {
+ this.requestTimeout = requestTimeout;
+ }
+
/**
* @return the sendAcksAsync
*/
diff --git
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
index b36f4a0d9d..cb9947a910 100644
---
a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
+++
b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
@@ -147,6 +147,7 @@ public class ActiveMQConnectionFactory extends
JNDIBaseStorable implements Conne
private int producerWindowSize = DEFAULT_PRODUCER_WINDOW_SIZE;
private long warnAboutUnstartedConnectionTimeout = 500L;
private int sendTimeout = 0;
+ private int requestTimeout = 0;
private int connectResponseTimeout = 0;
private boolean sendAcksAsync=true;
private TransportListener transportListener;
@@ -444,6 +445,7 @@ public class ActiveMQConnectionFactory extends
JNDIBaseStorable implements Conne
connection.setProducerWindowSize(getProducerWindowSize());
connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
connection.setSendTimeout(getSendTimeout());
+ connection.setRequestTimeout(getRequestTimeout());
connection.setCloseTimeout(getCloseTimeout());
connection.setSendAcksAsync(isSendAcksAsync());
connection.setAuditDepth(getAuditDepth());
@@ -748,6 +750,20 @@ public class ActiveMQConnectionFactory extends
JNDIBaseStorable implements Conne
this.sendTimeout = sendTimeout;
}
+ /**
+ * @return the requestTimeout (in milliseconds)
+ */
+ public int getRequestTimeout() {
+ return requestTimeout;
+ }
+
+ /**
+ * @param requestTimeout the requestTimeout to set (in milliseconds)
+ */
+ public void setRequestTimeout(int requestTimeout) {
+ this.requestTimeout = requestTimeout;
+ }
+
/**
* @return the sendAcksAsync
*/
@@ -874,6 +890,7 @@ public class ActiveMQConnectionFactory extends
JNDIBaseStorable implements Conne
props.setProperty("alwaysSyncSend",
Boolean.toString(isAlwaysSyncSend()));
props.setProperty("producerWindowSize",
Integer.toString(getProducerWindowSize()));
props.setProperty("sendTimeout", Integer.toString(getSendTimeout()));
+ props.setProperty("requestTimeout",
Integer.toString(getRequestTimeout()));
props.setProperty("connectResponseTimeout",
Integer.toString(getConnectResponseTimeout()));
props.setProperty("sendAcksAsync",Boolean.toString(isSendAcksAsync()));
props.setProperty("auditDepth", Integer.toString(getAuditDepth()));
diff --git
a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
index 652eeba3b1..4166f35af5 100644
--- a/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
+++ b/activemq-client/src/main/java/org/apache/activemq/TransactionContext.java
@@ -281,7 +281,7 @@ public class TransactionContext implements XAResource {
TransactionInfo info = new TransactionInfo(getConnectionId(),
transactionId, TransactionInfo.ROLLBACK);
this.transactionId = null;
//make this synchronous - see
https://issues.apache.org/activemq/browse/AMQ-2364
- this.connection.syncSendPacket(info, this.connection.isClosing() ?
this.connection.getCloseTimeout() : 0);
+ this.connection.syncSendPacket(info, this.connection.isClosing() ?
this.connection.getCloseTimeout() : this.connection.getRequestTimeout());
// Notify the listener that the tx was rolled back
if (localTransactionEventListener != null) {
localTransactionEventListener.rollbackEvent();
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/SyncSendPacketTimeoutTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/SyncSendPacketTimeoutTest.java
new file mode 100644
index 0000000000..fb2a13f8df
--- /dev/null
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/SyncSendPacketTimeoutTest.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import jakarta.jms.JMSException;
+import jakarta.jms.MessageConsumer;
+import jakarta.jms.Session;
+
+import org.apache.activemq.broker.Broker;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.transport.RequestTimedOutIOException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests that {@link
ActiveMQConnection#syncSendPacket(org.apache.activemq.command.Command)}
+ * applies the configured {@code requestTimeout}.
+ */
+public class SyncSendPacketTimeoutTest {
+
+ private BrokerService broker;
+ private String brokerUrl;
+
+ @Before
+ public void setUp() throws Exception {
+ broker = new BrokerService();
+ broker.setPersistent(false);
+ broker.setUseJmx(false);
+ broker.addConnector("tcp://localhost:0");
+ broker.start();
+ broker.waitUntilStarted();
+ brokerUrl =
broker.getTransportConnectors().get(0).getPublishableConnectString();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (broker != null) {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+ }
+
+ @Test
+ public void testRequestTimeoutDefaultIsZero() throws Exception {
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(brokerUrl);
+ try (ActiveMQConnection connection = (ActiveMQConnection)
factory.createConnection()) {
+ assertEquals("Default requestTimeout should be 0", 0,
connection.getRequestTimeout());
+ }
+ }
+
+ @Test
+ public void testRequestTimeoutConfiguredViaFactory() throws Exception {
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(brokerUrl);
+ factory.setRequestTimeout(5000);
+ try (ActiveMQConnection connection = (ActiveMQConnection)
factory.createConnection()) {
+ assertEquals("requestTimeout should be propagated from factory",
5000, connection.getRequestTimeout());
+ }
+ }
+
+ @Test
+ public void testSyncSendPacketSucceedsWithRequestTimeout() throws
Exception {
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(brokerUrl);
+ factory.setRequestTimeout(5000);
+ try (ActiveMQConnection connection = (ActiveMQConnection)
factory.createConnection()) {
+ connection.start();
+ // Creating a session triggers syncSendPacket internally — should
succeed within
+ // timeout
+ try (Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ // Creating a consumer triggers syncSendPacket internally
— should succeed
+ // within timeout
+ MessageConsumer consumer =
session.createConsumer(session.createQueue("TEST.QUEUE"))) {
+ assertNotNull("Session should be created successfully",
session);
+ assertNotNull("Consumer should be created successfully",
consumer);
+ }
+ }
+ }
+
+ @Test
+ public void testSyncSendPacketSucceedsWithoutRequestTimeout() throws
Exception {
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(brokerUrl);
+ // requestTimeout=0 means no timeout (default)
+ try (ActiveMQConnection connection = (ActiveMQConnection)
factory.createConnection()) {
+ connection.start();
+ try (Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ // Creating a consumer triggers syncSendPacket internally
— should succeed
+ // no timeout
+ MessageConsumer consumer =
session.createConsumer(session.createQueue("TEST.QUEUE"))) {
+ assertNotNull("Session should be created successfully with no
timeout", session);
+ assertNotNull("Consumer should be created successfully",
consumer);
+ }
+ }
+ }
+
+ @Test
+ public void testRequestTimeoutConfiguredViaUrl() throws Exception {
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(brokerUrl + "?jms.requestTimeout=3000");
+ try (ActiveMQConnection connection = (ActiveMQConnection)
factory.createConnection()) {
+ assertEquals("requestTimeout should be set via URL parameter",
3000, connection.getRequestTimeout());
+ }
+ }
+
+ @Test
+ public void testSyncSendPacketFailFromTimeout() throws Exception {
+ // Restart the broker with a plugin that delays addConsumer responses
+ broker.stop();
+ broker.waitUntilStopped();
+
+ broker = new BrokerService();
+ broker.setPersistent(false);
+ broker.setUseJmx(false);
+ broker.setPlugins(new BrokerPlugin[]{new BrokerPlugin() {
+ @Override
+ public Broker installPlugin(Broker broker) {
+ return new BrokerFilter(broker) {
+ @Override
+ public Subscription addConsumer(ConnectionContext context,
ConsumerInfo info) throws Exception {
+ // Only delay consumers on our test queue, not
advisory consumers
+ if
(info.getDestination().getPhysicalName().equals("test")) {
+ Thread.sleep(5000);
+ }
+ return super.addConsumer(context, info);
+ }
+ };
+ }
+ }});
+ broker.addConnector("tcp://localhost:0");
+ broker.start();
+ broker.waitUntilStarted();
+ brokerUrl =
broker.getTransportConnectors().get(0).getPublishableConnectString();
+
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(brokerUrl);
+ factory.setWatchTopicAdvisories(false);
+ factory.setRequestTimeout(500);
+ try (ActiveMQConnection connection = (ActiveMQConnection)
factory.createConnection()) {
+ connection.start();
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ Exception exception = null;
+ try {
+ session.createConsumer(session.createQueue("test"));
+ fail("Expected JMSException due to request timeout");
+ } catch (JMSException expected) {
+ exception = expected;
+ }
+ assertNotNull("Should have caught a JMSException", exception);
+ assertEquals(RequestTimedOutIOException.class,
+ TransportConnector.getRootCause(exception).getClass());
+ }
+ }
+
+ @Test
+ public void testSyncSendPacketOverrideDefaultRequestTimeout() throws
Exception {
+ ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(brokerUrl);
+ try (ActiveMQConnection connection = (ActiveMQConnection)
factory.createConnection()) {
+ connection.start();
+ ActiveMQSession session = (ActiveMQSession)
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ // After session creation set the timeout default to be very short
to test that
+ // overriding directly works
+ connection.setRequestTimeout(1);
+ ConsumerInfo info = new ConsumerInfo(session.getSessionInfo(),
+ session.getNextConsumerId().getValue());
+ info.setDestination(new ActiveMQQueue("test"));
+ // Send info packet with timeout override
+ assertNotNull("Consumer should be created successfully with no
timeout",
+ connection.syncSendPacket(info, 5000));
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact