This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 3fde21858a9 (chores) camel-core: optimize copying exchanges (#11232) 3fde21858a9 is described below commit 3fde21858a9d6db1b4ad75842f2105b972b8adaa Author: Otavio Rodolfo Piske <orpi...@users.noreply.github.com> AuthorDate: Tue Aug 29 17:51:26 2023 +0200 (chores) camel-core: optimize copying exchanges (#11232) --- .../component/disruptor/DisruptorConsumer.java | 3 +- .../java/org/apache/camel/ExchangeExtension.java | 8 ++++++ .../org/apache/camel/support/AbstractExchange.java | 15 +++++++++- .../org/apache/camel/support/DefaultExchange.java | 10 ++++++- .../org/apache/camel/support/ExchangeHelper.java | 33 ++-------------------- .../camel/support/ExtendedExchangeExtension.java | 20 +++++++++++++ 6 files changed, 54 insertions(+), 35 deletions(-) diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java index 99c79f1646f..1a8b1128b86 100644 --- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java +++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java @@ -135,8 +135,7 @@ public class DisruptorConsumer extends ServiceSupport implements Consumer, Suspe private Exchange prepareExchange(final Exchange exchange) { // send a new copied exchange with new camel context // don't copy handovers as they are handled by the Disruptor Event Handlers - final Exchange newExchange = ExchangeHelper - .copyExchangeAndSetCamelContext(exchange, endpoint.getCamelContext(), false); + final Exchange newExchange = ExchangeHelper.copyExchangeWithProperties(exchange, endpoint.getCamelContext()); // set the from endpoint newExchange.getExchangeExtension().setFromEndpoint(endpoint); return newExchange; diff --git a/core/camel-api/src/main/java/org/apache/camel/ExchangeExtension.java b/core/camel-api/src/main/java/org/apache/camel/ExchangeExtension.java index baeaa44aa2f..7ea4a144193 100644 --- a/core/camel-api/src/main/java/org/apache/camel/ExchangeExtension.java +++ b/core/camel-api/src/main/java/org/apache/camel/ExchangeExtension.java @@ -264,4 +264,12 @@ public interface ExchangeExtension { * @param failureHandled true if failure handled or false otherwise */ void setFailureHandled(boolean failureHandled); + + /** + * Create a new exchange copied from this, with the context set to the given context + * + * @param context the context associated with the new exchange + * @return A new Exchange instance + */ + Exchange createCopyWithProperties(CamelContext context); } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java index 48bffb2ae04..709497805b9 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java @@ -52,7 +52,7 @@ import static org.apache.camel.support.MessageHelper.copyBody; * @see DefaultExchange */ class AbstractExchange implements Exchange { - protected final EnumMap<ExchangePropertyKey, Object> internalProperties = new EnumMap<>(ExchangePropertyKey.class); + protected final EnumMap<ExchangePropertyKey, Object> internalProperties; protected final CamelContext context; protected Map<String, Object> properties; // create properties on-demand as we use internal properties mostly @@ -69,6 +69,14 @@ class AbstractExchange implements Exchange { private final ExtendedExchangeExtension privateExtension; private RedeliveryTraitPayload externalRedelivered = RedeliveryTraitPayload.UNDEFINED_REDELIVERY; + AbstractExchange(CamelContext context, EnumMap<ExchangePropertyKey, Object> internalProperties, + Map<String, Object> properties) { + this.context = context; + this.internalProperties = new EnumMap<>(internalProperties); + this.privateExtension = new ExtendedExchangeExtension(this); + this.properties = properties; + } + public AbstractExchange(CamelContext context) { this(context, ExchangePattern.InOnly); } @@ -78,6 +86,7 @@ class AbstractExchange implements Exchange { this.pattern = pattern; this.created = System.currentTimeMillis(); + internalProperties = new EnumMap<>(ExchangePropertyKey.class); privateExtension = new ExtendedExchangeExtension(this); } @@ -86,6 +95,8 @@ class AbstractExchange implements Exchange { this.pattern = parent.getPattern(); this.created = parent.getCreated(); + internalProperties = new EnumMap<>(ExchangePropertyKey.class); + privateExtension = new ExtendedExchangeExtension(this); privateExtension.setFromEndpoint(parent.getFromEndpoint()); privateExtension.setFromRouteId(parent.getFromRouteId()); @@ -97,6 +108,7 @@ class AbstractExchange implements Exchange { this.pattern = fromEndpoint.getExchangePattern(); this.created = System.currentTimeMillis(); + internalProperties = new EnumMap<>(ExchangePropertyKey.class); privateExtension = new ExtendedExchangeExtension(this); privateExtension.setFromEndpoint(fromEndpoint); } @@ -106,6 +118,7 @@ class AbstractExchange implements Exchange { this.pattern = pattern; this.created = System.currentTimeMillis(); + internalProperties = new EnumMap<>(ExchangePropertyKey.class); privateExtension = new ExtendedExchangeExtension(this); privateExtension.setFromEndpoint(fromEndpoint); } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java index 198484a5523..a3a58e9c1c6 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java @@ -16,16 +16,25 @@ */ package org.apache.camel.support; +import java.util.EnumMap; +import java.util.Map; + import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.ExchangePropertyKey; /** * The default and only implementation of {@link Exchange}. */ public final class DefaultExchange extends AbstractExchange { + DefaultExchange(CamelContext context, EnumMap<ExchangePropertyKey, Object> internalProperties, + Map<String, Object> properties) { + super(context, internalProperties, properties); + } + public DefaultExchange(CamelContext context) { super(context); } @@ -45,5 +54,4 @@ public final class DefaultExchange extends AbstractExchange { public DefaultExchange(Endpoint fromEndpoint, ExchangePattern pattern) { super(fromEndpoint, pattern); } - } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java index 1ab46faffed..32cd22eec71 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java @@ -24,7 +24,6 @@ import java.nio.charset.Charset; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -783,17 +782,6 @@ public final class ExchangeHelper { return "(MessageId: " + msgId + " on ExchangeId: " + exchange.getExchangeId() + ")"; } - /** - * Copies the exchange but the copy will be tied to the given context - * - * @param exchange the source exchange - * @param context the camel context - * @return a copy with the given camel context - */ - public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context) { - return copyExchangeAndSetCamelContext(exchange, context, true); - } - /* * Safe copy message history using a defensive copy */ @@ -809,23 +797,13 @@ public final class ExchangeHelper { * Copies the exchange but the copy will be tied to the given context * * @param exchange the source exchange - * @param context the camel context - * @param handover whether to handover on completions from the source to the copy * @return a copy with the given camel context */ - public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context, boolean handover) { - DefaultExchange answer = new DefaultExchange(context, exchange.getPattern()); - if (exchange.hasProperties()) { - answer.getExchangeExtension().setProperties(safeCopyProperties(exchange.getProperties())); - } - exchange.getExchangeExtension().copyInternalProperties(answer); + public static Exchange copyExchangeWithProperties(Exchange exchange, CamelContext context) { + Exchange answer = exchange.getExchangeExtension().createCopyWithProperties(context); setMessageHistory(answer, exchange); - if (handover) { - // Need to hand over the completion for async invocation - exchange.getExchangeExtension().handoverCompletions(answer); - } answer.setIn(exchange.getIn().copy()); if (exchange.hasOut()) { answer.setOut(exchange.getOut().copy()); @@ -892,13 +870,6 @@ public final class ExchangeHelper { return StringHelper.before(uri, ":"); } - private static Map<String, Object> safeCopyProperties(Map<String, Object> properties) { - if (properties == null) { - return null; - } - return new ConcurrentHashMap<>(properties); - } - /** * @see #getCharsetName(Exchange, boolean) */ diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java b/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java index 2ce20cdea33..3f60ac6ca0d 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java @@ -20,8 +20,10 @@ package org.apache.camel.support; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangeExtension; @@ -328,4 +330,22 @@ public class ExtendedExchangeExtension implements ExchangeExtension { setErrorHandlerHandled(null); setStreamCacheDisabled(false); } + + private static Map<String, Object> safeCopyProperties(Map<String, Object> properties) { + if (properties == null) { + return null; + } + return new ConcurrentHashMap<>(properties); + } + + @Override + public Exchange createCopyWithProperties(CamelContext context) { + final Map<String, Object> properties = safeCopyProperties(exchange.properties); + + DefaultExchange answer = new DefaultExchange(context, exchange.internalProperties, properties); + + answer.setPattern(exchange.pattern); + + return answer; + } }