This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-4.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-4.18.x by this push:
     new 1af7656beb15 CAMEL-23513: Fix completeAllOnStop() not completing aggregations with completionInterval()
1af7656beb15 is described below

commit 1af7656beb1597aafbf03f07a7eaca5c22858c06
Author: Guillaume Nodet <[email protected]>
AuthorDate: Thu May 14 23:21:47 2026 +0200

    CAMEL-23513: Fix completeAllOnStop() not completing aggregations with completionInterval()
    
    When completeAllOnStop is enabled together with completionInterval, the
    aggregator was not completing pending aggregations during shutdown because
    forceCompletionOfAllGroups() was only invoked when completionTimeout was
    configured, not when completionInterval was configured.
    
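    - In AggregateProcessor, also call doForceCompletionOnStop() on a
      non-forced shutdown when completeAllOnStop is enabled, not only when
      forceCompletionOnStop is enabled
    - Update the existing test to use a 60s completionInterval, longer than
      the shutdown timeout, so that only force completion during shutdown
      can complete the pending aggregation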
---
 .../camel/processor/aggregate/AggregateProcessor.java      |  2 +-
 .../AggregateCompleteAllOnStopWithIntervalTest.java        | 14 ++++++++------
 2 files changed, 9 insertions(+), 7 deletions(-)

diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 4b5298b6c9bf..43c3d616ac8e 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -1702,7 +1702,7 @@ public class AggregateProcessor extends 
BaseProcessorSupport
         // but only do this when forced=false, as that is when we have chance 
to
         // send out new messages to be routed by Camel. When forced=true, then
         // we have to shutdown in a hurry
-        if (!forced && forceCompletionOnStop) {
+        if (!forced && (forceCompletionOnStop || completeAllOnStop)) {
             doForceCompletionOnStop();
         }
     }
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopWithIntervalTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopWithIntervalTest.java
index 5a2dada1e110..40d388c2a6a4 100644
--- 
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopWithIntervalTest.java
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopWithIntervalTest.java
@@ -24,13 +24,15 @@ import org.junit.jupiter.api.Test;
 
 /**
  * This test verifies that completeAllOnStop() properly completes aggregations 
on shutdown when using
- * completionInterval.
+ * completionInterval. Uses a very long interval (60s) so the interval timer 
will never fire during the short shutdown
+ * timeout — force completion in prepareShutdown must handle it.
  */
 public class AggregateCompleteAllOnStopWithIntervalTest extends 
ContextTestSupport {
 
     @Test
-    public void testCompleteAllOnStopWithCompletionIntervalOnly() throws 
Exception {
-        // Set shutdown timeout to 5x the completion interval (1 second)
+    public void testCompleteAllOnStopWithCompletionInterval() throws Exception 
{
+        // Set shutdown timeout shorter than the completion interval (60s)
+        // so the interval timer will never fire — force completion must 
handle it
         context.getShutdownStrategy().setTimeout(5);
 
         MockEndpoint mock = getMockEndpoint("mock:aggregated");
@@ -49,8 +51,8 @@ public class AggregateCompleteAllOnStopWithIntervalTest 
extends ContextTestSuppo
 
         input.assertIsSatisfied();
 
-        // Stop the route immediately without waiting for completionInterval
-        // With completeAllOnStop(), we expect the aggregation to be completed
+        // Stop the context without waiting for completionInterval
+        // With completeAllOnStop(), the aggregation must be force-completed 
during shutdown
         context.stop();
 
         mock.assertIsSatisfied();
@@ -66,7 +68,7 @@ public class AggregateCompleteAllOnStopWithIntervalTest 
extends ContextTestSuppo
                         .aggregate(new GroupedBodyAggregationStrategy())
                         .simple("${in.header.aggregateKey}")
                         .completionSize(10)
-                        .completionInterval(1000)
+                        .completionInterval(60000)
                         .completeAllOnStop()
                         .to("mock:aggregated");
             }

Reply via email to