This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new a2fe891993 fix(test): replace Thread.sleep with Wait.waitFor in ZeroPrefetchConsumerTest (#1746)
a2fe891993 is described below
commit a2fe8919932463076671e3318c8a874e59c01f94
Author: JB Onofré <[email protected]>
AuthorDate: Mon Mar 9 21:07:23 2026 +0100
fix(test): replace Thread.sleep with Wait.waitFor in ZeroPrefetchConsumerTest (#1746)
Replace remaining Thread.sleep() with deterministic Wait.waitFor() polling
to handle asynchronous ConsumerControl processing, preventing flaky test
failures due to timing races.
---
.../org/apache/activemq/ZeroPrefetchConsumerTest.java | 19 ++++++++++++++-----
1 file changed, 14 insertions(+), 5 deletions(-)
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
index e06dc3e1f5..3c705d5235 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java
@@ -362,20 +362,23 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)
session.createConsumer(brokerZeroQueue);
- // Wait for broker subscription to be created and policy applied
+ // Wait for broker subscription to be created, policy applied, and
ConsumerControl
+ // propagated back to the client (the broker sends a ConsumerControl
to override
+ // the prefetch to 0, but the client processes it asynchronously)
final ActiveMQDestination transformedDest =
ActiveMQDestination.transform(brokerZeroQueue);
org.apache.activemq.util.Wait.waitFor(new
org.apache.activemq.util.Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return
broker.getRegionBroker().getDestinationMap().get(transformedDest) != null
- &&
!broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().isEmpty();
+ &&
!broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().isEmpty()
+ && consumer.info.getCurrentPrefetchSize() == 0;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100));
assertEquals("broker config prefetch in effect", 0,
consumer.info.getCurrentPrefetchSize());
// verify sub view broker
- Subscription sub =
+ final Subscription sub =
broker.getRegionBroker().getDestinationMap().get(transformedDest).getConsumers().get(0);
assertEquals("broker sub prefetch is correct", 0,
sub.getConsumerInfo().getCurrentPrefetchSize());
@@ -388,8 +391,14 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport {
Object reply = ((ActiveMQConnection)
connection).getTransport().request(consumerControl);
assertTrue("good request", !(reply instanceof ExceptionResponse));
- // Wait for the ConsumerControl to be processed
- Thread.sleep(500);
+ // Wait for the ConsumerControl to be processed - broker policy should
override back to 0
+ org.apache.activemq.util.Wait.waitFor(new
org.apache.activemq.util.Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return consumer.info.getCurrentPrefetchSize() == 0
+ && sub.getConsumerInfo().getCurrentPrefetchSize() == 0;
+ }
+ }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(100));
assertEquals("broker config prefetch in effect", 0,
consumer.info.getCurrentPrefetchSize());
assertEquals("broker sub prefetch is correct", 0,
sub.getConsumerInfo().getCurrentPrefetchSize());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact