This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.14.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.14.x by this push:
new 0d395871157a CAMEL-23513: Fix completeAllOnStop() not completing
aggregations with completionInterval()
0d395871157a is described below
commit 0d395871157afd318d3543d0f6b25e50583815af
Author: Guillaume Nodet <[email protected]>
AuthorDate: Thu May 14 23:21:47 2026 +0200
CAMEL-23513: Fix completeAllOnStop() not completing aggregations with
completionInterval()
When completeAllOnStop is enabled together with completionInterval, the
aggregator was not completing pending aggregations during shutdown because
forceCompletionOfAllGroups() was only invoked when completionTimeout was
configured, not when completionInterval was configured.
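- Invoke doForceCompletionOnStop() in the non-forced shutdown path of
AggregateProcessor when completeAllOnStop is enabled, not only when
forceCompletionOnStop is enabled
- Update AggregateCompleteAllOnStopWithIntervalTest to use a 60s
completionInterval with a 5s shutdown timeout, so the aggregation can only
be completed by force completion on stop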
---
.../camel/processor/aggregate/AggregateProcessor.java | 2 +-
.../AggregateCompleteAllOnStopWithIntervalTest.java | 14 ++++++++------
2 files changed, 9 insertions(+), 7 deletions(-)
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 7ca27a828881..b82d52951eca 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -1688,7 +1688,7 @@ public class AggregateProcessor extends BaseProcessorSupport
// but only do this when forced=false, as that is when we have chance to
// send out new messages to be routed by Camel. When forced=true, then
// we have to shutdown in a hurry
- if (!forced && forceCompletionOnStop) {
+ if (!forced && (forceCompletionOnStop || completeAllOnStop)) {
doForceCompletionOnStop();
}
}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopWithIntervalTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopWithIntervalTest.java
index 5a2dada1e110..40d388c2a6a4 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopWithIntervalTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopWithIntervalTest.java
@@ -24,13 +24,15 @@ import org.junit.jupiter.api.Test;
/**
* This test verifies that completeAllOnStop() properly completes aggregations on shutdown when using
- * completionInterval.
+ * completionInterval. Uses a very long interval (60s) so the interval timer will never fire during the short shutdown
+ * timeout — force completion in prepareShutdown must handle it.
*/
public class AggregateCompleteAllOnStopWithIntervalTest extends
ContextTestSupport {
@Test
- public void testCompleteAllOnStopWithCompletionIntervalOnly() throws
Exception {
- // Set shutdown timeout to 5x the completion interval (1 second)
+ public void testCompleteAllOnStopWithCompletionInterval() throws Exception
{
+ // Set shutdown timeout shorter than the completion interval (60s)
+ // so the interval timer will never fire — force completion must handle it
context.getShutdownStrategy().setTimeout(5);
MockEndpoint mock = getMockEndpoint("mock:aggregated");
@@ -49,8 +51,8 @@ public class AggregateCompleteAllOnStopWithIntervalTest
extends ContextTestSuppo
input.assertIsSatisfied();
- // Stop the route immediately without waiting for completionInterval
- // With completeAllOnStop(), we expect the aggregation to be completed
+ // Stop the context without waiting for completionInterval
+ // With completeAllOnStop(), the aggregation must be force-completed during shutdown
context.stop();
mock.assertIsSatisfied();
@@ -66,7 +68,7 @@ public class AggregateCompleteAllOnStopWithIntervalTest
extends ContextTestSuppo
.aggregate(new GroupedBodyAggregationStrategy())
.simple("${in.header.aggregateKey}")
.completionSize(10)
- .completionInterval(1000)
+ .completionInterval(60000)
.completeAllOnStop()
.to("mock:aggregated");
}