This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new deda9a108b86 CAMEL-19540: camel-pulsar - replace Thread.sleep in tests
(#23902)
deda9a108b86 is described below
commit deda9a108b86686fe90daff5ee4b7b4c98663e48
Author: Anvith S G <[email protected]>
AuthorDate: Wed Jun 10 16:17:50 2026 +0530
CAMEL-19540: camel-pulsar - replace Thread.sleep in tests (#23902)
* CAMEL-19540 - camel-pulsar: replace Thread.sleep in tests
The 1s-per-message sleep in the shared-subscription distribution test
existed to keep route 1 from draining the topic before the late-starting
route 2 came online. Replace it with a CountDownLatch released when the
late consumer receives its first message, making the hand-off
deterministic instead of timing-dependent.
Co-Authored-By: Claude Fable 5 <[email protected]>
* CAMEL-19540: use Awaitility instead of a custom latch in the pulsar test
Addresses PR review feedback from oscerd: the processor now polls the
late consumer's mock received counter with Awaitility (bounded at 5s,
below the 10s ackTimeoutMillis default) instead of wiring a
CountDownLatch through a mock callback. Same deterministic hand-off,
less custom mechanism.
Co-Authored-By: Claude Fable 5 <[email protected]>
* CAMEL-19540: retrigger CI (JDK17 job failed on unrelated flaky
PulsarConsumerNoAcknowledgementIT)
Co-Authored-By: Claude Fable 5 <[email protected]>
---------
Co-authored-by: Anvith SG <[email protected]>
Co-authored-by: Claude Fable 5 <[email protected]>
---
.../PulsarSharedSubscriptionMessageDistributionIT.java | 14 ++++++++++----
1 file changed, 10 insertions(+), 4 deletions(-)
diff --git
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarSharedSubscriptionMessageDistributionIT.java
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarSharedSubscriptionMessageDistributionIT.java
index 5b9dc15af748..f723d50c45d1 100644
---
a/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarSharedSubscriptionMessageDistributionIT.java
+++
b/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarSharedSubscriptionMessageDistributionIT.java
@@ -30,10 +30,13 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
+import org.awaitility.core.ConditionTimeoutException;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.awaitility.Awaitility.await;
+
public class PulsarSharedSubscriptionMessageDistributionIT extends
PulsarITSupport {
private static final Logger LOGGER =
LoggerFactory.getLogger(PulsarSharedSubscriptionMessageDistributionIT.class);
@@ -69,10 +72,13 @@ public class PulsarSharedSubscriptionMessageDistributionIT
extends PulsarITSuppo
LOGGER.info("Processing message {} on Thread {}",
exchange.getIn().getHeader("message_id"),
Thread.currentThread());
try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- LOGGER.info("Propagating interrupt");
- Thread.currentThread().interrupt();
+ // hold each exchange until the late-starting consumer
(route 2) has received a
+ // message, so route 1 cannot drain the topic before
route 2 comes online;
+ // the wait must stay well below the 10s
ackTimeoutMillis default so a held
+ // (unacknowledged) message cannot hit the ack-timeout
and be redelivered
+ await().atMost(5, TimeUnit.SECONDS).until(() ->
to2.getReceivedCounter() > 0);
+ } catch (ConditionTimeoutException e) {
+ LOGGER.warn("Timed out waiting for the late-starting
consumer to receive a message");
}
}
};