This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 913c1cd5898d579604cc87c88edd27444178607f
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Dec 14 18:35:41 2022 +0100

    CAMEL-18739: camel-core - AggregationStrategy onCompletion to have access 
to input exchange.
---
 .../src/main/java/org/apache/camel/AggregationStrategy.java | 13 +++++++++++++
 .../java/org/apache/camel/processor/MulticastProcessor.java |  2 +-
 .../aggregate/ShareUnitOfWorkAggregationStrategy.java       |  5 +++++
 3 files changed, 19 insertions(+), 1 deletion(-)

diff --git 
a/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java 
b/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java
index 79b454f85d8..22a4cea8479 100644
--- a/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java
+++ b/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java
@@ -120,6 +120,19 @@ public interface AggregationStrategy {
     default void onCompletion(Exchange exchange) {
     }
 
+    /**
+     * The aggregated {@link Exchange} has completed
+     *
+     * <b>Important: </b> This method must <b>not</b> throw any exceptions.
+     *
+     * @param exchange the current aggregated exchange, or the original {@link 
org.apache.camel.Exchange} if no
+     *                 aggregation has been done before the completion occurred
+     * @param  inputExchange the input exchange (input to the EIP)
+     */
+    default void onCompletion(Exchange exchange, Exchange inputExchange) {
+        onCompletion(exchange);
+    }
+
     /**
      * A timeout occurred.
      * <p/>
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 2cc448af400..380829ff369 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -776,7 +776,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
         AggregationStrategy strategy = getAggregationStrategy(subExchange);
         // invoke the on completion callback
         if (strategy != null) {
-            strategy.onCompletion(subExchange);
+            strategy.onCompletion(subExchange, original);
         }
 
         // cleanup any per exchange aggregation strategy
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
index 9cd293059f5..06e97ad4983 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java
@@ -72,6 +72,11 @@ public final class ShareUnitOfWorkAggregationStrategy 
extends ServiceSupport imp
         strategy.onCompletion(exchange);
     }
 
+    @Override
+    public void onCompletion(Exchange exchange, Exchange inputExchange) {
+        strategy.onCompletion(exchange, inputExchange);
+    }
+
     @Override
     public void timeout(Exchange exchange, int index, int total, long timeout) 
{
         strategy.timeout(exchange, index, total, timeout);

Reply via email to