This is an automated email from the ASF dual-hosted git repository.

jlmonteiro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new f7993bcc6b test(unit-tests): improve reliability of message 
consumption in tests (#1711)
f7993bcc6b is described below

commit f7993bcc6b9445b0b6695ab56a5d30f7e72fe0af
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Tue Mar 3 09:58:49 2026 +0100

    test(unit-tests): improve reliability of message consumption in tests 
(#1711)
    
    * test(unit-tests): improve reliability of message consumption in tests
    
    * [test] Fix ConnectionFailureEvictsFromPoolTest: eliminate flaky async 
races
    
    Two race conditions caused testEvictionXA to fail intermittently:
    
    1. Exception event propagation: ActiveMQConnection.addTransportListener()
       callbacks fire via executeAsync(), which silently drops tasks when the
       pool's ExceptionListener closes the connection and shuts down the
       executor first. Fixed by intercepting at the MockTransport level where
       exception propagation is synchronous.
    
    2. Pool eviction timing: The pool evicts broken connections asynchronously
       via ExceptionListener fired through executeAsync(). The test could
       request a new connection before eviction completed. Fixed by using
       Wait.waitFor() retry pattern (consistent with other pool tests).
    
    * test(MaxFrameSizeEnabled): increase timeouts to improve test reliability
    
    * test(AMQ2149): enhance prefetch policy for transactional connections
    
    ---------
    
    Co-authored-by: root <[email protected]>
---
 .../pool/ConnectionFailureEvictsFromPoolTest.java  | 40 +++++++++++++++-------
 .../java/org/apache/activemq/bugs/AMQ2149Test.java |  9 ++++-
 .../org/apache/activemq/store/StoreOrderTest.java  |  2 ++
 .../activemq/store/jdbc/XACompletionTest.java      |  7 +---
 .../transport/MaxFrameSizeEnabledTest.java         | 17 +++++----
 .../RestrictedThreadPoolInactivityTimeoutTest.java |  2 ++
 .../QueueZeroPrefetchLazyDispatchPriorityTest.java | 22 ++++++++----
 7 files changed, 66 insertions(+), 33 deletions(-)

diff --git 
a/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java
 
b/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java
index 596eb00fe3..3a0fc67943 100644
--- 
a/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java
+++ 
b/activemq-pool/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPoolTest.java
@@ -39,6 +39,7 @@ import org.apache.activemq.jms.pool.PooledConnection;
 import org.apache.activemq.test.TestSupport;
 import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.transport.mock.MockTransport;
+import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -94,17 +95,27 @@ public class ConnectionFailureEvictsFromPoolTest extends 
TestSupport {
         final CountDownLatch gotExceptionEvent = new CountDownLatch(1);
         try (final PooledConnection connection = (PooledConnection) 
pooledFactory.createConnection()) {
             final ActiveMQConnection amqC = (ActiveMQConnection) 
connection.getConnection();
-            amqC.addTransportListener(new TransportListener() {
+            // Intercept exception propagation at the MockTransport level 
where it fires
+            // synchronously. ActiveMQConnection.addTransportListener() 
callbacks fire via
+            // executeAsync(), which silently drops the task if the pool's 
ExceptionListener
+            // closes the connection and shuts down the executor first (race 
condition that
+            // affects the XA path).
+            final MockTransport mockTransport = (MockTransport) 
amqC.getTransportChannel().narrow(MockTransport.class);
+            final TransportListener originalListener = 
mockTransport.getTransportListener();
+            mockTransport.setTransportListener(new TransportListener() {
                 public void onCommand(Object command) {
+                    originalListener.onCommand(command);
                 }
                 public void onException(IOException error) {
-                    // we know connection is dead...
-                    // listeners are fired async
+                    // fires synchronously when MockTransport.onException() is 
called
                     gotExceptionEvent.countDown();
+                    originalListener.onException(error);
                 }
                 public void transportInterupted() {
+                    originalListener.transportInterupted();
                 }
                 public void transportResumed() {
+                    originalListener.transportResumed();
                 }
             });
 
@@ -116,18 +127,21 @@ public class ConnectionFailureEvictsFromPoolTest extends 
TestSupport {
                 TestCase.fail("Expected Error");
             } catch (JMSException e) {
             }
-            // Wait for async exception event BEFORE the try-with-resources 
closes the connection.
-            // ActiveMQConnection.onException() fires TransportListener 
callbacks via executeAsync(),
-            // so the callback runs in a separate thread. If we wait after 
connection.close(), the
-            // async executor may already be shut down and the callback never 
fires.
-            TestCase.assertTrue("exception event propagated ok", 
gotExceptionEvent.await(15, TimeUnit.SECONDS));
+            TestCase.assertTrue("exception event propagated ok", 
gotExceptionEvent.await(5, TimeUnit.SECONDS));
         }
-        // If we get another connection now it should be a new connection that
-        // works.
+        // After the failure, a new connection from the pool should work.
+        // The pool eviction is async (ExceptionListener fires via 
executeAsync),
+        // so retry until the pool returns a working connection.
         LOG.info("expect new connection after failure");
-        try (final Connection connection2 = pooledFactory.createConnection()) {
-            sendMessage(connection2);
-        }
+        assertTrue("pool should provide working connection after eviction",
+            Wait.waitFor(() -> {
+                try (final Connection connection2 = 
pooledFactory.createConnection()) {
+                    sendMessage(connection2);
+                    return true;
+                } catch (Exception e) {
+                    return false;
+                }
+            }, 5000, 100));
     }
 
     private void createConnectionFailure(Connection connection) throws 
Exception {
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
index 37b6cdeb26..4ec8c59231 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
@@ -41,6 +41,7 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationStatistics;
@@ -171,6 +172,12 @@ public class AMQ2149Test {
             this.transactional = transactional;
             ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(brokerURL);
             connectionFactory.setWatchTopicAdvisories(false);
+            if (transactional) {
+                final ActiveMQPrefetchPolicy policy = 
connectionFactory.getPrefetchPolicy();
+                policy.setQueuePrefetch(1);
+                policy.setTopicPrefetch(1);
+                policy.setDurableTopicPrefetch(1);
+            }
             connection = connectionFactory.createConnection();
             connection.setClientID(dest.toString());
             session = connection.createSession(transactional, transactional ? 
Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
@@ -194,7 +201,7 @@ public class AMQ2149Test {
         
         final int TRANSACITON_BATCH = 500;
         boolean resumeOnNextOrPreviousIsOk = false;
-        public void onMessage(Message message) {
+        public synchronized void onMessage(Message message) {
             try {
                 final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
                 if ((seqNum % TRANSACITON_BATCH) == 0) {
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java
index 58c853e1ba..8dfe3da99b 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/StoreOrderTest.java
@@ -120,6 +120,7 @@ public abstract class StoreOrderTest {
         }
         if (broker != null) {
             broker.stop();
+            broker.waitUntilStopped();
         }
     }
     
@@ -257,6 +258,7 @@ public abstract class StoreOrderTest {
         configureBroker(newBroker);
         newBroker.setDeleteAllMessagesOnStartup(deleteMessagesOnStartup);
         newBroker.start();
+        newBroker.waitUntilStarted();
         return newBroker;
     }
     
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
index 5b7a3a417f..26e5784087 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/XACompletionTest.java
@@ -275,12 +275,7 @@ public class XACompletionTest extends TestSupport {
 
         dumpMessages();
 
-        Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return proxy.getInFlightCount() == 0l;
-            }
-        });
+        Wait.waitFor(() -> proxy.getInFlightCount() == 0L && 
proxy.cursorSize() == 0);
         assertEquals("prefetch", 0, proxy.getInFlightCount());
         assertEquals("size", 10, proxy.getQueueSize());
         assertEquals("cursor size", 0, proxy.cursorSize());
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java
index ed6635480d..77aafacbab 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/MaxFrameSizeEnabledTest.java
@@ -55,6 +55,9 @@ public class MaxFrameSizeEnabledTest {
     private static final int CONNECTION_COUNT = 3;
     private static final int MESSAGE_ATTEMPTS = 3;
     private static final int BODY_SIZE = 20000; // large enough to trip 2k 
limit, compressible enough for 60k
+    private static final long BROKER_START_TIMEOUT_MS = 30_000;
+    private static final long BROKER_STOP_TIMEOUT_MS = 30_000;
+    private static final int TEST_TIMEOUT_MS = 120_000;
 
     private BrokerService broker;
     private final String transportType;
@@ -158,30 +161,32 @@ public class MaxFrameSizeEnabledTest {
     }
 
     public BrokerService createBroker(String connectorName, String 
connectorString) throws Exception {
-        BrokerService broker = new BrokerService();
+        final BrokerService broker = new BrokerService();
         broker.setPersistent(false);
         broker.setUseJmx(false);
-        TransportConnector connector = broker.addConnector(connectorString);
+        final TransportConnector connector = 
broker.addConnector(connectorString);
         connector.setName(connectorName);
         broker.start();
-        broker.waitUntilStarted();
+        assertTrue("Broker should start within timeout",
+                Wait.waitFor(broker::isStarted, BROKER_START_TIMEOUT_MS, 100));
         return broker;
     }
 
     public void stopBroker(BrokerService broker) throws Exception {
         if (broker != null) {
             broker.stop();
-            broker.waitUntilStopped();
+            assertTrue("Broker should stop within timeout",
+                    Wait.waitFor(broker::isStopped, BROKER_STOP_TIMEOUT_MS, 
100));
         }
     }
 
-    @Test
+    @Test(timeout = TEST_TIMEOUT_MS)
     public void testMaxFrameSize() throws Exception {
         broker = createBroker(transportType, transportType + 
"://localhost:0?wireFormat.maxFrameSize=2048" + getServerParams());
         testMaxFrameSize(transportType, 
getClientUri(broker.getConnectorByName(transportType).getConnectUri().getPort()),
 false);
     }
 
-    @Test
+    @Test(timeout = TEST_TIMEOUT_MS)
     public void testMaxFrameSizeCompression() throws Exception {
         // Test message body length is 99841 bytes. Compresses to ~ 48000
         broker = createBroker(transportType, transportType + 
"://localhost:0?wireFormat.maxFrameSize=60000" + getServerParams());
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java
index 7eaefd23bd..bf3ab74a9a 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
 
 public class RestrictedThreadPoolInactivityTimeoutTest extends JmsTestSupport {
     private static final Logger LOG = 
LoggerFactory.getLogger(RestrictedThreadPoolInactivityTimeoutTest.class);
+    private static final int TEST_TIMEOUT_MS = 120_000;
 
     public String brokerTransportScheme = "tcp";
     public Boolean rejectWork = Boolean.FALSE;
@@ -86,6 +87,7 @@ public class RestrictedThreadPoolInactivityTimeoutTest 
extends JmsTestSupport {
         addCombinationValues("rejectWork", new Object[] {Boolean.TRUE, 
Boolean.FALSE});
     }
 
+    @org.junit.Test(timeout = TEST_TIMEOUT_MS)
     public void testThreadsInvolvedInXInactivityTimeouts() throws Exception {
 
         URI tcpBrokerUri = 
URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri());
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
index 4c5b193741..7ebe48cc1d 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
@@ -199,15 +199,23 @@ public class QueueZeroPrefetchLazyDispatchPriorityTest {
             assertNotNull(message);
             assertEquals(5, message.getJMSPriority());
 
-            // consume messages
-            final ArrayList<Message> consumeList = consumeMessages("TestQ");
+            // Wait for remaining messages to be fully available after 
consumeOneMessage closes its connection.
+            // With lazyDispatch=true + optimizedDispatch=true, messages may 
briefly be in-flight
+            // during connection teardown and not yet re-queued for dispatch 
to a new consumer.
+            final int remaining = numToSend - 1;
+            assertTrue("Remaining messages available for dispatch", 
Wait.waitFor(() -> {
+                final Queue q = (Queue) broker.getDestination(destination);
+                return q != null
+                    && q.getDestinationStatistics().getMessages().getCount() 
== remaining
+                    && q.getDestinationStatistics().getInflight().getCount() 
== 0;
+            }, 5000, 100));
+
+            // consume messages (use timeout-based overload for reliable 
dispatch on slow CI)
+            final ArrayList<Message> consumeList = consumeMessages("TestQ", 
remaining, TimeUnit.SECONDS.toMillis(30));
             LOG.info("Consumed list {}", consumeList.size());
 
-            // compare lists
-            // assertEquals("Iteration: " + i
-            // +", message 1 should be priority high", 5,
-            // consumeList.get(0).getJMSPriority());
-            for (int j = 1; j < (numToSend - 1); j++) {
+            assertEquals("Iteration: " + i + ", all remaining messages 
consumed", remaining, consumeList.size());
+            for (int j = 0; j < consumeList.size(); j++) {
                 assertEquals("Iteration: " + i + ", message " + j + " should 
be priority medium", 4, consumeList.get(j).getJMSPriority());
             }
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to