[ https://issues.apache.org/jira/browse/CAMEL-23513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Claus Ibsen resolved CAMEL-23513.
---------------------------------
Resolution: Fixed
> completeAllOnStop() does not complete aggregations when used with
> completionInterval()
> --------------------------------------------------------------------------------------
>
> Key: CAMEL-23513
> URL: https://issues.apache.org/jira/browse/CAMEL-23513
> Project: Camel
> Issue Type: Bug
> Components: camel-core
> Reporter: Guillaume Nodet
> Assignee: Guillaume Nodet
> Priority: Minor
> Fix For: 4.14.8, 4.18.3, 4.21.0
>
>
> h2. Problem
> When an aggregator is configured with both {{completeAllOnStop()}} and
> {{completionInterval()}}, stopping the CamelContext does not reliably trigger
> completion of in-flight aggregations. The
> {{AggregateCompleteAllOnStopWithIntervalTest}} test demonstrates this: it
> sends 3 messages (below the {{completionSize(10)}} threshold), then calls
> {{context.stop()}}. The aggregation should be force-completed during
> shutdown, but the mock endpoint at {{mock:aggregated}} never receives the
> completed exchange.
> h2. Steps to Reproduce
> Run {{AggregateCompleteAllOnStopWithIntervalTest}} in a loop:
> {code}
> for i in $(seq 1 100); do
>   mvn test -B -pl core/camel-core \
>     -Dtest=AggregateCompleteAllOnStopWithIntervalTest \
>     -Dsurefire.rerunFailingTestsCount=0
> done
> {code}
> The test fails intermittently: the Awaitility timeout expires because the
> mock never receives the aggregated message.
> h2. Analysis
> The issue appears to be in {{AggregateProcessor.doStop()}} or the shutdown
> sequence: when {{completionInterval}} is set, the interval-based scheduled
> task is cancelled during shutdown, but {{completeAllOnStop}} may not
> force-complete the remaining groups before the task is cancelled.
> With {{completionTimeout()}} instead of {{completionInterval()}},
> {{completeAllOnStop()}} works correctly because the timeout map is flushed
> during shutdown.
> h2. Route Configuration
> {code:java}
> from("direct:start")
>     .to("mock:input")
>     .aggregate(new GroupedBodyAggregationStrategy())
>         .simple("${in.header.aggregateKey}")
>         .completionSize(10)
>         .completionInterval(1000)
>         .completeAllOnStop()
>     .to("mock:aggregated");
> {code}
> _Claude Code on behalf of Guillaume Nodet_
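
The behaviour the description expects can be sketched with plain JDK classes. This is a simplified model, not Camel's AggregateProcessor and not the actual fix: the class IntervalAggregatorSketch and all of its members are hypothetical names introduced for illustration. It assumes that stop() should cancel the interval task and then flush the remaining groups explicitly, so that completion on stop does not depend on the scheduled task firing one more time.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical model for illustration only; this is NOT Camel's AggregateProcessor.
class IntervalAggregatorSketch {
    private final Map<String, List<String>> groups = new LinkedHashMap<>();
    private final List<List<String>> completed = new ArrayList<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final ScheduledFuture<?> intervalTask;
    private final int completionSize;
    private final boolean completeAllOnStop;

    IntervalAggregatorSketch(int completionSize, long intervalMillis,
                             boolean completeAllOnStop) {
        this.completionSize = completionSize;
        this.completeAllOnStop = completeAllOnStop;
        // Interval-based completion: periodically complete every open group.
        this.intervalTask = scheduler.scheduleAtFixedRate(
                this::completeAll, intervalMillis, intervalMillis,
                TimeUnit.MILLISECONDS);
    }

    // Add a message body to the group for the given correlation key.
    synchronized void add(String key, String body) {
        List<String> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(body);
        if (group.size() >= completionSize) {
            // Size-based completion.
            completed.add(groups.remove(key));
        }
    }

    // Move every open group to the completed list.
    private synchronized void completeAll() {
        completed.addAll(groups.values());
        groups.clear();
    }

    void stop() {
        // Cancel the interval task first; it must not be relied on to flush.
        intervalTask.cancel(false);
        scheduler.shutdown();
        // Assumed expected behaviour: force-complete whatever is still open.
        if (completeAllOnStop) {
            completeAll();
        }
    }

    // Snapshot of the groups completed so far.
    synchronized List<List<String>> completed() {
        return new ArrayList<>(completed);
    }
}
```

With a completion size of 10 and an interval long enough that it never fires, adding three bodies under one key and then calling stop() leaves exactly one completed group holding those three bodies, which mirrors what the test expects to arrive at mock:aggregated.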