This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 3b426d1cc22f CAMEL-21438: De-flake timing-sensitive component tests
3b426d1cc22f is described below

commit 3b426d1cc22f5383d19aba71a0e38c07d40889d0
Author: Adriano Machado <[email protected]>
AuthorDate: Tue Jun 30 16:33:41 2026 -0400

    CAMEL-21438: De-flake timing-sensitive component tests
    
    Remove timing races in camel-atom, camel-pg-replication-slot, and
    camel-elasticsearch-rest-client tests. Atom polling tests now use
    repeatCount to bound poll cycles deterministically. Elasticsearch IT
    replaces Thread.sleep(5000) with an Awaitility readiness probe against
    _cluster/health. PgReplicationSlot IT polls faster to avoid racing
    the assertion timeout. Test-only changes, no component behavior changes.
    
    Closes #24338
    
    Co-Authored-By: Claude Opus 4.8 <[email protected]>
---
 .../atom/AtomEntryPollingConsumerTest.java         |  3 ++-
 .../atom/AtomPollingConsumerIdleMessageTest.java   | 22 ++++++++++------------
 .../component/atom/AtomPollingConsumerTest.java    |  6 ++++--
 .../component/atom/AtomPollingLowDelayTest.java    |  5 +++--
 .../component/atom/AtomPollingUnthrottledTest.java |  4 ++--
 .../ElasticsearchRestClientComponentIT.java        | 10 ++++++++--
 .../slot/integration/PgReplicationSlotCamelIT.java | 10 ++++++----
 7 files changed, 35 insertions(+), 25 deletions(-)

diff --git 
a/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomEntryPollingConsumerTest.java
 
b/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomEntryPollingConsumerTest.java
index ce868c634b0f..ebf452e3f82b 100644
--- 
a/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomEntryPollingConsumerTest.java
+++ 
b/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomEntryPollingConsumerTest.java
@@ -42,7 +42,8 @@ public class AtomEntryPollingConsumerTest extends 
CamelTestSupport {
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                
from("atom:file:src/test/data/feed.atom?splitEntries=true&delay=500")
+                // throttled: one entry per poll; repeatCount=7 bounds it to 
exactly the 7 feed entries
+                
from("atom:file:src/test/data/feed.atom?splitEntries=true&delay=100&initialDelay=0&repeatCount=7")
                         .to("mock:result1");
             }
         };
diff --git 
a/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingConsumerIdleMessageTest.java
 
b/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingConsumerIdleMessageTest.java
index 80f9a28a0c6e..3cec836e384e 100644
--- 
a/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingConsumerIdleMessageTest.java
+++ 
b/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingConsumerIdleMessageTest.java
@@ -16,12 +16,9 @@
  */
 package org.apache.camel.component.atom;
 
-import java.time.Duration;
-
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit6.CamelTestSupport;
-import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
@@ -36,15 +33,16 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 public class AtomPollingConsumerIdleMessageTest extends CamelTestSupport {
 
     @Test
-    void testConsumeIdleMessages() {
-        Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
-            MockEndpoint mock = getMockEndpoint("mock:result");
-            mock.expectedMinimumMessageCount(2);
-            MockEndpoint.assertIsSatisfied(context);
-
-            assertNull(mock.getExchanges().get(0).getIn().getBody());
-            assertNull(mock.getExchanges().get(1).getIn().getBody());
-        });
+    void testConsumeIdleMessages() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        // an empty feed polled with sendEmptyMessageWhenIdle=true emits one 
idle exchange per poll;
+        // let assertIsSatisfied do the waiting (default result wait time) 
instead of a fixed deadline
+        mock.expectedMinimumMessageCount(2);
+        mock.assertIsSatisfied();
+
+        // idle exchanges carry no body
+        assertNull(mock.getExchanges().get(0).getIn().getBody());
+        assertNull(mock.getExchanges().get(1).getIn().getBody());
     }
 
     @Override
diff --git 
a/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingConsumerTest.java
 
b/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingConsumerTest.java
index 505e3812fb67..96c1196c8e22 100644
--- 
a/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingConsumerTest.java
+++ 
b/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingConsumerTest.java
@@ -75,10 +75,12 @@ public class AtomPollingConsumerTest extends 
CamelTestSupport {
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                
from("atom:file:src/test/data/feed.atom?splitEntries=false").to("mock:result");
+                // not split: a single poll delivers the whole feed; 
repeatCount=1 keeps it to exactly one message
+                
from("atom:file:src/test/data/feed.atom?splitEntries=false&initialDelay=0&repeatCount=1").to("mock:result");
 
                 // this is a bit weird syntax that normally is not using the 
feedUri parameter
-                
from("atom:?feedUri=file:src/test/data/feed.atom&splitEntries=false").to("mock:result2");
+                
from("atom:?feedUri=file:src/test/data/feed.atom&splitEntries=false&initialDelay=0&repeatCount=1")
+                        .to("mock:result2");
             }
         };
     }
diff --git 
a/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingLowDelayTest.java
 
b/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingLowDelayTest.java
index 774c57c05b78..d32458e90c2c 100644
--- 
a/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingLowDelayTest.java
+++ 
b/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingLowDelayTest.java
@@ -33,7 +33,6 @@ public class AtomPollingLowDelayTest extends CamelTestSupport 
{
     void testLowDelay() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(7);
-        mock.setResultWaitTime(3000L);
         mock.assertIsSatisfied();
     }
 
@@ -41,7 +40,9 @@ public class AtomPollingLowDelayTest extends CamelTestSupport 
{
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                
from("atom:file:src/test/data/feed.atom?splitEntries=true&delay=100&initialDelay=0").to("mock:result");
+                // throttled fast polling: one entry per poll; repeatCount=7 
bounds it to exactly the 7 feed entries
+                
from("atom:file:src/test/data/feed.atom?splitEntries=true&delay=100&initialDelay=0&repeatCount=7")
+                        .to("mock:result");
             }
         };
     }
diff --git 
a/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingUnthrottledTest.java
 
b/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingUnthrottledTest.java
index 4dd629bf969d..e066b54d05fb 100644
--- 
a/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingUnthrottledTest.java
+++ 
b/components/camel-atom/src/test/java/org/apache/camel/component/atom/AtomPollingUnthrottledTest.java
@@ -30,7 +30,6 @@ public class AtomPollingUnthrottledTest extends 
CamelTestSupport {
     void testLowDelay() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(7);
-        mock.setResultWaitTime(3000L);
 
         MockEndpoint.assertIsSatisfied(context);
     }
@@ -39,7 +38,8 @@ public class AtomPollingUnthrottledTest extends 
CamelTestSupport {
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                
from("atom:file:src/test/data/feed.atom?splitEntries=true&throttleEntries=false&initialDelay=0")
+                // unthrottled: a single poll delivers all entries, so 
repeatCount=1 yields exactly the 7 feed entries
+                
from("atom:file:src/test/data/feed.atom?splitEntries=true&throttleEntries=false&initialDelay=0&repeatCount=1")
                         .to("mock:result");
             }
         };
diff --git 
a/components/camel-elasticsearch-rest-client/src/test/java/org/apache/camel/component/elasticsearch/rest/client/integration/ElasticsearchRestClientComponentIT.java
 
b/components/camel-elasticsearch-rest-client/src/test/java/org/apache/camel/component/elasticsearch/rest/client/integration/ElasticsearchRestClientComponentIT.java
index 8abd876d8b9e..6b49b5b14395 100644
--- 
a/components/camel-elasticsearch-rest-client/src/test/java/org/apache/camel/component/elasticsearch/rest/client/integration/ElasticsearchRestClientComponentIT.java
+++ 
b/components/camel-elasticsearch-rest-client/src/test/java/org/apache/camel/component/elasticsearch/rest/client/integration/ElasticsearchRestClientComponentIT.java
@@ -28,6 +28,8 @@ import 
org.apache.camel.component.elasticsearch.rest.client.ElasticSearchRestCli
 import 
