This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 913c1cd5898d579604cc87c88edd27444178607f Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Wed Dec 14 18:35:41 2022 +0100 CAMEL-18739: camel-core - AggregationStrategy onCompletion to have access to input exchange. --- .../src/main/java/org/apache/camel/AggregationStrategy.java | 13 +++++++++++++ .../java/org/apache/camel/processor/MulticastProcessor.java | 2 +- .../aggregate/ShareUnitOfWorkAggregationStrategy.java | 5 +++++ 3 files changed, 19 insertions(+), 1 deletion(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java b/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java index 79b454f85d8..22a4cea8479 100644 --- a/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java +++ b/core/camel-api/src/main/java/org/apache/camel/AggregationStrategy.java @@ -120,6 +120,19 @@ public interface AggregationStrategy { default void onCompletion(Exchange exchange) { } + /** + * The aggregated {@link Exchange} has completed + * + * <b>Important: </b> This method must <b>not</b> throw any exceptions. + * + * @param exchange the current aggregated exchange, or the original {@link org.apache.camel.Exchange} if no + * aggregation has been done before the completion occurred + * @param inputExchange the input exchange (input to the EIP) + */ + default void onCompletion(Exchange exchange, Exchange inputExchange) { + onCompletion(exchange); + } + /** * A timeout occurred. * <p/> diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 2cc448af400..380829ff369 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -776,7 +776,7 @@ public class MulticastProcessor extends AsyncProcessorSupport AggregationStrategy strategy = getAggregationStrategy(subExchange); // invoke the on completion callback if (strategy != null) { - strategy.onCompletion(subExchange); + strategy.onCompletion(subExchange, original); } // cleanup any per exchange aggregation strategy diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java index 9cd293059f5..06e97ad4983 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/ShareUnitOfWorkAggregationStrategy.java @@ -72,6 +72,11 @@ public final class ShareUnitOfWorkAggregationStrategy extends ServiceSupport imp strategy.onCompletion(exchange); } + @Override + public void onCompletion(Exchange exchange, Exchange inputExchange) { + strategy.onCompletion(exchange, inputExchange); + } + @Override public void timeout(Exchange exchange, int index, int total, long timeout) { strategy.timeout(exchange, index, total, timeout);