This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new deda9a108b86 CAMEL-19540: camel-pulsar - replace Thread.sleep in tests (#23902)
deda9a108b86 is described below

commit deda9a108b86686fe90daff5ee4b7b4c98663e48
Author: Anvith S G <[email protected]>
AuthorDate: Wed Jun 10 16:17:50 2026 +0530

    CAMEL-19540: camel-pulsar - replace Thread.sleep in tests (#23902)
    
    * CAMEL-19540 - camel-pulsar: replace Thread.sleep in tests
    
    The 1s-per-message sleep in the shared-subscription distribution test
    existed to keep route 1 from draining the topic before the late-starting
    route 2 came online. Replace it with a CountDownLatch released when the
    late consumer receives its first message, making the hand-off
    deterministic instead of timing-dependent.
    
    Co-Authored-By: Claude Fable 5 <[email protected]>
    
    * CAMEL-19540: use Awaitility instead of a custom latch in the pulsar test
    
    Addresses PR review feedback from oscerd: the processor now polls the
    late consumer's mock received counter with Awaitility (bounded at 5s,
    below the 10s ackTimeoutMillis default) instead of wiring a
    CountDownLatch through a mock callback. Same deterministic hand-off,
    less custom mechanism.
    
    Co-Authored-By: Claude Fable 5 <[email protected]>
    
    * CAMEL-19540: retrigger CI (JDK17 job failed on unrelated flaky PulsarConsumerNoAcknowledgementIT)
    
    Co-Authored-By: Claude Fable 5 <[email protected]>
    
    ---------
    
    Co-authored-by: Anvith SG <[email protected]>
    Co-authored-by: Claude Fable 5 <[email protected]>
---
 .../PulsarSharedSubscriptionMessageDistributionIT.java     | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarSharedSubscriptionMessageDistributionIT.java b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarSharedSubscriptionMessageDistributionIT.java
index 5b9dc15af748..f723d50c45d1 100644
--- a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarSharedSubscriptionMessageDistributionIT.java
+++ b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarSharedSubscriptionMessageDistributionIT.java
@@ -30,10 +30,13 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.awaitility.core.ConditionTimeoutException;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.awaitility.Awaitility.await;
+
 public class PulsarSharedSubscriptionMessageDistributionIT extends 
PulsarITSupport {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(PulsarSharedSubscriptionMessageDistributionIT.class);
@@ -69,10 +72,13 @@ public class PulsarSharedSubscriptionMessageDistributionIT 
extends PulsarITSuppo
                     LOGGER.info("Processing message {} on Thread {}", 
exchange.getIn().getHeader("message_id"),
                             Thread.currentThread());
                     try {
-                        Thread.sleep(1000);
-                    } catch (InterruptedException e) {
-                        LOGGER.info("Propagating interrupt");
-                        Thread.currentThread().interrupt();
+                        // hold each exchange until the late-starting consumer 
(route 2) has received a
+                        // message, so route 1 cannot drain the topic before 
route 2 comes online;
+                        // the wait must stay well below the 10s 
ackTimeoutMillis default so a held
+                        // (unacknowledged) message cannot hit the ack-timeout 
and be redelivered
+                        await().atMost(5, TimeUnit.SECONDS).until(() -> 
to2.getReceivedCounter() > 0);
+                    } catch (ConditionTimeoutException e) {
+                        LOGGER.warn("Timed out waiting for the late-starting 
consumer to receive a message");
                     }
                 }
             };

Reply via email to