org.apache.camel.component.elasticsearch.rest.client.ElasticsearchRestClientOperation;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.awaitility.Awaitility;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -82,8 +84,12 @@ public class ElasticsearchRestClientComponentIT extends 
ElasticsearchRestClientI
     @Test
     void testProducer() throws ExecutionException, InterruptedException {
 
-        // Workaround to avoid the Credential Provider to not be ready and to 
receive a 401
-        Thread.sleep(5000);
+        // Wait until Elasticsearch security is ready so authenticated 
requests succeed, instead of
+        // sleeping a fixed amount and hoping the credential provider is ready 
(which races and 401s).
+        Awaitility.await().atMost(30, 
TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
+            Response health = restClient.performRequest(new Request("GET", 
"/_cluster/health"));
+            assertEquals(200, health.getStatusLine().getStatusCode());
+        });
 
         // create index
         CompletableFuture<Boolean> ack = 
template.asyncRequestBody("direct:create-index", null, Boolean.class);
diff --git 
a/components/camel-pg-replication-slot/src/test/java/org/apache/camel/component/pg/replication/slot/integration/PgReplicationSlotCamelIT.java
 
b/components/camel-pg-replication-slot/src/test/java/org/apache/camel/component/pg/replication/slot/integration/PgReplicationSlotCamelIT.java
index 21bfea73aa5c..c111a64f3cd0 100644
--- 
a/components/camel-pg-replication-slot/src/test/java/org/apache/camel/component/pg/replication/slot/integration/PgReplicationSlotCamelIT.java
+++ 
b/components/camel-pg-replication-slot/src/test/java/org/apache/camel/component/pg/replication/slot/integration/PgReplicationSlotCamelIT.java
@@ -59,10 +59,13 @@ public class PgReplicationSlotCamelIT extends 
PgReplicationITSupport {
             @Override
             public void configure() {
 
+                // poll quickly: this consumer delivers one decoded message 
per poll, so the default
+                // 1s initial delay + 500ms cadence can take ~3.5s for 6 
messages and race the 5s timeout under CI load
                 String uriFormat
                         = 
"pg-replication-slot://{{postgres.service.address}}/camel/camel_test_slot:test_decoding?"
                           + 
"user={{postgres.user.name}}&password={{postgres.user.password}}"
-                          + 
"&slotOptions.skip-empty-xacts=true&slotOptions.include-xids=false";
+                          + 
"&slotOptions.skip-empty-xacts=true&slotOptions.include-xids=false"
+                          + "&initialDelay=200&delay=200";
 
                 from(uriFormat).to(mockEndpoint);
             }
@@ -71,10 +74,9 @@ public class PgReplicationSlotCamelIT extends 
PgReplicationITSupport {
 
     @Test
     public void canReceiveFromSlot() throws InterruptedException, SQLException 
{
-        mockEndpoint.expectedMessageCount(1);
-
         // test_decoding plugin writes each change in a separate message. Some 
other plugins can have different behaviour,
         // wal2json default behaviour is to write the whole transaction in one 
message.
+        // expectedBodiesReceived pins the exact count (6), order and content.
         mockEndpoint.expectedBodiesReceived("BEGIN", "table 
public.camel_test_table: INSERT: id[integer]:1984", "COMMIT",
                 "BEGIN", "table public.camel_test_table: INSERT: 
id[integer]:1998", "COMMIT");
 
@@ -83,6 +85,6 @@ public class PgReplicationSlotCamelIT extends 
PgReplicationITSupport {
             statement.execute("INSERT INTO camel_test_table(id) 
VALUES(1998);");
         }
 
-        mockEndpoint.assertIsSatisfied(5000);
+        mockEndpoint.assertIsSatisfied();
     }
 }

Reply via email to