This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 5ce7e9e6a393 CAMEL-21438: Fix and re-enable flaky tests on s390x
(#23997)
5ce7e9e6a393 is described below
commit 5ce7e9e6a393eb8dbc5b3a686f420d72d0b1d721
Author: Adriano Machado <[email protected]>
AuthorDate: Sat Jun 13 01:54:04 2026 -0400
CAMEL-21438: Fix and re-enable flaky tests on s390x (#23997)
* CAMEL-21438: Replace Thread.sleep with deterministic waits in three tests
MainListenerTest: replaced Thread.sleep(100) with a CountDownLatch that
counts down on the first MainListener event. The sleep was a fixed-time
guess that the background main.run() thread had initialized; the latch
makes the synchronization exact without adding a dependency.
ManagedThrottlingExceptionRoutePolicyTest: replaced Thread.sleep(200)
with Awaitility polling on proxy.getLastFailure() > 0. The sleep assumed
the JMX MBean would reflect the failure within 200ms, which is brittle
on loaded CI machines.
AsyncCompletionServiceTest: removed Thread.sleep(300) in
testSubmitOrderedFirstTaskIsSlow. The service.take() call already blocks
until the result is available in submission order, making the sleep
redundant. The parallel test
testSubmitOrderedFirstTaskIsSlowUsingPollTimeout
demonstrates the correct pattern.
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
* CAMEL-21438: Fix AggregateCompleteAllOnStopTest race on s390x
mock:input fires before the aggregator step, so assertIsSatisfied()
can return while C is still in-flight between mock:input and the
aggregation repository. Add an Awaitility barrier that polls the
MemoryAggregationRepository directly, ensuring C is stored before
stopRoute() is called. Remove the @DisabledOnOs(s390x) exclusion.
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
* CAMEL-21438: Fix FileNoOpLockFileTest 1s timeout too tight for s390x
After the mock is satisfied the file consumer still needs to delete
the .camelLock marker file. The 1-second Awaitility budget is too
small under QEMU-emulated s390x. Increase to 5 seconds and remove
the @DisabledOnOs(s390x) exclusion.
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
* CAMEL-21438: Fix FileConsumerResumeFromOffsetStrategyTest 2s wait on s390x
MockEndpoint.assertWait(2, SECONDS) unconditionally sleeps 2 seconds
then asserts. With the default file consumer initialDelay and slow
QEMU-emulated s390x the first poll may not have fired within that
budget. Replace with an Awaitility wait (up to 10 seconds) that
exits as soon as the first exchange arrives. Remove @DisabledOnOs(s390x).
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
* CAMEL-21438: Fix BarcodeDataFormatSpringTest flakiness on s390x
Pass the JUnit @TempDir path into the Spring XML property substitution so
each test method gets an isolated directory. Previously {{testDirectory}}
resolved to a fixed class-based path, causing the file consumer to
re-deliver files from prior tests when the context restarted.
Also removes deprecated SpringCamelContext.springCamelContext() call and
replaces isUseRouteBuilder() override with
testConfiguration().withUseRouteBuilder(false).
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
* CAMEL-21438: Re-enable XsltCustomizeEntityResolverTest on s390x
The @DisabledOnOs(s390x) annotation was a workaround for a file-consumer
race condition that CAMEL-23189 already fixed by replacing the file: route
with direct:start. The annotation is now stale; remove it.
Also simplify the EntityResolver anonymous class to a lambda and drop the
unused SAXException import.
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
* CAMEL-21438: Fix ThrottlerTest flakiness and re-enable on s390x
Three root causes fixed:
- testSendLotsOfMessagesButOnly3GetThroughWithin2Seconds: raise
setResultWaitTime from 2s to 10s so slow machines can still deliver
the first 3 messages; add assertEquals(3) after assertIsSatisfied()
to make the throttle-correctness assertion explicit.
- assertThrottlerTiming: drop the minimum elapsed-time bound, which
tests machine speed rather than throttle correctness, and caused
false failures on fast machines.
- sendMessagesWithHeaderExpression: start the elapsed timer from a
CountDownLatch fired by the first executing thread, not from task
submission, to avoid inflating elapsed with thread pool scheduling
overhead.
Also: remove stale "lets pause" comments, convert anonymous Runnables
to lambdas, drop the now-dead calculateMinimum method, add s390x to
the enabled architectures.
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
* CAMEL-21438: Fix ConcurrentRequestsThrottlerTest permit leak and
re-enable on s390x
The test semaphore acquire and release lived in two separate processors
with a delay(100) between them. If an exchange failed between the two
processors (assertion error, Camel error handler diversion), the
release never ran and the permit leaked, causing subsequent tryAcquire
calls to fail spuriously.
Fix: register an onCompletion callback inside the acquire processor so
the permit is released on both success and failure, regardless of what
happens to the exchange after the acquire. Remove the now-redundant
release processor from all three routes and from the Spring XML.
Also: remove the unused INTERVAL constant (left over after CAMEL-22539
replaced Thread.sleep with Awaitility), convert remaining anonymous
Runnables to lambdas, remove stale comments, and add s390x to the
enabled architectures.
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
* Update
core/camel-core/src/test/java/org/apache/camel/processor/throttle/concurrent/ConcurrentRequestsThrottlerTest.java
---------
Co-authored-by: Claude Sonnet 4.6 <[email protected]>
Co-authored-by: Claus Ibsen <[email protected]>
---
.../barcode/BarcodeDataFormatSpringTest.java | 25 ++++----
.../spring/processor/SpringThrottlerTest.java | 22 ++++---
.../SpringAggregateCompleteAllOnStopTest.java | 15 +++++
.../SpringAggregateCompleteAllOnStopTest.xml | 4 +-
.../apache/camel/spring/processor/throttler.xml | 7 +--
.../FileConsumerResumeFromOffsetStrategyTest.java | 8 +--
.../camel/component/file/FileNoOpLockFileTest.java | 7 +--
.../xslt/XsltCustomizeEntityResolverTest.java | 11 +---
.../aggregator/AggregateCompleteAllOnStopTest.java | 21 +++++--
.../ConcurrentRequestsThrottlerTest.java | 73 +++++++++++++---------
.../processor/throttle/requests/ThrottlerTest.java | 66 ++++++++-----------
.../org/apache/camel/main/MainListenerTest.java | 9 ++-
.../ManagedThrottlingExceptionRoutePolicyTest.java | 4 +-
.../concurrent/AsyncCompletionServiceTest.java | 2 -
14 files changed, 152 insertions(+), 122 deletions(-)
diff --git
a/components/camel-barcode/src/test/java/org/apache/camel/dataformat/barcode/BarcodeDataFormatSpringTest.java
b/components/camel-barcode/src/test/java/org/apache/camel/dataformat/barcode/BarcodeDataFormatSpringTest.java
index b60f7dfd1a1c..9c8773f09936 100644
---
a/components/camel-barcode/src/test/java/org/apache/camel/dataformat/barcode/BarcodeDataFormatSpringTest.java
+++
b/components/camel-barcode/src/test/java/org/apache/camel/dataformat/barcode/BarcodeDataFormatSpringTest.java
@@ -16,27 +16,26 @@
*/
package org.apache.camel.dataformat.barcode;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.camel.CamelContext;
import org.apache.camel.spring.SpringCamelContext;
import org.apache.camel.test.spring.junit6.CamelSpringTestSupport;
-import org.junit.jupiter.api.condition.DisabledOnOs;
-import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.AbstractXmlApplicationContext;
-@DisabledOnOs(architectures = { "s390x" },
- disabledReason = "This test does not run reliably on s390x (see
CAMEL-21438)")
public class BarcodeDataFormatSpringTest extends BarcodeDataFormatCamelTest {
- @Override
- public boolean isUseRouteBuilder() {
- return false;
- }
-
@Override
protected CamelContext createCamelContext() throws Exception {
- ApplicationContext applicationContext
- =
CamelSpringTestSupport.newAppContext("barcodeDataformatSpring.xml",
- getClass());
- return SpringCamelContext.springCamelContext(applicationContext, true);
+ testConfiguration().withUseRouteBuilder(false);
+ Map<String, String> props = new HashMap<>();
+ props.put(CamelSpringTestSupport.TEST_CLASS_NAME_PROPERTY,
getClass().getName());
+ props.put(CamelSpringTestSupport.TEST_CLASS_SIMPLE_NAME_PROPERTY,
getClass().getSimpleName());
+ props.put(CamelSpringTestSupport.TEST_DIRECTORY_PROPERTY,
testDirectory.toString());
+ AbstractXmlApplicationContext applicationContext
+ =
CamelSpringTestSupport.newAppContext("barcodeDataformatSpring.xml", getClass(),
props);
+ return applicationContext.getBean(SpringCamelContext.class);
}
}
diff --git
a/components/camel-spring-parent/camel-spring-xml/src/test/java/org/apache/camel/spring/processor/SpringThrottlerTest.java
b/components/camel-spring-parent/camel-spring-xml/src/test/java/org/apache/camel/spring/processor/SpringThrottlerTest.java
index 631313c2c5ec..c449332ad2d6 100644
---
a/components/camel-spring-parent/camel-spring-xml/src/test/java/org/apache/camel/spring/processor/SpringThrottlerTest.java
+++
b/components/camel-spring-parent/camel-spring-xml/src/test/java/org/apache/camel/spring/processor/SpringThrottlerTest.java
@@ -16,10 +16,13 @@
*/
package org.apache.camel.spring.processor;
+import java.util.concurrent.Semaphore;
+
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import
org.apache.camel.processor.throttle.concurrent.ConcurrentRequestsThrottlerTest;
+import org.apache.camel.support.SynchronizationAdapter;
import static
org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -35,14 +38,19 @@ public class SpringThrottlerTest extends
ConcurrentRequestsThrottlerTest {
public static class IncrementProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
- assertTrue(semaphore.tryAcquire(), "too many requests");
- }
- }
+ Semaphore s = semaphore;
+ assertTrue(s.tryAcquire(), "too many requests");
+ exchange.getExchangeExtension().addOnCompletion(new
SynchronizationAdapter() {
+ @Override
+ public void onComplete(Exchange ex) {
+ s.release();
+ }
- public static class DecrementProcessor implements Processor {
- @Override
- public void process(Exchange exchange) throws Exception {
- semaphore.release();
+ @Override
+ public void onFailure(Exchange ex) {
+ s.release();
+ }
+ });
}
}
diff --git
a/components/camel-spring-parent/camel-spring-xml/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateCompleteAllOnStopTest.java
b/components/camel-spring-parent/camel-spring-xml/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateCompleteAllOnStopTest.java
index e9ed58c30cd9..7cd5321e934b 100644
---
a/components/camel-spring-parent/camel-spring-xml/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateCompleteAllOnStopTest.java
+++
b/components/camel-spring-parent/camel-spring-xml/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateCompleteAllOnStopTest.java
@@ -16,10 +16,13 @@
*/
package org.apache.camel.spring.processor.aggregator;
+import java.util.concurrent.TimeUnit;
+
import org.apache.camel.CamelContext;
import org.apache.camel.processor.aggregator.AggregateCompleteAllOnStopTest;
import static
org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+import static org.awaitility.Awaitility.await;
public class SpringAggregateCompleteAllOnStopTest extends
AggregateCompleteAllOnStopTest {
@@ -29,4 +32,16 @@ public class SpringAggregateCompleteAllOnStopTest extends
AggregateCompleteAllOn
"org/apache/camel/spring/processor/aggregator/SpringAggregateCompleteAllOnStopTest.xml");
}
+ @Override
+ protected void awaitLastMessageInAggregator() throws Exception {
+ // The Spring route uses its own internal MemoryAggregationRepository
— the Java
+ // test's repo field is not wired here. By the time
input.assertIsSatisfied() has
+ // returned (C passed mock:input) and mock:aggregated has received
A+B, the single-
+ // threaded seda consumer is processing C between mock:input and the
aggregator.
+ // stopRoute's graceful shutdown (DefaultShutdownStrategy) tracks
in-flight exchanges
+ // and blocks until C completes, so completeAllOnStop will then flush
C correctly.
+ await().atMost(10, TimeUnit.SECONDS)
+ .until(() ->
getMockEndpoint("mock:aggregated").getReceivedCounter() >= 1);
+ }
+
}
diff --git
a/components/camel-spring-parent/camel-spring-xml/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateCompleteAllOnStopTest.xml
b/components/camel-spring-parent/camel-spring-xml/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateCompleteAllOnStopTest.xml
index 032f2eb67e00..fd28a13932c1 100644
---
a/components/camel-spring-parent/camel-spring-xml/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateCompleteAllOnStopTest.xml
+++
b/components/camel-spring-parent/camel-spring-xml/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateCompleteAllOnStopTest.xml
@@ -26,11 +26,11 @@
<!-- START SNIPPET: e1 -->
<camelContext xmlns="http://camel.apache.org/schema/spring">
- <jmxAgent id="jmx" disabled="true"/>
+ <jmxAgent id="jmx" disabled="true"/>
<route id="foo">
<from uri="seda:start"/>
<to uri="mock:input"/>
- <aggregate aggregationStrategy="aggregatorStrategy"
completionSize="2" completionTimeout="100"
+ <aggregate aggregationStrategy="aggregatorStrategy"
completionSize="2" completionTimeout="5000"
completionTimeoutCheckerInterval="10"
completeAllOnStop="true">
<correlationExpression>
<simple>header.id</simple>
diff --git
a/components/camel-spring-parent/camel-spring-xml/src/test/resources/org/apache/camel/spring/processor/throttler.xml
b/components/camel-spring-parent/camel-spring-xml/src/test/resources/org/apache/camel/spring/processor/throttler.xml
index e882a8ff7e6b..d48593eb5afc 100644
---
a/components/camel-spring-parent/camel-spring-xml/src/test/resources/org/apache/camel/spring/processor/throttler.xml
+++
b/components/camel-spring-parent/camel-spring-xml/src/test/resources/org/apache/camel/spring/processor/throttler.xml
@@ -25,7 +25,6 @@
">
<bean id="runtimeExceptionProcessor"
class="org.apache.camel.spring.processor.SpringThrottlerTest$RuntimeExceptionProcessor"/>
<bean id="incrementProcessor"
class="org.apache.camel.spring.processor.SpringThrottlerTest$IncrementProcessor"/>
- <bean id="decrementProcessor"
class="org.apache.camel.spring.processor.SpringThrottlerTest$DecrementProcessor"/>
<camelContext xmlns="http://camel.apache.org/schema/spring">
<jmxAgent id="jmx" disabled="true"/>
@@ -48,7 +47,7 @@
<delay>
<constant>100</constant>
</delay>
- <process ref="decrementProcessor"/>
+
<to uri="log:result"/>
<to uri="mock:result"/>
</route>
@@ -62,7 +61,7 @@
<delay>
<constant>100</constant>
</delay>
- <process ref="decrementProcessor"/>
+
<to uri="log:result"/>
<to uri="mock:result"/>
</route>
@@ -78,7 +77,7 @@
<delay>
<constant>100</constant>
</delay>
- <process ref="decrementProcessor"/>
+
<to uri="log:result"/>
<to uri="mock:result"/>
</route>
diff --git
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
index 647985a70ee0..977c4414e5a3 100644
---
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerResumeFromOffsetStrategyTest.java
@@ -35,12 +35,11 @@ import org.apache.camel.support.resume.Resumables;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.DisabledOnOs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@DisabledOnOs(architectures = { "s390x" },
- disabledReason = "This test does not run reliably on s390x (see
CAMEL-21438)")
+import static org.awaitility.Awaitility.await;
+
public class FileConsumerResumeFromOffsetStrategyTest extends
ContextTestSupport {
private static final Logger LOG =
LoggerFactory.getLogger(FileConsumerResumeFromOffsetStrategyTest.class);
@@ -107,7 +106,8 @@ public class FileConsumerResumeFromOffsetStrategyTest
extends ContextTestSupport
template.sendBodyAndHeader(fileUri("resumeMissingOffset"),
"01234567890", Exchange.FILE_NAME, "resume-from-offset.txt");
- MockEndpoint.assertWait(2, TimeUnit.SECONDS, mock);
+ await().atMost(10, TimeUnit.SECONDS)
+ .until(() -> !mock.getExchanges().isEmpty());
List<Exchange> exchangeList = mock.getExchanges();
Assertions.assertFalse(exchangeList.isEmpty(), "It should have
received a few messages");
diff --git
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileNoOpLockFileTest.java
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileNoOpLockFileTest.java
index 2b5fd2e3b96c..a33e338a06f7 100644
---
a/core/camel-core/src/test/java/org/apache/camel/component/file/FileNoOpLockFileTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/component/file/FileNoOpLockFileTest.java
@@ -25,7 +25,6 @@ import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.DisabledOnOs;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -33,8 +32,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* Unit test to verify that the noop file strategy usage of lock files.
*/
-@DisabledOnOs(architectures = { "s390x" },
- disabledReason = "This test does not run reliably on s390x (see
CAMEL-21438)")
public class FileNoOpLockFileTest extends ContextTestSupport {
@Test
@@ -47,7 +44,7 @@ public class FileNoOpLockFileTest extends ContextTestSupport {
mock.assertIsSatisfied();
// sleep to let file consumer do its unlocking
- await().atMost(1, TimeUnit.SECONDS).until(() -> existsLockFile(false));
+ await().atMost(5, TimeUnit.SECONDS).until(() -> existsLockFile(false));
// should be deleted after processing
checkLockFile(false);
@@ -63,7 +60,7 @@ public class FileNoOpLockFileTest extends ContextTestSupport {
mock.assertIsSatisfied();
// sleep to let file consumer do its unlocking
- await().atMost(1, TimeUnit.SECONDS).until(() -> existsLockFile(false));
+ await().atMost(5, TimeUnit.SECONDS).until(() -> existsLockFile(false));
// no lock files should exists after processing
checkLockFile(false);
diff --git
a/core/camel-core/src/test/java/org/apache/camel/component/xslt/XsltCustomizeEntityResolverTest.java
b/core/camel-core/src/test/java/org/apache/camel/component/xslt/XsltCustomizeEntityResolverTest.java
index 4609cb016bc9..1324358af91e 100644
---
a/core/camel-core/src/test/java/org/apache/camel/component/xslt/XsltCustomizeEntityResolverTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/component/xslt/XsltCustomizeEntityResolverTest.java
@@ -22,17 +22,13 @@ import java.nio.file.Path;
import org.xml.sax.EntityResolver;
import org.xml.sax.InputSource;
-import org.xml.sax.SAXException;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.spi.Registry;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.DisabledOnOs;
-@DisabledOnOs(architectures = { "s390x" },
- disabledReason = "This test does not run reliably on s390x (see
CAMEL-21438)")
public class XsltCustomizeEntityResolverTest extends ContextTestSupport {
private static final String EXPECTED_XML_CONSTANT = "<A>1</A>";
@@ -63,12 +59,7 @@ public class XsltCustomizeEntityResolverTest extends
ContextTestSupport {
}
private EntityResolver getCustomEntityResolver() {
- return new EntityResolver() {
- @Override
- public InputSource resolveEntity(String publicId, String systemId)
throws SAXException {
- return new InputSource(new StringReader("<!ELEMENT A
(#PCDATA)>"));
- }
- };
+ return (publicId, systemId) -> new InputSource(new
StringReader("<!ELEMENT A (#PCDATA)>"));
}
@Override
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopTest.java
index 36c20d7acb0d..fb70813ba014 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopTest.java
@@ -16,18 +16,21 @@
*/
package org.apache.camel.processor.aggregator;
+import java.util.concurrent.TimeUnit;
+
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.BodyInAggregatingStrategy;
import org.apache.camel.processor.aggregate.MemoryAggregationRepository;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.DisabledOnOs;
-@DisabledOnOs(architectures = { "s390x" },
- disabledReason = "This test does not run reliably on s390x (see
CAMEL-21438)")
+import static org.awaitility.Awaitility.await;
+
public class AggregateCompleteAllOnStopTest extends ContextTestSupport {
+ protected final MemoryAggregationRepository repo = new
MemoryAggregationRepository();
+
@Test
public void testCompleteAllOnStop() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:aggregated");
@@ -44,11 +47,21 @@ public class AggregateCompleteAllOnStopTest extends
ContextTestSupport {
input.assertIsSatisfied();
+ // mock:input fires before the aggregator step, so assertIsSatisfied()
can
+ // return while C is still in-flight between mock:input and the
aggregator.
+ // Wait until C is actually stored in the repository before stopping.
+ awaitLastMessageInAggregator();
+
context.getRouteController().stopRoute("foo");
assertMockEndpointsSatisfied();
}
+ protected void awaitLastMessageInAggregator() throws Exception {
+ await().atMost(10, TimeUnit.SECONDS)
+ .until(() -> repo.get(context, "foo") != null);
+ }
+
@Override
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
@@ -57,7 +70,7 @@ public class AggregateCompleteAllOnStopTest extends
ContextTestSupport {
from("seda:start").routeId("foo")
.to("mock:input")
.aggregate(header("id"), new
BodyInAggregatingStrategy())
- .aggregationRepository(new
MemoryAggregationRepository())
+ .aggregationRepository(repo)
.completionSize(2).completionTimeout(5000).completeAllOnStop().completionTimeoutCheckerInterval(10)
.to("mock:aggregated");
}
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/throttle/concurrent/ConcurrentRequestsThrottlerTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/throttle/concurrent/ConcurrentRequestsThrottlerTest.java
index 3cdb94d07778..3a377e5d48ab 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/throttle/concurrent/ConcurrentRequestsThrottlerTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/throttle/concurrent/ConcurrentRequestsThrottlerTest.java
@@ -23,9 +23,11 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.ThrottlerRejectedExecutionException;
+import org.apache.camel.support.SynchronizationAdapter;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledOnOs;
@@ -35,10 +37,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
// time-bound that does not run well in shared environments
@EnabledOnOs(value = { OS.LINUX, OS.MAC, OS.FREEBSD, OS.OPENBSD },
- architectures = { "amd64", "aarch64", "ppc64le" },
- disabledReason = "This test does not run reliably multiple
platforms (see CAMEL-21438)")
+ architectures = { "amd64", "aarch64", "ppc64le", "s390x" },
+ disabledReason = "This test does not run reliably on all
platforms (see CAMEL-21438)")
public class ConcurrentRequestsThrottlerTest extends ContextTestSupport {
- private static final int INTERVAL = 500;
private static final int MESSAGE_COUNT = 9;
private static final int CONCURRENT_REQUESTS = 2;
protected static Semaphore semaphore;
@@ -140,14 +141,9 @@ public class ConcurrentRequestsThrottlerTest extends
ContextTestSupport {
}
for (int i = 0; i < messageCount; i++) {
- executor.execute(new Runnable() {
- public void run() {
- template.sendBody(endpointUri,
"<message>payload</message>");
- }
- });
+ executor.execute(() -> template.sendBody(endpointUri,
"<message>payload</message>"));
}
- // let's wait for the exchanges to arrive
if (receivingEndpoint != null) {
receivingEndpoint.assertIsSatisfied();
}
@@ -163,15 +159,10 @@ public class ConcurrentRequestsThrottlerTest extends
ContextTestSupport {
semaphore = new Semaphore(throttle);
for (int i = 0; i < messageCount; i++) {
- executor.execute(new Runnable() {
- public void run() {
- template.sendBodyAndHeader("direct:expressionHeader",
"<message>payload</message>", "throttleValue",
- throttle);
- }
- });
+ executor.execute(() -> template.sendBodyAndHeader(
+ "direct:expressionHeader", "<message>payload</message>",
"throttleValue", throttle));
}
- // let's wait for the exchanges to arrive
resultEndpoint.assertIsSatisfied();
}
@@ -199,32 +190,54 @@ public class ConcurrentRequestsThrottlerTest extends
ContextTestSupport {
from("direct:a").throttle(CONCURRENT_REQUESTS).concurrentRequestsMode()
.process(exchange -> {
- assertTrue(semaphore.tryAcquire(), "'direct:a' too
many requests");
+ Semaphore s = semaphore;
+ assertTrue(s.tryAcquire(), "'direct:a' too many
requests");
+
exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() {
+ @Override
+ public void onComplete(Exchange ex) {
+ s.release();
+ }
+
+ @Override
+ public void onFailure(Exchange ex) {
+ s.release();
+ }
+ });
})
.delay(100)
- .process(exchange -> {
- semaphore.release();
- })
.to("log:result", "mock:result");
from("direct:expressionConstant").throttle(constant(CONCURRENT_REQUESTS)).concurrentRequestsMode()
.process(exchange -> {
- assertTrue(semaphore.tryAcquire(),
"'direct:expressionConstant' too many requests");
- })
- .delay(100)
- .process(exchange -> {
- semaphore.release();
- })
+ Semaphore s = semaphore;
+ assertTrue(s.tryAcquire(),
"'direct:expressionConstant' too many requests");
+
exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() {
+ @Override
+
exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() {
+ @Override
+ public void onDone(Exchange ex) {
+ s.release();
+ }
+ });
.to("log:result", "mock:result");
from("direct:expressionHeader").throttle(header("throttleValue")).concurrentRequestsMode()
.process(exchange -> {
- assertTrue(semaphore.tryAcquire(),
"'direct:expressionHeader' too many requests");
+ Semaphore s = semaphore;
+ assertTrue(s.tryAcquire(),
"'direct:expressionHeader' too many requests");
+
exchange.getExchangeExtension().addOnCompletion(new SynchronizationAdapter() {
+ @Override
+ public void onComplete(Exchange ex) {
+ s.release();
+ }
+
+ @Override
+ public void onFailure(Exchange ex) {
+ s.release();
+ }
+ });
})
.delay(100)
- .process(exchange -> {
- semaphore.release();
- })
.to("log:result", "mock:result");
from("direct:start").throttle(2).concurrentRequestsMode().rejectExecution(true).delay(1000).to("log:result",
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/throttle/requests/ThrottlerTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/throttle/requests/ThrottlerTest.java
index 2a699cb4ba35..7a3489356fec 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/throttle/requests/ThrottlerTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/throttle/requests/ThrottlerTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.processor.throttle.requests;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -29,12 +30,13 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
// time-bound that does not run well in shared environments
@EnabledOnOs(value = { OS.LINUX, OS.MAC, OS.FREEBSD, OS.OPENBSD },
- architectures = { "amd64", "aarch64", "ppc64le" },
- disabledReason = "This test does not run reliably multiple
platforms (see CAMEL-21438)")
+ architectures = { "amd64", "aarch64", "ppc64le", "s390x" },
+ disabledReason = "This test does not run reliably on all
platforms (see CAMEL-21438)")
public class ThrottlerTest extends ContextTestSupport {
private static final int INTERVAL = 500;
private static final int TOLERANCE = 50;
@@ -44,15 +46,17 @@ public class ThrottlerTest extends ContextTestSupport {
public void testSendLotsOfMessagesButOnly3GetThroughWithin2Seconds()
throws Exception {
MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result",
MockEndpoint.class);
resultEndpoint.expectedMessageCount(3);
- resultEndpoint.setResultWaitTime(2000);
+ // Generous timeout so slow machines still deliver the first 3
messages.
+ // exactMessageCount(3) still catches throttle violations if more
arrive.
+ resultEndpoint.setResultWaitTime(10_000);
for (int i = 0; i < MESSAGE_COUNT; i++) {
template.sendBody("seda:a", "<message>" + i + "</message>");
}
- // lets pause to give the requests time to be processed
- // to check that the throttle really does kick in
resultEndpoint.assertIsSatisfied();
+ // Messages 4-9 must still be queued: the throttle window has not
elapsed.
+ assertEquals(3, resultEndpoint.getReceivedCounter());
}
@Test
@@ -67,8 +71,6 @@ public class ThrottlerTest extends ContextTestSupport {
template.sendBody("direct:start", "<message>" + i + "</message>");
}
- // lets pause to give the requests time to be processed
- // to check that the throttle really does kick in
assertMockEndpointsSatisfied();
}
@@ -129,17 +131,14 @@ public class ThrottlerTest extends ContextTestSupport {
private void assertThrottlerTiming(
final long elapsedTimeMs, final int throttle, final int
intervalMs, final int messageCount) {
- // now assert that they have actually been throttled (use +/- 50 as
- // slack)
- long minimum = calculateMinimum(intervalMs, throttle, messageCount) -
50;
- long maximum = calculateMaximum(intervalMs, throttle, messageCount) +
50;
- // add 3000 in case running on slow CI boxes
- maximum += 3000;
- log.info("Sent {} exchanges in {}ms, with throttle rate of {} per
{}ms. Calculated min {}ms and max {}ms", messageCount,
- elapsedTimeMs, throttle, intervalMs, minimum,
- maximum);
-
- assertTrue(elapsedTimeMs >= minimum, "Should take at least " + minimum
+ "ms, was: " + elapsedTimeMs);
+ // Assert only the upper bound: messages must not arrive faster than
the throttle allows.
+ // The minimum bound (system not too fast) does not test throttle
correctness and is
+ // dropped to avoid false failures on fast machines.
+ // Add 3000ms slack for slow CI boxes.
+ long maximum = calculateMaximum(intervalMs, throttle, messageCount) +
50 + 3000;
+ log.info("Sent {} exchanges in {}ms, with throttle rate of {} per
{}ms. Calculated max {}ms", messageCount,
+ elapsedTimeMs, throttle, intervalMs, maximum);
+
assertTrue(elapsedTimeMs <= maximum + TOLERANCE, "Should take at most
" + maximum + "ms, was: " + elapsedTimeMs);
}
@@ -154,14 +153,9 @@ public class ThrottlerTest extends ContextTestSupport {
long start = System.nanoTime();
for (int i = 0; i < messageCount; i++) {
- executor.execute(new Runnable() {
- public void run() {
- template.sendBody(endpointUri,
"<message>payload</message>");
- }
- });
+ executor.execute(() -> template.sendBody(endpointUri,
"<message>payload</message>"));
}
- // let's wait for the exchanges to arrive
if (receivingEndpoint != null) {
receivingEndpoint.assertIsSatisfied();
}
@@ -178,30 +172,24 @@ public class ThrottlerTest extends ContextTestSupport {
throws InterruptedException {
resultEndpoint.expectedMessageCount(messageCount);
- long start = System.nanoTime();
+ // Start the clock when the first thread actually begins executing,
not when tasks
+ // are submitted, to avoid inflating elapsed with thread pool
scheduling overhead.
+ CountDownLatch firstStarted = new CountDownLatch(1);
for (int i = 0; i < messageCount; i++) {
- executor.execute(new Runnable() {
- public void run() {
- template.sendBodyAndHeader("direct:expressionHeader",
"<message>payload</message>", "throttleValue",
- throttle);
- }
+ executor.execute(() -> {
+ firstStarted.countDown();
+ template.sendBodyAndHeader("direct:expressionHeader",
"<message>payload</message>", "throttleValue",
+ throttle);
});
}
- // let's wait for the exchanges to arrive
+ assertTrue(firstStarted.await(10, TimeUnit.SECONDS), "Timed out
waiting for first thread to start");
+ long start = System.nanoTime();
resultEndpoint.assertIsSatisfied();
long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
start);
assertThrottlerTiming(elapsed, throttle, intervalMs, messageCount);
}
- private long calculateMinimum(final long periodMs, final long
throttleRate, final long messageCount) {
- if (messageCount % throttleRate > 0) {
- return (long) Math.floor((double) messageCount / (double)
throttleRate) * periodMs;
- } else {
- return (long) (Math.floor((double) messageCount / (double)
throttleRate) * periodMs) - periodMs;
- }
- }
-
private long calculateMaximum(final long periodMs, final long
throttleRate, final long messageCount) {
return ((long) Math.ceil((double) messageCount / (double)
throttleRate)) * periodMs;
}
diff --git
a/core/camel-main/src/test/java/org/apache/camel/main/MainListenerTest.java
b/core/camel-main/src/test/java/org/apache/camel/main/MainListenerTest.java
index 138688f5d1ea..a03602644ea3 100644
--- a/core/camel-main/src/test/java/org/apache/camel/main/MainListenerTest.java
+++ b/core/camel-main/src/test/java/org/apache/camel/main/MainListenerTest.java
@@ -20,29 +20,36 @@ import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import static org.apache.camel.util.CollectionHelper.propertiesOf;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class MainListenerTest {
@Test
public void testEventOrder() throws Exception {
List<String> events = new ArrayList<>();
+ CountDownLatch started = new CountDownLatch(1);
Main main = new Main();
main.addMainListener((MainListener) Proxy.newProxyInstance(
MainListener.class.getClassLoader(),
new Class[] { MainListener.class },
(proxy, method, args) -> {
events.add(method.getName());
+ if ("beforeInitialize".equals(method.getName())) {
+ started.countDown();
+ }
return null;
}));
Thread thread = new Thread(() -> assertDoesNotThrow(() -> main.run()));
thread.start();
- Thread.sleep(100);
+ assertTrue(started.await(10, TimeUnit.SECONDS), "Main did not
initialize within 10 seconds");
main.completed();
thread.join();
assertEquals(Arrays.asList("beforeInitialize", "beforeConfigure",
"afterConfigure",
diff --git
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThrottlingExceptionRoutePolicyTest.java
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThrottlingExceptionRoutePolicyTest.java
index c666351731a2..0d9e5c2335f5 100644
---
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThrottlingExceptionRoutePolicyTest.java
+++
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThrottlingExceptionRoutePolicyTest.java
@@ -19,6 +19,7 @@ package org.apache.camel.management;
import java.io.IOException;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import javax.management.JMX;
import javax.management.MBeanServer;
@@ -35,6 +36,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
+import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -139,7 +141,7 @@ public class ManagedThrottlingExceptionRoutePolicyTest
extends ManagementTestSup
val = proxy.getCurrentFailures();
assertEquals(1, val.intValue());
- Thread.sleep(200);
+ await().atMost(10, TimeUnit.SECONDS).until(() ->
proxy.getLastFailure() > 0);
// the route has 1 failure X mills ago
lastFail = proxy.getLastFailure();
diff --git
a/core/camel-util/src/test/java/org/apache/camel/util/concurrent/AsyncCompletionServiceTest.java
b/core/camel-util/src/test/java/org/apache/camel/util/concurrent/AsyncCompletionServiceTest.java
index 63aefa837e06..c02ed191d603 100644
---
a/core/camel-util/src/test/java/org/apache/camel/util/concurrent/AsyncCompletionServiceTest.java
+++
b/core/camel-util/src/test/java/org/apache/camel/util/concurrent/AsyncCompletionServiceTest.java
@@ -68,8 +68,6 @@ public class AsyncCompletionServiceTest {
service.submit(result("A", 200));
service.submit(result("B"));
- Thread.sleep(300);
-
Object a = service.take();
Object b = service.take();