This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new b6037c720b [AMQ-9392] Prevent InactivityMonitor read check Timer leak
when TCP connection fails (#1119)
b6037c720b is described below
commit b6037c720bcdbea1821982d78794313a3cf50150
Author: Axel Sanguinetti <[email protected]>
AuthorDate: Sun Nov 2 11:21:49 2025 -0300
[AMQ-9392] Prevent InactivityMonitor read check Timer leak when TCP
connection fails (#1119)
* Adding test.
* Naive fix.
* Fixing OpenWireConnectionTimeoutTest.
- It seems checking configuredOk can't be done that early.
- Falling back to let the Timer be created and make sure it is disposed of.
---
.../transport/AbstractInactivityMonitor.java | 23 +++-
.../transport/tcp/InactivityMonitorTest.java | 119 ++++++++++++---------
2 files changed, 86 insertions(+), 56 deletions(-)
diff --git
a/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
b/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
index 6182be7c42..9db5fa1462 100644
---
a/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
+++
b/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
@@ -437,6 +437,17 @@ public abstract class AbstractInactivityMonitor extends
TransportFilter {
synchronized (AbstractInactivityMonitor.class) {
READ_CHECK_TIMER.purge();
CHECKER_COUNTER--;
+ if (CHECKER_COUNTER == 0) {
+ if (READ_CHECK_TIMER != null) {
+ READ_CHECK_TIMER.cancel();
+ READ_CHECK_TIMER = null;
+ }
+ try {
+ ThreadPoolUtils.shutdownGraceful(ASYNC_TASKS, 0);
+ } finally {
+ ASYNC_TASKS = null;
+ }
+ }
}
}
}
@@ -497,10 +508,14 @@ public abstract class AbstractInactivityMonitor extends
TransportFilter {
READ_CHECK_TIMER.purge();
CHECKER_COUNTER--;
if (CHECKER_COUNTER == 0) {
- WRITE_CHECK_TIMER.cancel();
- READ_CHECK_TIMER.cancel();
- WRITE_CHECK_TIMER = null;
- READ_CHECK_TIMER = null;
+ if (WRITE_CHECK_TIMER != null) {
+ WRITE_CHECK_TIMER.cancel();
+ WRITE_CHECK_TIMER = null;
+ }
+ if (READ_CHECK_TIMER != null) {
+ READ_CHECK_TIMER.cancel();
+ READ_CHECK_TIMER = null;
+ }
try {
ThreadPoolUtils.shutdownGraceful(ASYNC_TASKS, 0);
} finally {
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
index 4e5415a91d..e8d07202b7 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
@@ -16,9 +16,18 @@
*/
package org.apache.activemq.transport.tcp;
+import static java.lang.Thread.getAllStackTraces;
+import static java.util.stream.Collectors.toList;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsCollectionContaining.hasItem;
+import static org.hamcrest.core.IsNot.not;
+
import java.io.IOException;
+import java.net.SocketException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -33,6 +42,7 @@ import org.apache.activemq.transport.TransportAcceptListener;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.TransportServer;
+import org.hamcrest.Matcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,32 +83,7 @@ public class InactivityMonitorTest extends
CombinationTestSupport implements Tra
*/
private void startClient() throws Exception, URISyntaxException {
clientTransport = TransportFactory.connect(new URI("tcp://localhost:"
+ serverPort + "?trace=true&wireFormat.maxInactivityDuration=1000"));
- clientTransport.setTransportListener(new TransportListener() {
- @Override
- public void onCommand(Object command) {
- clientReceiveCount.incrementAndGet();
- if (clientRunOnCommand != null) {
- clientRunOnCommand.run();
- }
- }
-
- @Override
- public void onException(IOException error) {
- if (!ignoreClientError.get()) {
- LOG.info("Client transport error:");
- error.printStackTrace();
- clientErrorCount.incrementAndGet();
- }
- }
-
- @Override
- public void transportInterupted() {
- }
-
- @Override
- public void transportResumed() {
- }
- });
+ clientTransport.setTransportListener(new
TestClientTransportListener());
clientTransport.start();
}
@@ -181,32 +166,7 @@ public class InactivityMonitorTest extends
CombinationTestSupport implements Tra
// Manually create a client transport so that it does not send
KeepAlive
// packets. this should simulate a client hang.
clientTransport = new TcpTransport(new OpenWireFormat(),
SocketFactory.getDefault(), new URI("tcp://localhost:" + serverPort), null);
- clientTransport.setTransportListener(new TransportListener() {
- @Override
- public void onCommand(Object command) {
- clientReceiveCount.incrementAndGet();
- if (clientRunOnCommand != null) {
- clientRunOnCommand.run();
- }
- }
-
- @Override
- public void onException(IOException error) {
- if (!ignoreClientError.get()) {
- LOG.info("Client transport error:");
- error.printStackTrace();
- clientErrorCount.incrementAndGet();
- }
- }
-
- @Override
- public void transportInterupted() {
- }
-
- @Override
- public void transportResumed() {
- }
- });
+ clientTransport.setTransportListener(new
TestClientTransportListener());
clientTransport.start();
WireFormatInfo info = new WireFormatInfo();
@@ -237,6 +197,34 @@ public class InactivityMonitorTest extends
CombinationTestSupport implements Tra
assertEquals(0, serverErrorCount.get());
}
+ public void testReadCheckTimerIsNotLeakedOnError() throws Exception {
+ // Intentionally picks a port that is not the listening port to
generate a failure
+ clientTransport = TransportFactory.connect(new URI("tcp://localhost:"
+ (serverPort ^ 1)));
+ clientTransport.setTransportListener(new
TestClientTransportListener());
+
+ // Control test to verify there was no timer from a previous test
+ assertThat(getCurrentThreadNames(), not(hasReadCheckTimer()));
+
+ try {
+ clientTransport.start();
+ fail("A ConnectionException was expected");
+ } catch (SocketException e) {
+ // A SocketException is expected.
+ }
+
+ // If there is any read check timer at this point, calling stop should
clean it up (because CHECKER_COUNTER becomes 0)
+ clientTransport.stop();
+ assertThat(getCurrentThreadNames(), not(hasReadCheckTimer()));
+ }
+
+ private static Matcher<Iterable<? super String>> hasReadCheckTimer() {
+ return hasItem("ActiveMQ InactivityMonitor ReadCheckTimer");
+ }
+
+ private static List<String> getCurrentThreadNames() {
+ return
getAllStackTraces().keySet().stream().map(Thread::getName).collect(toList());
+ }
+
/**
* Used to test when a operation blocks. This should not cause transport to
* get disconnected.
@@ -272,4 +260,31 @@ public class InactivityMonitorTest extends
CombinationTestSupport implements Tra
assertEquals(0, clientErrorCount.get());
assertEquals(0, serverErrorCount.get());
}
+
+ private class TestClientTransportListener implements TransportListener {
+ @Override
+ public void onCommand(Object command) {
+ clientReceiveCount.incrementAndGet();
+ if (clientRunOnCommand != null) {
+ clientRunOnCommand.run();
+ }
+ }
+
+ @Override
+ public void onException(IOException error) {
+ if (!ignoreClientError.get()) {
+ LOG.info("Client transport error:");
+ error.printStackTrace();
+ clientErrorCount.incrementAndGet();
+ }
+ }
+
+ @Override
+ public void transportInterupted() {
+ }
+
+ @Override
+ public void transportResumed() {
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact