This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new a2fe891993 fix(test): replace Thread.sleep with Wait.waitFor in ZeroPrefetchConsumerTest (#1746)
a2fe891993 is described below

commit a2fe8919932463076671e3318c8a874e59c01f94
Author: JB Onofré <[email protected]>
AuthorDate: Mon Mar 9 21:07:23 2026 +0100

    fix(test): replace Thread.sleep with Wait.waitFor in ZeroPrefetchConsumerTest (#1746)
    
    Replace remaining Thread.sleep() with deterministic Wait.waitFor() polling
    to handle asynchronous ConsumerControl processing, preventing flaky test
    failures due to timing races.
---
 .../org/apache/activemq/ZeroPrefetchConsumerTest.java | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
index e06dc3e1f5..3c705d5235 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
@@ -362,20 +362,23 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
 
         final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) session.createConsumer(brokerZeroQueue);
 
-        // Wait for broker subscription to be created and policy applied
+        // Wait for broker subscription to be created, policy applied, and ConsumerControl
+        // propagated back to the client (the broker sends a ConsumerControl to override
+        // the prefetch to 0, but the client processes it asynchronously)
         final ActiveMQDestination transformedDest = ActiveMQDestination.transform(brokerZeroQueue);
         org.apache.activemq.util.Wait.waitFor(new org.apache.activemq.util.Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
                 return broker.getRegionBroker().getDestinationMap().get(transformedDest) != null
-                    && !broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().isEmpty();
+                    && !broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().isEmpty()
+                    && consumer.info.getCurrentPrefetchSize() == 0;
             }
         }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100));
 
         assertEquals("broker config prefetch in effect", 0, consumer.info.getCurrentPrefetchSize());
 
         // verify sub view broker
-        Subscription sub =
+        final Subscription sub =
                 broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().get(0);
         assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize());
 
@@ -388,8 +391,14 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
         Object reply = ((ActiveMQConnection) connection).getTransport().request(consumerControl);
         assertTrue("good request", !(reply instanceof ExceptionResponse));
 
-        // Wait for the ConsumerControl to be processed
-        Thread.sleep(500);
+        // Wait for the ConsumerControl to be processed - broker policy should override back to 0
+        org.apache.activemq.util.Wait.waitFor(new org.apache.activemq.util.Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return consumer.info.getCurrentPrefetchSize() == 0
+                    && sub.getConsumerInfo().getCurrentPrefetchSize() == 0;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100));
 
         assertEquals("broker config prefetch in effect", 0, consumer.info.getCurrentPrefetchSize());
         assertEquals("broker sub prefetch is correct", 0, sub.getConsumerInfo().getCurrentPrefetchSize());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to