This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new fd2adec0294a CAMEL-23892: Fix flaky camel-sql JDBC aggregate tests by
replacing Thread.sleep with Awaitility (#24391)
fd2adec0294a is described below
commit fd2adec0294a95bfb3e9bc2e1cb61f3a789dda01
Author: Guillaume Nodet <[email protected]>
AuthorDate: Fri Jul 3 14:02:03 2026 +0200
CAMEL-23892: Fix flaky camel-sql JDBC aggregate tests by replacing
Thread.sleep with Awaitility (#24391)
Signed-off-by: Guillaume Nodet <[email protected]>
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../jdbc/AbstractJdbcAggregationTestSupport.java | 8 +++++---
.../jdbc/JdbcAggregateCompletionIntervalTest.java | 11 +++++++----
.../JdbcAggregateConcurrentDifferentGroupsTest.java | 16 +++++++---------
.../jdbc/JdbcAggregateConcurrentSameGroupTest.java | 14 ++++++--------
.../jdbc/JdbcAggregateDiscardOnTimeoutTest.java | 21 ++++++++++++++-------
.../jdbc/JdbcAggregateLoadAndRecoverTest.java | 3 ++-
.../jdbc/JdbcAggregateLoadConcurrentTest.java | 18 ++++++++----------
...dbcAggregationRepositoryRecoverExistingTest.java | 9 ++++++---
.../jdbc/JdbcRemoveConfirmOrderAggregateTest.java | 8 ++++----
9 files changed, 59 insertions(+), 49 deletions(-)
diff --git
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/AbstractJdbcAggregationTestSupport.java
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/AbstractJdbcAggregationTestSupport.java
index 4b7568964bdd..5026bd77b2b7 100644
---
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/AbstractJdbcAggregationTestSupport.java
+++
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/AbstractJdbcAggregationTestSupport.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.processor.aggregate.jdbc;
+import java.util.concurrent.TimeUnit;
+
import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
@@ -79,11 +81,11 @@ public abstract class AbstractJdbcAggregationTestSupport
extends CamelSpringTest
=
ObjectHelper.getException(OptimisticLockingAggregationRepository.OptimisticLockingException.class,
e);
if (ole != null) {
- // okay lets try again
+ // okay lets try again after a short back-off
try {
- Thread.sleep(50);
+ TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException ex) {
- // ignore
+ Thread.currentThread().interrupt();
}
continue;
}
diff --git
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateCompletionIntervalTest.java
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateCompletionIntervalTest.java
index a3f1de5dd57d..67215deb6326 100644
---
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateCompletionIntervalTest.java
+++
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateCompletionIntervalTest.java
@@ -16,9 +16,12 @@
*/
package org.apache.camel.processor.aggregate.jdbc;
+import java.util.concurrent.TimeUnit;
+
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.jupiter.api.Test;
+import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class JdbcAggregateCompletionIntervalTest extends
AbstractJdbcAggregationTestSupport {
@@ -29,15 +32,15 @@ public class JdbcAggregateCompletionIntervalTest extends
AbstractJdbcAggregation
mock.setResultWaitTime(30 * 1000L);
mock.expectedBodiesReceived("ABCD", "E");
- // wait a bit so we complete on the next poll
- Thread.sleep(2000);
-
template.sendBodyAndHeader("direct:start", "A", "id", 123);
template.sendBodyAndHeader("direct:start", "B", "id", 123);
template.sendBodyAndHeader("direct:start", "C", "id", 123);
template.sendBodyAndHeader("direct:start", "D", "id", 123);
- Thread.sleep(6000);
+ // Wait for the completion interval to fire and aggregate ABCD
+ // before sending E, so E ends up in a separate batch
+ await().atMost(20, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(1,
mock.getReceivedExchanges().size()));
template.sendBodyAndHeader("direct:start", "E", "id", 123);
diff --git
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateConcurrentDifferentGroupsTest.java
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateConcurrentDifferentGroupsTest.java
index d44a4b0106ae..b98183729583 100644
---
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateConcurrentDifferentGroupsTest.java
+++
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateConcurrentDifferentGroupsTest.java
@@ -16,9 +16,9 @@
*/
package org.apache.camel.processor.aggregate.jdbc;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.jupiter.api.Test;
@@ -48,14 +48,12 @@ public class JdbcAggregateConcurrentDifferentGroupsTest
extends AbstractJdbcAggr
ExecutorService executor = Executors.newFixedThreadPool(poolSize);
for (int i = 0; i < files; i++) {
final int index = i;
- executor.submit(new Callable<Object>() {
- public Object call() throws Exception {
- String id = index % 2 == 0 ? "A" : "B";
- template.sendBodyAndHeader("direct:start", index, "id",
id);
- // simulate a little delay
- Thread.sleep(3);
- return null;
- }
+ executor.submit(() -> {
+ String id = index % 2 == 0 ? "A" : "B";
+ template.sendBodyAndHeader("direct:start", index, "id", id);
+ // simulate a little delay
+ TimeUnit.MILLISECONDS.sleep(3);
+ return null;
});
}
diff --git
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateConcurrentSameGroupTest.java
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateConcurrentSameGroupTest.java
index a77a36a33438..29ef263e7864 100644
---
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateConcurrentSameGroupTest.java
+++
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateConcurrentSameGroupTest.java
@@ -16,9 +16,9 @@
*/
package org.apache.camel.processor.aggregate.jdbc;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.jupiter.api.Test;
@@ -48,13 +48,11 @@ public class JdbcAggregateConcurrentSameGroupTest extends
AbstractJdbcAggregatio
ExecutorService executor = Executors.newFixedThreadPool(poolSize);
for (int i = 0; i < files; i++) {
final int index = i;
- executor.submit(new Callable<Object>() {
- public Object call() throws Exception {
- template.sendBodyAndHeader("direct:start", index, "id",
123);
- // simulate a little delay
- Thread.sleep(3);
- return null;
- }
+ executor.submit(() -> {
+ template.sendBodyAndHeader("direct:start", index, "id", 123);
+ // simulate a little delay
+ TimeUnit.MILLISECONDS.sleep(3);
+ return null;
});
}
diff --git
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateDiscardOnTimeoutTest.java
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateDiscardOnTimeoutTest.java
index c5784da56cc2..28af2ccecfaf 100644
---
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateDiscardOnTimeoutTest.java
+++
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateDiscardOnTimeoutTest.java
@@ -22,6 +22,9 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.jupiter.api.Test;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
public class JdbcAggregateDiscardOnTimeoutTest extends
AbstractJdbcAggregationTestSupport {
@Test
@@ -32,21 +35,25 @@ public class JdbcAggregateDiscardOnTimeoutTest extends
AbstractJdbcAggregationTe
template.sendBodyAndHeader("direct:start", "A", "id", 123);
template.sendBodyAndHeader("direct:start", "B", "id", 123);
- // wait 2 seconds
- Thread.sleep(2000);
-
+ // Wait long enough for the 1-second completion timeout to fire and
discard the
+ // incomplete aggregate (only 2 of 3 messages). Use
setSleepForEmptyTest so
+ // assertIsSatisfied waits before accepting the zero-message
expectation.
+ mock.setSleepForEmptyTest(2000);
mock.assertIsSatisfied();
- // now send 3 which does not timeout
+ // now send 3 which completes by size before timeout
mock.reset();
- mock.expectedBodiesReceived("C+D+E");
+ mock.expectedBodiesReceived("ABC");
template.sendBodyAndHeader("direct:start", "A", "id", 123);
template.sendBodyAndHeader("direct:start", "B", "id", 123);
template.sendBodyAndHeader("direct:start", "C", "id", 123);
- // should complete before timeout
- mock.await(1500, TimeUnit.MILLISECONDS);
+ // Wait for the aggregate to complete (completionSize=3 reached)
+ await().atMost(10, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(1,
mock.getReceivedExchanges().size()));
+
+ MockEndpoint.assertIsSatisfied(context);
}
@Override
diff --git
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadAndRecoverTest.java
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadAndRecoverTest.java
index 58bdd574432e..748aef285901 100644
---
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadAndRecoverTest.java
+++
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadAndRecoverTest.java
@@ -18,6 +18,7 @@ package org.apache.camel.processor.aggregate.jdbc;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.Exchange;
@@ -53,7 +54,7 @@ public class JdbcAggregateLoadAndRecoverTest extends
AbstractJdbcAggregationTest
LOG.debug("Sending {} with id {}", value, id);
template.sendBodyAndHeaders("seda:start?size=" + SIZE, value,
headers);
// simulate a little delay
- Thread.sleep(3);
+ TimeUnit.MILLISECONDS.sleep(3);
}
LOG.info("Sending all {} message done. Now waiting for aggregation to
complete.", SIZE);
diff --git
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadConcurrentTest.java
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadConcurrentTest.java
index 1fcfc0eb0171..5557eb0a1d52 100644
---
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadConcurrentTest.java
+++
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadConcurrentTest.java
@@ -16,9 +16,9 @@
*/
package org.apache.camel.processor.aggregate.jdbc;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
@@ -45,15 +45,13 @@ public class JdbcAggregateLoadConcurrentTest extends
AbstractJdbcAggregationTest
for (int i = 0; i < SIZE; i++) {
final int value = 1;
final int key = i % 10;
- executor.submit(new Callable<Object>() {
- public Object call() throws Exception {
- char id = KEYS[key];
- LOG.debug("Sending {} with id {}", value, id);
- template.sendBodyAndHeader("direct:start", value, "id",
Character.toString(id));
- // simulate a little delay
- Thread.sleep(3);
- return null;
- }
+ executor.submit(() -> {
+ char id = KEYS[key];
+ LOG.debug("Sending {} with id {}", value, id);
+ template.sendBodyAndHeader("direct:start", value, "id",
Character.toString(id));
+ // simulate a little delay
+ TimeUnit.MILLISECONDS.sleep(3);
+ return null;
});
}
diff --git
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryRecoverExistingTest.java
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryRecoverExistingTest.java
index 39207897a731..59bb5ffd5bf7 100644
---
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryRecoverExistingTest.java
+++
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryRecoverExistingTest.java
@@ -16,11 +16,14 @@
*/
package org.apache.camel.processor.aggregate.jdbc;
+import java.util.concurrent.TimeUnit;
+
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.support.DefaultExchange;
import org.junit.jupiter.api.Test;
+import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -62,10 +65,10 @@ public class JdbcAggregationRepositoryRecoverExistingTest
extends AbstractJdbcAg
String id = exchange1.getExchangeId();
- // stop the repo
+ // stop the repo and wait for it to be fully stopped
repo.stop();
-
- Thread.sleep(1000);
+ await().atMost(5, TimeUnit.SECONDS)
+ .until(() -> repo.isStopped());
// load the repo again
repo.start();
diff --git
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcRemoveConfirmOrderAggregateTest.java
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcRemoveConfirmOrderAggregateTest.java
index 01a18da2517b..2819e94002b0 100644
---
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcRemoveConfirmOrderAggregateTest.java
+++
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcRemoveConfirmOrderAggregateTest.java
@@ -46,9 +46,9 @@ public class JdbcRemoveConfirmOrderAggregateTest extends
AbstractJdbcAggregation
if ("main".equals(Thread.currentThread().getName()) && ++count ==
2) {
try {
LOG.debug("sleeping while committing...");
- Thread.sleep(300);
+ TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
- e.printStackTrace();
+ Thread.currentThread().interrupt();
}
}
super.doCommit(status);
@@ -62,9 +62,9 @@ public class JdbcRemoveConfirmOrderAggregateTest extends
AbstractJdbcAggregation
try {
// The recovery thread has an initial delay of 1 sec
LOG.debug("Delaying during aggregate");
- Thread.sleep(500);
+ TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
- e.printStackTrace();
+ Thread.currentThread().interrupt();
}
return super.aggregate(oldExchange, newExchange);
}