This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new b6037c720b [AMQ-9392] Prevent InactivityMonitor read check Timer leak when TCP connection fails (#1119)
b6037c720b is described below

commit b6037c720bcdbea1821982d78794313a3cf50150
Author: Axel Sanguinetti <[email protected]>
AuthorDate: Sun Nov 2 11:21:49 2025 -0300

    [AMQ-9392] Prevent InactivityMonitor read check Timer leak when TCP connection fails (#1119)
    
    * Adding test.
    
    * Naive fix.
    
    * Fixing OpenWireConnectionTimeoutTest.
    - It seems checking configuredOk can't be done that early.
    - Falling back to let the Timer be created and make sure it is disposed of.
---
 .../transport/AbstractInactivityMonitor.java       |  23 +++-
 .../transport/tcp/InactivityMonitorTest.java       | 119 ++++++++++++---------
 2 files changed, 86 insertions(+), 56 deletions(-)

diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java b/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
index 6182be7c42..9db5fa1462 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/AbstractInactivityMonitor.java
@@ -437,6 +437,17 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
             synchronized (AbstractInactivityMonitor.class) {
                 READ_CHECK_TIMER.purge();
                 CHECKER_COUNTER--;
+                if (CHECKER_COUNTER == 0) {
+                    if (READ_CHECK_TIMER != null) {
+                        READ_CHECK_TIMER.cancel();
+                        READ_CHECK_TIMER = null;
+                    }
+                    try {
+                        ThreadPoolUtils.shutdownGraceful(ASYNC_TASKS, 0);
+                    } finally {
+                        ASYNC_TASKS = null;
+                    }
+                }
             }
         }
     }
@@ -497,10 +508,14 @@ public abstract class AbstractInactivityMonitor extends TransportFilter {
                 READ_CHECK_TIMER.purge();
                 CHECKER_COUNTER--;
                 if (CHECKER_COUNTER == 0) {
-                    WRITE_CHECK_TIMER.cancel();
-                    READ_CHECK_TIMER.cancel();
-                    WRITE_CHECK_TIMER = null;
-                    READ_CHECK_TIMER = null;
+                    if (WRITE_CHECK_TIMER != null) {
+                        WRITE_CHECK_TIMER.cancel();
+                        WRITE_CHECK_TIMER = null;
+                    }
+                    if (READ_CHECK_TIMER != null) {
+                        READ_CHECK_TIMER.cancel();
+                        READ_CHECK_TIMER = null;
+                    }
                     try {
                         ThreadPoolUtils.shutdownGraceful(ASYNC_TASKS, 0);
                     } finally {
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
index 4e5415a91d..e8d07202b7 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/tcp/InactivityMonitorTest.java
@@ -16,9 +16,18 @@
  */
 package org.apache.activemq.transport.tcp;
 
+import static java.lang.Thread.getAllStackTraces;
+import static java.util.stream.Collectors.toList;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.IsCollectionContaining.hasItem;
+import static org.hamcrest.core.IsNot.not;
+
 import java.io.IOException;
+import java.net.SocketException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -33,6 +42,7 @@ import org.apache.activemq.transport.TransportAcceptListener;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.transport.TransportServer;
+import org.hamcrest.Matcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,32 +83,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
      */
     private void startClient() throws Exception, URISyntaxException {
         clientTransport = TransportFactory.connect(new URI("tcp://localhost:" 
+ serverPort + "?trace=true&wireFormat.maxInactivityDuration=1000"));
-        clientTransport.setTransportListener(new TransportListener() {
-            @Override
-            public void onCommand(Object command) {
-                clientReceiveCount.incrementAndGet();
-                if (clientRunOnCommand != null) {
-                    clientRunOnCommand.run();
-                }
-            }
-
-            @Override
-            public void onException(IOException error) {
-                if (!ignoreClientError.get()) {
-                    LOG.info("Client transport error:");
-                    error.printStackTrace();
-                    clientErrorCount.incrementAndGet();
-                }
-            }
-
-            @Override
-            public void transportInterupted() {
-            }
-
-            @Override
-            public void transportResumed() {
-            }
-        });
+        clientTransport.setTransportListener(new 
TestClientTransportListener());
 
         clientTransport.start();
     }
@@ -181,32 +166,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
         // Manually create a client transport so that it does not send 
KeepAlive
         // packets.  this should simulate a client hang.
         clientTransport = new TcpTransport(new OpenWireFormat(), 
SocketFactory.getDefault(), new URI("tcp://localhost:" + serverPort), null);
-        clientTransport.setTransportListener(new TransportListener() {
-            @Override
-            public void onCommand(Object command) {
-                clientReceiveCount.incrementAndGet();
-                if (clientRunOnCommand != null) {
-                    clientRunOnCommand.run();
-                }
-            }
-
-            @Override
-            public void onException(IOException error) {
-                if (!ignoreClientError.get()) {
-                    LOG.info("Client transport error:");
-                    error.printStackTrace();
-                    clientErrorCount.incrementAndGet();
-                }
-            }
-
-            @Override
-            public void transportInterupted() {
-            }
-
-            @Override
-            public void transportResumed() {
-            }
-        });
+        clientTransport.setTransportListener(new 
TestClientTransportListener());
 
         clientTransport.start();
         WireFormatInfo info = new WireFormatInfo();
@@ -237,6 +197,34 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
         assertEquals(0, serverErrorCount.get());
     }
 
+    public void testReadCheckTimerIsNotLeakedOnError() throws Exception {
+        // Intentionally picks a port that is not the listening port to 
generate a failure
+        clientTransport = TransportFactory.connect(new URI("tcp://localhost:" 
+ (serverPort ^ 1)));
+        clientTransport.setTransportListener(new 
TestClientTransportListener());
+
+        // Control test to verify there was no timer from a previous test
+        assertThat(getCurrentThreadNames(), not(hasReadCheckTimer()));
+
+        try {
+            clientTransport.start();
+            fail("A ConnectionException was expected");
+        } catch (SocketException e) {
+            // A SocketException is expected.
+        }
+
+        // If there is any read check timer at this point, calling stop should 
clean it up (because CHECKER_COUNTER becomes 0)
+        clientTransport.stop();
+        assertThat(getCurrentThreadNames(), not(hasReadCheckTimer()));
+    }
+
+    private static Matcher<Iterable<? super String>> hasReadCheckTimer() {
+        return hasItem("ActiveMQ InactivityMonitor ReadCheckTimer");
+    }
+
+    private static List<String> getCurrentThreadNames() {
+        return 
getAllStackTraces().keySet().stream().map(Thread::getName).collect(toList());
+    }
+
     /**
      * Used to test when a operation blocks. This should not cause transport to
      * get disconnected.
@@ -272,4 +260,31 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
         assertEquals(0, clientErrorCount.get());
         assertEquals(0, serverErrorCount.get());
     }
+
+    private class TestClientTransportListener implements TransportListener {
+        @Override
+        public void onCommand(Object command) {
+            clientReceiveCount.incrementAndGet();
+            if (clientRunOnCommand != null) {
+                clientRunOnCommand.run();
+            }
+        }
+
+        @Override
+        public void onException(IOException error) {
+            if (!ignoreClientError.get()) {
+                LOG.info("Client transport error:");
+                error.printStackTrace();
+                clientErrorCount.incrementAndGet();
+            }
+        }
+
+        @Override
+        public void transportInterupted() {
+        }
+
+        @Override
+        public void transportResumed() {
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to