This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new fd2adec0294a CAMEL-23892: Fix flaky camel-sql JDBC aggregate tests by replacing Thread.sleep with Awaitility (#24391)
fd2adec0294a is described below

commit fd2adec0294a95bfb3e9bc2e1cb61f3a789dda01
Author: Guillaume Nodet <[email protected]>
AuthorDate: Fri Jul 3 14:02:03 2026 +0200

    CAMEL-23892: Fix flaky camel-sql JDBC aggregate tests by replacing Thread.sleep with Awaitility (#24391)
    
    Signed-off-by: Guillaume Nodet <[email protected]>
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../jdbc/AbstractJdbcAggregationTestSupport.java    |  8 +++++---
 .../jdbc/JdbcAggregateCompletionIntervalTest.java   | 11 +++++++----
 .../JdbcAggregateConcurrentDifferentGroupsTest.java | 16 +++++++---------
 .../jdbc/JdbcAggregateConcurrentSameGroupTest.java  | 14 ++++++--------
 .../jdbc/JdbcAggregateDiscardOnTimeoutTest.java     | 21 ++++++++++++++-------
 .../jdbc/JdbcAggregateLoadAndRecoverTest.java       |  3 ++-
 .../jdbc/JdbcAggregateLoadConcurrentTest.java       | 18 ++++++++----------
 ...dbcAggregationRepositoryRecoverExistingTest.java |  9 ++++++---
 .../jdbc/JdbcRemoveConfirmOrderAggregateTest.java   |  8 ++++----
 9 files changed, 59 insertions(+), 49 deletions(-)

diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/AbstractJdbcAggregationTestSupport.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/AbstractJdbcAggregationTestSupport.java
index 4b7568964bdd..5026bd77b2b7 100644
--- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/AbstractJdbcAggregationTestSupport.java
+++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/AbstractJdbcAggregationTestSupport.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.processor.aggregate.jdbc;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.AggregationStrategy;
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
@@ -79,11 +81,11 @@ public abstract class AbstractJdbcAggregationTestSupport extends CamelSpringTest
                             = ObjectHelper.getException(OptimisticLockingAggregationRepository.OptimisticLockingException.class,
                                     e);
                     if (ole != null) {
-                        // okay lets try again
+                        // okay lets try again after a short back-off
                         try {
-                            Thread.sleep(50);
+                            TimeUnit.MILLISECONDS.sleep(50);
                         } catch (InterruptedException ex) {
-                            // ignore
+                            Thread.currentThread().interrupt();
                         }
                         continue;
                     }
diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateCompletionIntervalTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateCompletionIntervalTest.java
index a3f1de5dd57d..67215deb6326 100644
--- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateCompletionIntervalTest.java
+++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateCompletionIntervalTest.java
@@ -16,9 +16,12 @@
  */
 package org.apache.camel.processor.aggregate.jdbc;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.jupiter.api.Test;
 
+import static org.awaitility.Awaitility.await;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class JdbcAggregateCompletionIntervalTest extends AbstractJdbcAggregationTestSupport {
@@ -29,15 +32,15 @@ public class JdbcAggregateCompletionIntervalTest extends AbstractJdbcAggregation
         mock.setResultWaitTime(30 * 1000L);
         mock.expectedBodiesReceived("ABCD", "E");
 
-        // wait a bit so we complete on the next poll
-        Thread.sleep(2000);
-
         template.sendBodyAndHeader("direct:start", "A", "id", 123);
         template.sendBodyAndHeader("direct:start", "B", "id", 123);
         template.sendBodyAndHeader("direct:start", "C", "id", 123);
         template.sendBodyAndHeader("direct:start", "D", "id", 123);
 
-        Thread.sleep(6000);
+        // Wait for the completion interval to fire and aggregate ABCD
+        // before sending E, so E ends up in a separate batch
+        await().atMost(20, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertEquals(1, mock.getReceivedExchanges().size()));
 
         template.sendBodyAndHeader("direct:start", "E", "id", 123);
 
diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateConcurrentDifferentGroupsTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateConcurrentDifferentGroupsTest.java
index d44a4b0106ae..b98183729583 100644
--- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateConcurrentDifferentGroupsTest.java
+++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateConcurrentDifferentGroupsTest.java
@@ -16,9 +16,9 @@
  */
 package org.apache.camel.processor.aggregate.jdbc;
 
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.jupiter.api.Test;
@@ -48,14 +48,12 @@ public class JdbcAggregateConcurrentDifferentGroupsTest extends AbstractJdbcAggr
         ExecutorService executor = Executors.newFixedThreadPool(poolSize);
         for (int i = 0; i < files; i++) {
             final int index = i;
-            executor.submit(new Callable<Object>() {
-                public Object call() throws Exception {
-                    String id = index % 2 == 0 ? "A" : "B";
-                    template.sendBodyAndHeader("direct:start", index, "id", id);
-                    // simulate a little delay
-                    Thread.sleep(3);
-                    return null;
-                }
+            executor.submit(() -> {
+                String id = index % 2 == 0 ? "A" : "B";
+                template.sendBodyAndHeader("direct:start", index, "id", id);
+                // simulate a little delay
+                TimeUnit.MILLISECONDS.sleep(3);
+                return null;
             });
         }
 
diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateConcurrentSameGroupTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateConcurrentSameGroupTest.java
index a77a36a33438..29ef263e7864 100644
--- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateConcurrentSameGroupTest.java
+++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateConcurrentSameGroupTest.java
@@ -16,9 +16,9 @@
  */
 package org.apache.camel.processor.aggregate.jdbc;
 
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.jupiter.api.Test;
@@ -48,13 +48,11 @@ public class JdbcAggregateConcurrentSameGroupTest extends AbstractJdbcAggregatio
         ExecutorService executor = Executors.newFixedThreadPool(poolSize);
         for (int i = 0; i < files; i++) {
             final int index = i;
-            executor.submit(new Callable<Object>() {
-                public Object call() throws Exception {
-                    template.sendBodyAndHeader("direct:start", index, "id", 123);
-                    // simulate a little delay
-                    Thread.sleep(3);
-                    return null;
-                }
+            executor.submit(() -> {
+                template.sendBodyAndHeader("direct:start", index, "id", 123);
+                // simulate a little delay
+                TimeUnit.MILLISECONDS.sleep(3);
+                return null;
             });
         }
 
diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateDiscardOnTimeoutTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateDiscardOnTimeoutTest.java
index c5784da56cc2..28af2ccecfaf 100644
--- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateDiscardOnTimeoutTest.java
+++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateDiscardOnTimeoutTest.java
@@ -22,6 +22,9 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.jupiter.api.Test;
 
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
 public class JdbcAggregateDiscardOnTimeoutTest extends AbstractJdbcAggregationTestSupport {
 
     @Test
@@ -32,21 +35,25 @@ public class JdbcAggregateDiscardOnTimeoutTest extends AbstractJdbcAggregationTe
         template.sendBodyAndHeader("direct:start", "A", "id", 123);
         template.sendBodyAndHeader("direct:start", "B", "id", 123);
 
-        // wait 2 seconds
-        Thread.sleep(2000);
-
+        // Wait long enough for the 1-second completion timeout to fire and discard the
+        // incomplete aggregate (only 2 of 3 messages). Use setSleepForEmptyTest so
+        // assertIsSatisfied waits before accepting the zero-message expectation.
+        mock.setSleepForEmptyTest(2000);
         mock.assertIsSatisfied();
 
-        // now send 3 which does not timeout
+        // now send 3 which completes by size before timeout
         mock.reset();
-        mock.expectedBodiesReceived("C+D+E");
+        mock.expectedBodiesReceived("ABC");
 
         template.sendBodyAndHeader("direct:start", "A", "id", 123);
         template.sendBodyAndHeader("direct:start", "B", "id", 123);
         template.sendBodyAndHeader("direct:start", "C", "id", 123);
 
-        // should complete before timeout
-        mock.await(1500, TimeUnit.MILLISECONDS);
+        // Wait for the aggregate to complete (completionSize=3 reached)
+        await().atMost(10, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertEquals(1, mock.getReceivedExchanges().size()));
+
+        MockEndpoint.assertIsSatisfied(context);
     }
 
     @Override
diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadAndRecoverTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadAndRecoverTest.java
index 58bdd574432e..748aef285901 100644
--- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadAndRecoverTest.java
+++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadAndRecoverTest.java
@@ -18,6 +18,7 @@ package org.apache.camel.processor.aggregate.jdbc;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.Exchange;
@@ -53,7 +54,7 @@ public class JdbcAggregateLoadAndRecoverTest extends AbstractJdbcAggregationTest
             LOG.debug("Sending {} with id {}", value, id);
             template.sendBodyAndHeaders("seda:start?size=" + SIZE, value, headers);
             // simulate a little delay
-            Thread.sleep(3);
+            TimeUnit.MILLISECONDS.sleep(3);
         }
 
         LOG.info("Sending all {} message done. Now waiting for aggregation to complete.", SIZE);
diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadConcurrentTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadConcurrentTest.java
index 1fcfc0eb0171..5557eb0a1d52 100644
--- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadConcurrentTest.java
+++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadConcurrentTest.java
@@ -16,9 +16,9 @@
  */
 package org.apache.camel.processor.aggregate.jdbc;
 
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
@@ -45,15 +45,13 @@ public class JdbcAggregateLoadConcurrentTest extends AbstractJdbcAggregationTest
         for (int i = 0; i < SIZE; i++) {
             final int value = 1;
             final int key = i % 10;
-            executor.submit(new Callable<Object>() {
-                public Object call() throws Exception {
-                    char id = KEYS[key];
-                    LOG.debug("Sending {} with id {}", value, id);
-                    template.sendBodyAndHeader("direct:start", value, "id", Character.toString(id));
-                    // simulate a little delay
-                    Thread.sleep(3);
-                    return null;
-                }
+            executor.submit(() -> {
+                char id = KEYS[key];
+                LOG.debug("Sending {} with id {}", value, id);
+                template.sendBodyAndHeader("direct:start", value, "id", Character.toString(id));
+                // simulate a little delay
+                TimeUnit.MILLISECONDS.sleep(3);
+                return null;
             });
         }
 
diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryRecoverExistingTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryRecoverExistingTest.java
index 39207897a731..59bb5ffd5bf7 100644
--- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryRecoverExistingTest.java
+++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryRecoverExistingTest.java
@@ -16,11 +16,14 @@
  */
 package org.apache.camel.processor.aggregate.jdbc;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.support.DefaultExchange;
 import org.junit.jupiter.api.Test;
 
+import static org.awaitility.Awaitility.await;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -62,10 +65,10 @@ public class JdbcAggregationRepositoryRecoverExistingTest extends AbstractJdbcAg
 
         String id = exchange1.getExchangeId();
 
-        // stop the repo
+        // stop the repo and wait for it to be fully stopped
         repo.stop();
-
-        Thread.sleep(1000);
+        await().atMost(5, TimeUnit.SECONDS)
+                .until(() -> repo.isStopped());
 
         // load the repo again
         repo.start();
diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcRemoveConfirmOrderAggregateTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcRemoveConfirmOrderAggregateTest.java
index 01a18da2517b..2819e94002b0 100644
--- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcRemoveConfirmOrderAggregateTest.java
+++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcRemoveConfirmOrderAggregateTest.java
@@ -46,9 +46,9 @@ public class JdbcRemoveConfirmOrderAggregateTest extends AbstractJdbcAggregation
             if ("main".equals(Thread.currentThread().getName()) && ++count == 2) {
                 try {
                     LOG.debug("sleeping while committing...");
-                    Thread.sleep(300);
+                    TimeUnit.MILLISECONDS.sleep(300);
                 } catch (InterruptedException e) {
-                    e.printStackTrace();
+                    Thread.currentThread().interrupt();
                 }
             }
             super.doCommit(status);
@@ -62,9 +62,9 @@ public class JdbcRemoveConfirmOrderAggregateTest extends AbstractJdbcAggregation
             try {
                 // The recovery thread has an initial delay of 1 sec
                 LOG.debug("Delaying during aggregate");
-                Thread.sleep(500);
+                TimeUnit.MILLISECONDS.sleep(500);
             } catch (InterruptedException e) {
-                e.printStackTrace();
+                Thread.currentThread().interrupt();
             }
             return super.aggregate(oldExchange, newExchange);
         }

Reply via email to