This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 1beefe3ab34e CAMEL-23495: Replace Thread.sleep() with Awaitility and
fix ~30 flaky tests
1beefe3ab34e is described below
commit 1beefe3ab34e2df18aa4ee7e5c381339a2066337
Author: Guillaume Nodet <[email protected]>
AuthorDate: Thu May 14 21:41:11 2026 +0200
CAMEL-23495: Replace Thread.sleep() with Awaitility and fix ~30 flaky tests
Fix ~30 flaky tests in camel-core and camel-management:
- Replace Thread.sleep() with Awaitility in ~15 test files (suspend/resume,
management, executor, redelivery, scheduler tests)
- Increase tight timeouts for async seda, multicast, file consumer, and
aggregator tests under CI load
- Fix file consumer partial reads by increasing initialDelay from 0 to
1000ms in MarkerFileExclusiveReadLockStrategy tests
- Fix AggregateForceCompletionOnStopParallelTest race: use Awaitility after
async context.stop()/stopRoute() with parallelProcessing
- Fix AsyncEndpointRedeliveryErrorHandlerNonBlockedDelay2Test bugs: reset
static counter between runs, fix mock:result -> mock:before copy-paste
- Use syncDelayed() for deterministic ordering in MulticastParallelStreaming
and RecipientListParallelStreaming tests
All 30 test classes verified stable with 0/100 failures in iteration loop.
---
.../file/FileConsumeFilesAndDeleteTest.java | 2 +-
...lusiveReadLockStrategyRecursiveCleanupTest.java | 10 ++---
...kerFileExclusiveReadLockStrategyUnlockTest.java | 4 +-
.../MarkerFileExclusiveReadLockStrategyTest.java | 2 +-
.../SchedulerMulticastParallelGreedyTest.java | 4 +-
.../seda/FileSedaShutdownCompleteAllTasksTest.java | 2 +-
.../component/seda/SedaBlockWhenFullTest.java | 3 +-
.../seda/SedaConsumerSuspendResumeTest.java | 13 +++---
.../DefaultCamelContextSuspendResumeRouteTest.java | 15 +++----
.../impl/DefaultExecutorServiceManagerTest.java | 7 ++-
.../camel/impl/RouteSedaSuspendResumeTest.java | 6 +--
.../camel/impl/TwoRouteSuspendResumeTest.java | 17 +++-----
...ThreadsRejectedExecutionWithDeadLetterTest.java | 6 ++-
.../processor/MulticastParallelStreamingTest.java | 2 +-
.../MulticastParallelStreamingTimeoutTest.java | 5 ++-
.../MulticastParallelTimeoutStreamCachingTest.java | 2 +
...deliveryWhileStoppingDeadLetterChannelTest.java | 7 ++-
.../NotAllowRedeliveryWhileStoppingTest.java | 8 +++-
.../RecipientListParallelStreamingTest.java | 4 +-
...tterErrorHandlerNoRedeliveryOnShutdownTest.java | 6 ++-
.../ResequenceStreamRejectOldExchangesTest.java | 8 +++-
.../ShutdownCompleteCurrentTaskOnlyTest.java | 2 +-
.../AggregateClosedCorrelationKeyTest.java | 7 ++-
.../aggregator/AggregateCompleteAllOnStopTest.java | 2 +-
.../aggregator/AggregateDiscardOnTimeoutTest.java | 5 +--
...AggregateForceCompletionOnStopParallelTest.java | 50 ++++++++++++++++++++++
.../AggregateGroupedExchangeBatchSizeTest.java | 8 +++-
...RedeliveryErrorHandlerNonBlockedDelay2Test.java | 10 ++++-
...ScopedOnExceptionLoadBalancerStopRouteTest.java | 1 +
.../management/ManagedInflightStatisticsTest.java | 21 +++++----
30 files changed, 165 insertions(+), 74 deletions(-)
diff --git
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeFilesAndDeleteTest.java
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeFilesAndDeleteTest.java
index 366261509387..f75a1aae8fe4 100644
---
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeFilesAndDeleteTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumeFilesAndDeleteTest.java
@@ -56,7 +56,7 @@ public class FileConsumeFilesAndDeleteTest extends
ContextTestSupport {
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
- from(fileUri("?initialDelay=0&delay=10&fileName=" +
TEST_FILE_NAME_1 + "&delete=true"))
+ from(fileUri("?initialDelay=0&delay=2000&fileName=" +
TEST_FILE_NAME_1 + "&delete=true"))
.convertBodyTo(String.class).to("mock:result");
}
};
diff --git
a/core/camel-core/src/test/java/org/apache/camel/component/file/MarkerFileExclusiveReadLockStrategyRecursiveCleanupTest.java
b/core/camel-core/src/test/java/org/apache/camel/component/file/MarkerFileExclusiveReadLockStrategyRecursiveCleanupTest.java
index 517cfc19c360..cc7998e461ca 100644
---
a/core/camel-core/src/test/java/org/apache/camel/component/file/MarkerFileExclusiveReadLockStrategyRecursiveCleanupTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/component/file/MarkerFileExclusiveReadLockStrategyRecursiveCleanupTest.java
@@ -56,7 +56,7 @@ public class
MarkerFileExclusiveReadLockStrategyRecursiveCleanupTest extends Con
@Override
public void configure() {
from(fileUri(
-
"d1?fileName=d1.dat&readLock=markerFile&readLockDeleteOrphanLockFiles=true&initialDelay=0&delay=10"))
+
"d1?fileName=d1.dat&readLock=markerFile&readLockDeleteOrphanLockFiles=true&initialDelay=1000&delay=2000"))
.to("mock:result");
}
});
@@ -78,7 +78,7 @@ public class
MarkerFileExclusiveReadLockStrategyRecursiveCleanupTest extends Con
@Override
public void configure() {
from(fileUri(
-
"d1?include=.*.dat&readLock=markerFile&readLockDeleteOrphanLockFiles=true&initialDelay=0&delay=10&recursive=true&minDepth=2&maxDepth=2"))
+
"d1?include=.*.dat&readLock=markerFile&readLockDeleteOrphanLockFiles=true&initialDelay=1000&delay=2000&recursive=true&minDepth=2&maxDepth=2"))
.to("mock:result");
}
});
@@ -104,7 +104,7 @@ public class
MarkerFileExclusiveReadLockStrategyRecursiveCleanupTest extends Con
@Override
public void configure() {
from(fileUri(
-
"d1?include=.*.dat&readLock=markerFile&readLockDeleteOrphanLockFiles=true&initialDelay=0&delay=10&recursive=true&minDepth=2&maxDepth=4"))
+
"d1?include=.*.dat&readLock=markerFile&readLockDeleteOrphanLockFiles=true&initialDelay=1000&delay=2000&recursive=true&minDepth=2&maxDepth=4"))
.to("mock:result");
}
});
@@ -129,7 +129,7 @@ public class
MarkerFileExclusiveReadLockStrategyRecursiveCleanupTest extends Con
@Override
public void configure() {
from(fileUri(
-
"d1?antInclude=**/*.dat&readLock=markerFile&readLockDeleteOrphanLockFiles=true&initialDelay=0&delay=10&recursive=true&minDepth=2&maxDepth=4"))
+
"d1?antInclude=**/*.dat&readLock=markerFile&readLockDeleteOrphanLockFiles=true&initialDelay=1000&delay=2000&recursive=true&minDepth=2&maxDepth=4"))
.to("mock:result");
}
});
@@ -154,7 +154,7 @@ public class
MarkerFileExclusiveReadLockStrategyRecursiveCleanupTest extends Con
@Override
public void configure() {
from(fileUri(
-
"d1?include=.*.dat&readLock=markerFile&readLockDeleteOrphanLockFiles=true&initialDelay=0&delay=10&recursive=true"))
+
"d1?include=.*.dat&readLock=markerFile&readLockDeleteOrphanLockFiles=true&initialDelay=1000&delay=2000&recursive=true"))
.to("mock:result");
}
});
diff --git
a/core/camel-core/src/test/java/org/apache/camel/component/file/MarkerFileExclusiveReadLockStrategyUnlockTest.java
b/core/camel-core/src/test/java/org/apache/camel/component/file/MarkerFileExclusiveReadLockStrategyUnlockTest.java
index 9bfe76a3f180..7142ea45d8a3 100644
---
a/core/camel-core/src/test/java/org/apache/camel/component/file/MarkerFileExclusiveReadLockStrategyUnlockTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/component/file/MarkerFileExclusiveReadLockStrategyUnlockTest.java
@@ -58,8 +58,8 @@ public class MarkerFileExclusiveReadLockStrategyUnlockTest
extends ContextTestSu
return new RouteBuilder() {
@Override
public void configure() {
-
from(fileUri("input-a?fileName=file1.dat&readLock=markerFile&initialDelay=0&delay=10"))
-
.pollEnrich(fileUri("input-b?fileName=file2.dat&readLock=markerFile&initialDelay=0&delay=10"))
+
from(fileUri("input-a?fileName=file1.dat&readLock=markerFile&initialDelay=0&delay=2000"))
+
.pollEnrich(fileUri("input-b?fileName=file2.dat&readLock=markerFile&initialDelay=0&delay=2000"))
.to("mock:result");
}
};
diff --git
a/core/camel-core/src/test/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategyTest.java
b/core/camel-core/src/test/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategyTest.java
index 94ac7c860025..5defded228ca 100644
---
a/core/camel-core/src/test/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategyTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/component/file/strategy/MarkerFileExclusiveReadLockStrategyTest.java
@@ -104,7 +104,7 @@ public class MarkerFileExclusiveReadLockStrategyTest
extends ContextTestSupport
return new RouteBuilder() {
@Override
public void configure() {
-
from(fileUri("in?readLock=markerFile&initialDelay=0&delay=10")).onCompletion()
+
from(fileUri("in?readLock=markerFile&initialDelay=1000&delay=2000")).onCompletion()
.process(new Processor() {
public void process(Exchange exchange) {
numberOfFilesProcessed.addAndGet(1);
diff --git
a/core/camel-core/src/test/java/org/apache/camel/component/scheduler/SchedulerMulticastParallelGreedyTest.java
b/core/camel-core/src/test/java/org/apache/camel/component/scheduler/SchedulerMulticastParallelGreedyTest.java
index 6769665adaa3..3a0078141e20 100644
---
a/core/camel-core/src/test/java/org/apache/camel/component/scheduler/SchedulerMulticastParallelGreedyTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/component/scheduler/SchedulerMulticastParallelGreedyTest.java
@@ -28,9 +28,7 @@ public class SchedulerMulticastParallelGreedyTest extends
ContextTestSupport {
public void testGreedy() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:parentComplete");
mock.expectedMessageCount(1);
-
- // give it time to see if too many messages are sent if greedy kicks-in
- Thread.sleep(50);
+ mock.setAssertPeriod(200);
assertMockEndpointsSatisfied();
}
diff --git
a/core/camel-core/src/test/java/org/apache/camel/component/seda/FileSedaShutdownCompleteAllTasksTest.java
b/core/camel-core/src/test/java/org/apache/camel/component/seda/FileSedaShutdownCompleteAllTasksTest.java
index 5d48f341d62b..e8848183987e 100644
---
a/core/camel-core/src/test/java/org/apache/camel/component/seda/FileSedaShutdownCompleteAllTasksTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/component/seda/FileSedaShutdownCompleteAllTasksTest.java
@@ -33,7 +33,7 @@ public class FileSedaShutdownCompleteAllTasksTest extends
ContextTestSupport {
@Test
public void testShutdownCompleteAllTasks() throws Exception {
- String url = fileUri("?initialDelay=0&delay=10");
+ String url = fileUri("?initialDelay=0&delay=2000");
// prepare 5 files to begin with
template.sendBodyAndHeader(url, "A", Exchange.FILE_NAME, "a.txt");
diff --git
a/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaBlockWhenFullTest.java
b/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaBlockWhenFullTest.java
index 0af9be03db43..f3a8e4f1afd7 100644
---
a/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaBlockWhenFullTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaBlockWhenFullTest.java
@@ -97,8 +97,7 @@ public class SedaBlockWhenFullTest extends ContextTestSupport
{
assertEquals(QUEUE_SIZE, seda.getQueue().remainingCapacity());
asyncSendTwoOverCapacity(BLOCK_WHEN_FULL_URI, QUEUE_SIZE + 4);
- // wait a bit to allow the async processing to complete
- assertMockEndpointsSatisfied(2, TimeUnit.SECONDS);
+ assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);
}
/**
diff --git
a/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerSuspendResumeTest.java
b/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerSuspendResumeTest.java
index e489bd24464c..3f280b78f92b 100644
---
a/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerSuspendResumeTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/component/seda/SedaConsumerSuspendResumeTest.java
@@ -59,15 +59,14 @@ public class SedaConsumerSuspendResumeTest extends
ContextTestSupport {
// mode where
// it would poll and route (there is a little slack (up till 1 sec)
// before suspension is empowered)
- await().atMost(1, TimeUnit.SECONDS)
+ // wait for queues to empty
+ await().atMost(5, TimeUnit.SECONDS)
.until(() -> context.getEndpoint("seda:foo",
SedaEndpoint.class).getQueue().isEmpty()
&& context.getEndpoint("seda:bar",
SedaEndpoint.class).getQueue().isEmpty());
-
- // even though we wait for the queues to empty, there is a race
condition where the consumer
- // may still process messages while it's being suspended due to
asynchronous message handling.
- // as a result, we need to wait a bit longer to ensure that the seda
consumer is suspended before
- // sending the next message.
- Thread.sleep(1000L);
+ // give consumer thread time to idle after queues empty
+ await().pollDelay(1, TimeUnit.SECONDS)
+ .atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals("Suspended",
consumer.getStatus().name()));
template.sendBody("seda:foo", "B");
// wait a little to ensure seda consumer thread would have tried to
poll
diff --git
a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java
b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java
index c5a5c872b59e..4fd8ed6f3fef 100644
---
a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultCamelContextSuspendResumeRouteTest.java
@@ -49,15 +49,12 @@ public class DefaultCamelContextSuspendResumeRouteTest
extends ContextTestSuppor
context.suspend();
- // even though we wait for the route to suspend, there is a race
condition where the consumer
- // may still process messages while it's being suspended due to
asynchronous message handling.
- // as a result, we need to wait a bit longer to ensure that the seda
consumer is suspended before
- // sending the next message.
- Thread.sleep(1000L);
-
- // need to give seda consumer thread time to idle
- Awaitility.await().atMost(200, TimeUnit.MILLISECONDS)
- .pollDelay(100, TimeUnit.MILLISECONDS)
+ // wait for the context to be fully suspended
+ Awaitility.await().atMost(5, TimeUnit.SECONDS)
+ .until(() -> context.isSuspended());
+ // give seda consumer thread time to complete its current poll cycle
+ Awaitility.await().pollDelay(1, TimeUnit.SECONDS)
+ .atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> Assertions.assertDoesNotThrow(() ->
template.sendBody("seda:foo", "B")));
mock.assertIsSatisfied(1000);
diff --git
a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
index 91ca8e0ee9e8..7f0bd2783883 100644
---
a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultExecutorServiceManagerTest.java
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
+import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -540,12 +541,14 @@ public class DefaultExecutorServiceManagerTest extends
ContextTestSupport {
@Test
public void testLongShutdownOfThreadPool() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch started = new CountDownLatch(1);
ExecutorService pool =
context.getExecutorServiceManager().newSingleThreadExecutor(this, "Cool");
pool.execute(new Runnable() {
@Override
public void run() {
log.info("Starting thread");
+ started.countDown();
// this should take a long time to shutdown
try {
@@ -558,8 +561,8 @@ public class DefaultExecutorServiceManagerTest extends
ContextTestSupport {
}
});
- // sleep a bit before shutting down
- Thread.sleep(3000);
+ // wait for the task to start before shutting down
+ await().atMost(5, TimeUnit.SECONDS).until(() -> started.getCount() ==
0);
context.getExecutorServiceManager().shutdown(pool);
diff --git
a/core/camel-core/src/test/java/org/apache/camel/impl/RouteSedaSuspendResumeTest.java
b/core/camel-core/src/test/java/org/apache/camel/impl/RouteSedaSuspendResumeTest.java
index 4460350ba3c0..177fa3a2fea8 100644
---
a/core/camel-core/src/test/java/org/apache/camel/impl/RouteSedaSuspendResumeTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/impl/RouteSedaSuspendResumeTest.java
@@ -53,9 +53,9 @@ public class RouteSedaSuspendResumeTest extends
ContextTestSupport {
assertEquals("Suspended", statefulService.getStatus().name());
}
- Thread.sleep(1000L);
- // need to give seda consumer thread time to idle
- await().atMost(1, TimeUnit.SECONDS)
+ // need to give seda consumer thread time to idle after suspension
+ await().pollDelay(1, TimeUnit.SECONDS)
+ .atMost(5, TimeUnit.SECONDS)
.until(() -> context.getEndpoint("seda:foo",
SedaEndpoint.class).getQueue().isEmpty());
template.sendBody("seda:foo", "B");
diff --git
a/core/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java
b/core/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java
index accf3d555a03..34bc11c6ccf2 100644
---
a/core/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/impl/TwoRouteSuspendResumeTest.java
@@ -49,16 +49,13 @@ public class TwoRouteSuspendResumeTest extends
ContextTestSupport {
context.getRouteController().suspendRoute("foo");
- // need to give seda consumer thread time to idle
- await().atMost(1, TimeUnit.SECONDS).until(() -> {
- return context.getEndpoint("seda:foo",
SedaEndpoint.class).getQueue().isEmpty();
- });
-
- // even though we wait for the queues to empty, there is a race
condition where the consumer
- // may still process messages while it's being suspended due to
asynchronous message handling.
- // as a result, we need to wait a bit longer to ensure that the seda
consumer is suspended before
- // sending the next message.
- Thread.sleep(1000L);
+ // wait for seda queue to empty
+ await().atMost(5, TimeUnit.SECONDS)
+ .until(() -> context.getEndpoint("seda:foo",
SedaEndpoint.class).getQueue().isEmpty());
+ // give consumer thread time to idle after queue empties
+ await().pollDelay(1, TimeUnit.SECONDS)
+ .atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals("Suspended",
context.getRouteController().getRouteStatus("foo").name()));
template.sendBody("seda:foo", "B");
template.sendBody("direct:bar", "C");
diff --git
a/core/camel-core/src/test/java/org/apache/camel/issues/ThreadsRejectedExecutionWithDeadLetterTest.java
b/core/camel-core/src/test/java/org/apache/camel/issues/ThreadsRejectedExecutionWithDeadLetterTest.java
index c9fc6be229ab..505c495afe24 100644
---
a/core/camel-core/src/test/java/org/apache/camel/issues/ThreadsRejectedExecutionWithDeadLetterTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/issues/ThreadsRejectedExecutionWithDeadLetterTest.java
@@ -26,6 +26,8 @@ import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.util.concurrent.ThreadPoolRejectedPolicy;
import org.junit.jupiter.api.Test;
+import static org.awaitility.Awaitility.await;
+
public class ThreadsRejectedExecutionWithDeadLetterTest extends
ContextTestSupport {
@Override
@@ -62,7 +64,9 @@ public class ThreadsRejectedExecutionWithDeadLetterTest
extends ContextTestSuppo
template.sendBody("seda:start", "Hi World"); // will be queued
template.sendBody("seda:start", "Bye World"); // will be rejected
- Thread.sleep(100);
+ // wait for the rejected message to arrive at dead letter
+ await().atMost(5, TimeUnit.SECONDS)
+ .until(() ->
getMockEndpoint("mock:failed").getReceivedCounter() >= 1);
latch.countDown();
latch.countDown();
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java
index 35fdff3b5b39..b5b9d2ba211a 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java
@@ -78,7 +78,7 @@ public class MulticastParallelStreamingTest extends
ContextTestSupport {
// use end to indicate end of multicast route
.end().to("mock:result");
-
from("direct:a").delay(2000).asyncDelayed().setBody(constant("A"));
+
from("direct:a").delay(2000).syncDelayed().setBody(constant("A"));
from("direct:b").setBody(constant("B"));
}
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTimeoutTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTimeoutTest.java
index ba4a4e23a5cc..7bab3e5d46be 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTimeoutTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTimeoutTest.java
@@ -33,6 +33,7 @@ public class MulticastParallelStreamingTimeoutTest extends
ContextTestSupport {
MockEndpoint mock = getMockEndpoint("mock:result");
// A will timeout so we only get B and C (C is faster than B)
mock.expectedBodiesReceived("CB");
+ mock.setResultWaitTime(20000);
template.sendBody("direct:start", "Hello");
@@ -54,11 +55,11 @@ public class MulticastParallelStreamingTimeoutTest extends
ContextTestSupport {
oldExchange.getIn().setBody(body +
newExchange.getIn().getBody(String.class));
return oldExchange;
}
-
}).parallelProcessing().streaming().timeout(5000).to("direct:a", "direct:b",
"direct:c")
+
}).parallelProcessing().streaming().timeout(10000).to("direct:a", "direct:b",
"direct:c")
// use end to indicate end of multicast route
.end().to("mock:result");
- from("direct:a").delay(10000).setBody(constant("A"));
+ from("direct:a").delay(20000).setBody(constant("A"));
from("direct:b").delay(500).setBody(constant("B"));
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java
index f579b9f2bb22..31c2c0182797 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTimeoutStreamCachingTest.java
@@ -67,7 +67,9 @@ public class MulticastParallelTimeoutStreamCachingTest
extends ContextTestSuppor
@Test
public void
testCreateOutputStreamCacheBeforeTimeoutButWriteToOutputStreamCacheAfterTimeout()
throws Exception {
getMockEndpoint("mock:exception").expectedMessageCount(1);
+ getMockEndpoint("mock:exception").setResultWaitTime(15000);
getMockEndpoint("mock:y").expectedMessageCount(0);
+ getMockEndpoint("mock:y").setAssertPeriod(2000);
template.sendBody("direct:b", "testMessage");
assertMockEndpointsSatisfied();
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingDeadLetterChannelTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingDeadLetterChannelTest.java
index 63db6440889d..f4b61710f74b 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingDeadLetterChannelTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingDeadLetterChannelTest.java
@@ -17,6 +17,7 @@
package org.apache.camel.processor;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
@@ -25,6 +26,7 @@ import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.util.StopWatch;
import org.junit.jupiter.api.Test;
+import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.*;
public class NotAllowRedeliveryWhileStoppingDeadLetterChannelTest extends
ContextTestSupport {
@@ -40,7 +42,10 @@ public class
NotAllowRedeliveryWhileStoppingDeadLetterChannelTest extends Contex
assertMockEndpointsSatisfied();
- Thread.sleep(500);
+ // wait for the error handler to start the redelivery cycle
+ await().pollDelay(500, TimeUnit.MILLISECONDS)
+ .atMost(2, TimeUnit.SECONDS)
+ .untilAsserted(() ->
assertTrue(context.getInflightRepository().size() > 0));
context.getRouteController().stopRoute("foo");
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingTest.java
index 473c2194ba26..683a64e4378d 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/NotAllowRedeliveryWhileStoppingTest.java
@@ -16,12 +16,15 @@
*/
package org.apache.camel.processor;
+import java.util.concurrent.TimeUnit;
+
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.util.StopWatch;
import org.junit.jupiter.api.Test;
+import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class NotAllowRedeliveryWhileStoppingTest extends ContextTestSupport {
@@ -37,7 +40,10 @@ public class NotAllowRedeliveryWhileStoppingTest extends
ContextTestSupport {
assertMockEndpointsSatisfied();
- Thread.sleep(500);
+ // wait for the error handler to start the redelivery cycle
+ await().pollDelay(500, TimeUnit.MILLISECONDS)
+ .atMost(2, TimeUnit.SECONDS)
+ .untilAsserted(() ->
assertTrue(context.getInflightRepository().size() > 0));
context.stop();
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelStreamingTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelStreamingTest.java
index 92f683c6846a..d7d6fbf278a0 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelStreamingTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/RecipientListParallelStreamingTest.java
@@ -49,8 +49,8 @@ public class RecipientListParallelStreamingTest extends
ContextTestSupport {
from("direct:streaming").recipientList(header("foo")).parallelProcessing().streaming().to("mock:result");
- from("direct:a").delay(100).transform(constant("a"));
- from("direct:b").delay(500).transform(constant("b"));
+
from("direct:a").delay(100).syncDelayed().transform(constant("a"));
+
from("direct:b").delay(500).syncDelayed().transform(constant("b"));
from("direct:c").transform(constant("c"));
}
};
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/RedeliveryDeadLetterErrorHandlerNoRedeliveryOnShutdownTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/RedeliveryDeadLetterErrorHandlerNoRedeliveryOnShutdownTest.java
index 1d58b10b0bb4..5b164b5cfcde 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/RedeliveryDeadLetterErrorHandlerNoRedeliveryOnShutdownTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/RedeliveryDeadLetterErrorHandlerNoRedeliveryOnShutdownTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.processor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.ContextTestSupport;
@@ -27,6 +28,7 @@ import org.apache.camel.util.StopWatch;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Isolated;
+import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Isolated
@@ -46,8 +48,8 @@ public class
RedeliveryDeadLetterErrorHandlerNoRedeliveryOnShutdownTest extends
// should not take long to stop the route
StopWatch watch = new StopWatch();
- // sleep 0.5 seconds to do some redeliveries before we stop
- Thread.sleep(500);
+ // wait for enough redeliveries before we stop
+ await().atMost(5, TimeUnit.SECONDS).until(() -> counter.get() >= 20);
log.info("==== stopping route foo ====");
context.getRouteController().stopRoute("foo");
long taken = watch.taken();
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
index d89d5d643ada..97ce940cbee0 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/ResequenceStreamRejectOldExchangesTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.processor;
+import java.util.concurrent.TimeUnit;
+
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.resequencer.MessageRejectedException;
@@ -23,6 +25,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
+import static org.awaitility.Awaitility.await;
+
@DisabledOnOs(value = { OS.LINUX },
architectures = { "s390x" },
disabledReason = "This test does not run reliably multiple
platforms (see CAMEL-21438)")
@@ -76,7 +80,9 @@ public class ResequenceStreamRejectOldExchangesTest extends
ContextTestSupport {
template.sendBodyAndHeader("direct:start", "D", "seqno", 4);
template.sendBodyAndHeader("direct:start", "A", "seqno", 1);
- Thread.sleep(100);
+ // wait for at least one message to be delivered before sending the
rest
+ await().atMost(5, TimeUnit.SECONDS)
+ .until(() ->
getMockEndpoint("mock:result").getReceivedCounter() >= 1);
template.sendBodyAndHeader("direct:start", "B", "seqno", 2);
template.sendBodyAndHeader("direct:start", "C", "seqno", 3);
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/ShutdownCompleteCurrentTaskOnlyTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/ShutdownCompleteCurrentTaskOnlyTest.java
index f7b18d91418b..5350de95287a 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/ShutdownCompleteCurrentTaskOnlyTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/ShutdownCompleteCurrentTaskOnlyTest.java
@@ -31,7 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
disabledReason = "This test does not run reliably on s390x (see
CAMEL-21438)")
public class ShutdownCompleteCurrentTaskOnlyTest extends ContextTestSupport {
- public static final String FILE_QUERY =
"?initialDelay=0&delay=10&synchronous=true";
+ public static final String FILE_QUERY =
"?initialDelay=0&delay=2000&synchronous=true";
@Override
@BeforeEach
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java
index b35f2af81719..1c21f508e76e 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateClosedCorrelationKeyTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.processor.aggregator;
+import java.util.concurrent.TimeUnit;
+
import org.apache.camel.CamelExecutionException;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
@@ -23,6 +25,7 @@ import org.apache.camel.processor.BodyInAggregatingStrategy;
import org.apache.camel.processor.aggregate.ClosedCorrelationKeyException;
import org.junit.jupiter.api.Test;
+import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -79,7 +82,9 @@ public class AggregateClosedCorrelationKeyTest extends
ContextTestSupport {
template.sendBodyAndHeader("direct:start", "D", "id", 2);
template.sendBodyAndHeader("direct:start", "E", "id", 3);
template.sendBodyAndHeader("direct:start", "F", "id", 3);
- Thread.sleep(200);
+ // wait for all 3 aggregated results to arrive (keys become closed)
+ await().atMost(5, TimeUnit.SECONDS)
+ .until(() ->
getMockEndpoint("mock:result").getReceivedCounter() >= 3);
// 2 of them should now be closed
int closed = 0;
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopTest.java
index c378bf8ea8fe..36c20d7acb0d 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopTest.java
@@ -58,7 +58,7 @@ public class AggregateCompleteAllOnStopTest extends
ContextTestSupport {
.to("mock:input")
.aggregate(header("id"), new
BodyInAggregatingStrategy())
.aggregationRepository(new
MemoryAggregationRepository())
-
.completionSize(2).completionTimeout(100).completeAllOnStop().completionTimeoutCheckerInterval(10)
+
.completionSize(2).completionTimeout(5000).completeAllOnStop().completionTimeoutCheckerInterval(10)
.to("mock:aggregated");
}
};
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java
index 098781a262e0..4719ab180a58 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java
@@ -36,9 +36,8 @@ public class AggregateDiscardOnTimeoutTest extends
ContextTestSupport {
template.sendBodyAndHeader("direct:start", "A", "id", 123);
template.sendBodyAndHeader("direct:start", "B", "id", 123);
- // wait 0.25 seconds
- Thread.sleep(250);
-
+ // verify no aggregated message arrives (discarded on timeout)
+ mock.setAssertPeriod(500);
mock.assertIsSatisfied();
// now send 3 which does not timeout
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopParallelTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopParallelTest.java
index b548d4f65a2a..c3c62a53140e 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopParallelTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateForceCompletionOnStopParallelTest.java
@@ -16,14 +16,64 @@
*/
package org.apache.camel.processor.aggregator;
+import java.util.concurrent.TimeUnit;
+
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.BodyInAggregatingStrategy;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
@DisabledOnOs(architectures = { "s390x" },
disabledReason = "This test does not run reliably on s390x (see
CAMEL-21438)")
public class AggregateForceCompletionOnStopParallelTest extends
AggregateForceCompletionOnStopTest {
+ @Override
+ @Test
+ public void testForceCompletionTrue() {
+ MyCompletionProcessor myCompletionProcessor
+ =
context.getRegistry().lookupByNameAndType("myCompletionProcessor",
MyCompletionProcessor.class);
+ myCompletionProcessor.reset();
+
+ context.getShutdownStrategy().setShutdownNowOnTimeout(true);
+ context.getShutdownStrategy().setTimeout(5);
+
+ template.sendBodyAndHeader("direct:forceCompletionTrue", "test1",
"id", "1");
+ template.sendBodyAndHeader("direct:forceCompletionTrue", "test2",
"id", "2");
+ template.sendBodyAndHeader("direct:forceCompletionTrue", "test3",
"id", "1");
+ template.sendBodyAndHeader("direct:forceCompletionTrue", "test4",
"id", "2");
+
+ assertEquals(0, myCompletionProcessor.getAggregationCount(),
"aggregation should not have completed yet");
+ context.stop();
+ await().atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(2,
myCompletionProcessor.getAggregationCount(),
+ "aggregation should have completed"));
+ }
+
+ @Override
+ @Test
+ public void testStopRouteForceCompletionTrue() throws Exception {
+ MyCompletionProcessor myCompletionProcessor
+ =
context.getRegistry().lookupByNameAndType("myCompletionProcessor",
MyCompletionProcessor.class);
+ myCompletionProcessor.reset();
+
+ context.getShutdownStrategy().setShutdownNowOnTimeout(true);
+ context.getShutdownStrategy().setTimeout(5);
+
+ template.sendBodyAndHeader("direct:forceCompletionTrue", "test1",
"id", "1");
+ template.sendBodyAndHeader("direct:forceCompletionTrue", "test2",
"id", "2");
+ template.sendBodyAndHeader("direct:forceCompletionTrue", "test3",
"id", "1");
+ template.sendBodyAndHeader("direct:forceCompletionTrue", "test4",
"id", "2");
+
+ assertEquals(0, myCompletionProcessor.getAggregationCount(),
"aggregation should not have completed yet");
+ context.getRouteController().stopRoute("foo");
+ await().atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertEquals(2,
myCompletionProcessor.getAggregationCount(),
+ "aggregation should have completed"));
+ }
+
@Override
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java
index 19a2b6ec1b5a..8c917e44f100 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java
@@ -17,6 +17,7 @@
package org.apache.camel.processor.aggregator;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
@@ -25,6 +26,7 @@ import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
import org.junit.jupiter.api.Test;
+import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -58,8 +60,10 @@ public class AggregateGroupedExchangeBatchSizeTest extends
ContextTestSupport {
assertEquals("100", grouped.get(0).getIn().getBody(String.class));
assertEquals("150", grouped.get(1).getIn().getBody(String.class));
- // wait a bit for the remainder to come in
- Thread.sleep(1000);
+ // wait for the remainder to come in via completion timeout
+ await().atMost(5, TimeUnit.SECONDS)
+ .pollDelay(1, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertTrue(result.getReceivedCounter() >=
1));
if (result.getReceivedCounter() == 2) {
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRedeliveryErrorHandlerNonBlockedDelay2Test.java
b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRedeliveryErrorHandlerNonBlockedDelay2Test.java
index a7737ab08679..4d0680e03602 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRedeliveryErrorHandlerNonBlockedDelay2Test.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRedeliveryErrorHandlerNonBlockedDelay2Test.java
@@ -23,6 +23,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,9 +38,16 @@ public class
AsyncEndpointRedeliveryErrorHandlerNonBlockedDelay2Test extends Con
private static String beforeThreadName;
private static String afterThreadName;
+ @Override
+ @BeforeEach
+ public void setUp() throws Exception {
+ super.setUp();
+ attempt.reset();
+ }
+
@Test
public void testRedelivery() throws Exception {
- MockEndpoint before = getMockEndpoint("mock:result");
+ MockEndpoint before = getMockEndpoint("mock:before");
before.expectedBodiesReceived("World");
MockEndpoint result = getMockEndpoint("mock:result");
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/onexception/ContextScopedOnExceptionLoadBalancerStopRouteTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/onexception/ContextScopedOnExceptionLoadBalancerStopRouteTest.java
index 3c2323bbfdb3..ab35bdf0705b 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/onexception/ContextScopedOnExceptionLoadBalancerStopRouteTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/onexception/ContextScopedOnExceptionLoadBalancerStopRouteTest.java
@@ -57,6 +57,7 @@ public class
ContextScopedOnExceptionLoadBalancerStopRouteTest extends ContextTe
@Test
public void testErrorOk() throws Exception {
getMockEndpoint("mock:error").expectedBodiesReceived("Kaboom");
+ getMockEndpoint("mock:error").setResultWaitTime(5000);
getMockEndpoint("mock:start").expectedBodiesReceived("Kaboom",
"World");
getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
getMockEndpoint("mock:exception").expectedBodiesReceived("Kaboom");
diff --git
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java
index 14d4eb983a8b..ee9342c7615f 100644
---
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java
+++
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedInflightStatisticsTest.java
@@ -70,7 +70,11 @@ public class ManagedInflightStatisticsTest extends
ManagementTestSupport {
// start some exchanges.
template.asyncSendBody("direct:start", latch1);
- Thread.sleep(250);
+ // wait for first exchange to be inflight before sending the second
+ await().atMost(5, TimeUnit.SECONDS).until(() -> {
+ Long num = (Long) mbeanServer.getAttribute(on,
"ExchangesInflight");
+ return num != null && num == 1;
+ });
template.asyncSendBody("direct:start", latch2);
await().atMost(2, TimeUnit.SECONDS).until(() -> {
@@ -91,8 +95,14 @@ public class ManagedInflightStatisticsTest extends
ManagementTestSupport {
// complete first exchange
latch1.countDown();
- // Lets wait for the first exchange to complete.
- Thread.sleep(200);
+ // wait for the first exchange to complete and the oldest to change
+ final Long tsSnapshot = ts;
+ final String idSnapshot = id;
+ await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertEquals(Long.valueOf(1), (Long) mbeanServer.getAttribute(on,
"ExchangesInflight"));
+ assertNotEquals(idSnapshot, (String) mbeanServer.getAttribute(on,
"OldestInflightExchangeId"));
+ assertNotEquals(tsSnapshot, (Long) mbeanServer.getAttribute(on,
"OldestInflightDuration"));
+ });
Long ts2 = (Long) mbeanServer.getAttribute(on,
"OldestInflightDuration");
assertNotNull(ts2);
String id2 = (String) mbeanServer.getAttribute(on,
"OldestInflightExchangeId");
@@ -100,11 +110,6 @@ public class ManagedInflightStatisticsTest extends
ManagementTestSupport {
log.info("Oldest Exchange id: {}, duration: {}", id2, ts2);
- // Lets verify the oldest changed.
- assertNotEquals(id, id2);
- // The duration values could be different
- assertNotEquals(ts, ts2);
-
latch2.countDown();
// Lets wait for all the exchanges to complete.