This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new c0bb719a13d1 CAMEL-23686: Inline ExtendedExchangeExtension into
AbstractExchange
c0bb719a13d1 is described below
commit c0bb719a13d163382a5b7a559d55882643285cfa
Author: Guillaume Nodet <[email protected]>
AuthorDate: Fri Jun 5 18:17:04 2026 +0200
CAMEL-23686: Inline ExtendedExchangeExtension into AbstractExchange
Move all 23 fields from the separate ExtendedExchangeExtension object
directly into AbstractExchange. AbstractExchange now implements
ExchangeExtension and getExchangeExtension() returns this.
This eliminates one 80-byte object allocation per exchange. In a
pipeline benchmark with ~926K in-flight exchanges, this removes 74MB
of ExtendedExchangeExtension instances entirely.
The original decoupling (CAMEL-15105) was done to eliminate instanceof
casts. That benefit is preserved: callers still use
exchange.getExchangeExtension().method() without casting.
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../org/apache/camel/support/AbstractExchange.java | 452 +++++++++++++++++----
.../camel/support/DefaultPooledExchange.java | 2 +-
2 files changed, 375 insertions(+), 79 deletions(-)
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
index a9e72993777c..2674beae335f 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.support;
+import java.util.ArrayList;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
@@ -24,19 +25,24 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangeExtension;
import org.apache.camel.ExchangePattern;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.Message;
import org.apache.camel.MessageHistory;
import org.apache.camel.Route;
import org.apache.camel.SafeCopyProperty;
+import org.apache.camel.spi.Synchronization;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.spi.VariableRepository;
+import org.apache.camel.trait.message.MessageTrait;
+import org.apache.camel.trait.message.RedeliveryTraitPayload;
import org.apache.camel.util.ObjectHelper;
/**
@@ -48,13 +54,12 @@ import org.apache.camel.util.ObjectHelper;
*
* @see DefaultExchange
*/
-abstract class AbstractExchange implements Exchange {
+abstract class AbstractExchange implements Exchange, ExchangeExtension {
private final CamelContext context;
- private final ExtendedExchangeExtension privateExtension;
protected final EnumMap<ExchangePropertyKey, Object> internalProperties;
- protected Map<String, Object> properties; // create properties on-demand
as we use internal properties mostly
+ protected Map<String, Object> properties;
protected Map<String, SafeCopyProperty> safeCopyProperties;
protected ExchangeVariableRepository variableRepository;
protected Message in;
@@ -63,11 +68,35 @@ abstract class AbstractExchange implements Exchange {
protected String exchangeId;
protected ExchangePattern pattern;
+ // fields inlined from ExtendedExchangeExtension
+ private Boolean errorHandlerHandled;
+ private boolean failureHandled;
+ private Endpoint fromEndpoint;
+ private String fromRouteId;
+ private boolean streamCacheDisabled;
+ private boolean redeliveryExhausted;
+ private int redeliveryCounter = -1;
+ private int redeliveryMaxCounter = -1;
+ private String sagaLongRunningAction;
+ private String historyNodeId;
+ private String historyNodeSource;
+ private String historyNodeLabel;
+ private boolean transacted;
+ private boolean notifyEvent;
+ private boolean interruptable = true;
+ private boolean interrupted;
+ private boolean routeStop;
+ private boolean rollbackOnly;
+ private boolean rollbackOnlyLast;
+ private AsyncCallback defaultConsumerCallback;
+ private UnitOfWork unitOfWork;
+ private List<Synchronization> onCompletions;
+ private RedeliveryTraitPayload externalRedelivered =
RedeliveryTraitPayload.UNDEFINED_REDELIVERY;
+
protected AbstractExchange(CamelContext context,
EnumMap<ExchangePropertyKey, Object> internalProperties,
Map<String, Object> properties) {
this.context = context;
this.internalProperties = new EnumMap<>(internalProperties);
- this.privateExtension = new ExtendedExchangeExtension(this);
this.properties = safeCopyProperties(properties);
}
@@ -79,17 +108,15 @@ abstract class AbstractExchange implements Exchange {
this.context = context;
this.pattern = pattern;
this.internalProperties = new EnumMap<>(ExchangePropertyKey.class);
- this.privateExtension = new ExtendedExchangeExtension(this);
}
protected AbstractExchange(Exchange parent) {
this.context = parent.getContext();
this.pattern = parent.getPattern();
this.internalProperties = new EnumMap<>(ExchangePropertyKey.class);
- this.privateExtension = new ExtendedExchangeExtension(this);
- this.privateExtension.setFromEndpoint(parent.getFromEndpoint());
- this.privateExtension.setFromRouteId(parent.getFromRouteId());
- this.privateExtension.setUnitOfWork(parent.getUnitOfWork());
+ this.fromEndpoint = parent.getFromEndpoint();
+ this.fromRouteId = parent.getFromRouteId();
+ this.unitOfWork = parent.getUnitOfWork();
}
@SuppressWarnings("CopyConstructorMissesField")
@@ -97,10 +124,9 @@ abstract class AbstractExchange implements Exchange {
this.context = parent.getContext();
this.pattern = parent.getPattern();
this.internalProperties = new EnumMap<>(parent.internalProperties);
- this.privateExtension = new ExtendedExchangeExtension(this);
- this.privateExtension.setFromEndpoint(parent.getFromEndpoint());
- this.privateExtension.setFromRouteId(parent.getFromRouteId());
- this.privateExtension.setUnitOfWork(parent.getUnitOfWork());
+ this.fromEndpoint = parent.getFromEndpoint();
+ this.fromRouteId = parent.getFromRouteId();
+ this.unitOfWork = parent.getUnitOfWork();
setIn(parent.getIn().copy());
@@ -110,13 +136,13 @@ abstract class AbstractExchange implements Exchange {
setException(parent.exception);
-
privateExtension.setNotifyEvent(parent.getExchangeExtension().isNotifyEvent());
-
privateExtension.setRedeliveryExhausted(parent.getExchangeExtension().isRedeliveryExhausted());
-
privateExtension.setErrorHandlerHandled(parent.getExchangeExtension().getErrorHandlerHandled());
-
privateExtension.setStreamCacheDisabled(parent.getExchangeExtension().isStreamCacheDisabled());
-
privateExtension.setRollbackOnly(parent.getExchangeExtension().isRollbackOnly());
-
privateExtension.setRollbackOnlyLast(parent.getExchangeExtension().isRollbackOnlyLast());
-
privateExtension.setRouteStop(parent.getExchangeExtension().isRouteStop());
+ this.notifyEvent = parent.notifyEvent;
+ this.redeliveryExhausted = parent.redeliveryExhausted;
+ this.errorHandlerHandled = parent.errorHandlerHandled;
+ this.streamCacheDisabled = parent.streamCacheDisabled;
+ this.rollbackOnly = parent.rollbackOnly;
+ this.rollbackOnlyLast = parent.rollbackOnlyLast;
+ this.routeStop = parent.routeStop;
if (parent.hasVariables()) {
if (this.variableRepository == null) {
@@ -268,15 +294,6 @@ abstract class AbstractExchange implements Exchange {
}
}
- void setProperties(Map<String, Object> properties) {
- if (this.properties == null) {
- this.properties = new ConcurrentHashMap<>(8);
- } else {
- this.properties.clear();
- }
- this.properties.putAll(properties);
- }
-
@Override
public Object removeProperty(String name) {
ExchangePropertyKey key =
ExchangePropertyKey.asExchangePropertyKey(name);
@@ -479,17 +496,6 @@ abstract class AbstractExchange implements Exchange {
return context.getTypeConverter().convertTo(type, this, in);
}
- <T> T getInOrNull(Class<T> type) {
- if (in == null) {
- return null;
- }
- if (type.isInstance(in)) {
- return type.cast(in);
- }
-
- return null;
- }
-
@Override
public void setIn(Message in) {
this.in = in;
@@ -588,8 +594,7 @@ abstract class AbstractExchange implements Exchange {
this.exception =
CamelExecutionException.wrapCamelExecutionException(this, t);
}
if (t instanceof InterruptedException) {
- // mark the exchange as interrupted due to the interrupt exception
- privateExtension.setInterrupted(true);
+ setInterrupted(true);
}
}
@@ -605,12 +610,12 @@ abstract class AbstractExchange implements Exchange {
@Override
public Endpoint getFromEndpoint() {
- return privateExtension.getFromEndpoint();
+ return fromEndpoint;
}
@Override
public String getFromRouteId() {
- return privateExtension.getFromRouteId();
+ return fromRouteId;
}
@Override
@@ -645,47 +650,47 @@ abstract class AbstractExchange implements Exchange {
@Override
public boolean isTransacted() {
- return privateExtension.isTransacted();
+ return transacted;
}
@Override
public boolean isRouteStop() {
- return privateExtension.isRouteStop();
+ return routeStop;
}
@Override
public void setRouteStop(boolean routeStop) {
- privateExtension.setRouteStop(routeStop);
+ this.routeStop = routeStop;
}
@Override
public boolean isExternalRedelivered() {
- return privateExtension.isExternalRedelivered(getIn());
+ return isExternalRedelivered(getIn());
}
@Override
public boolean isRollbackOnly() {
- return privateExtension.isRollbackOnly();
+ return rollbackOnly;
}
@Override
public void setRollbackOnly(boolean rollbackOnly) {
- privateExtension.setRollbackOnly(rollbackOnly);
+ this.rollbackOnly = rollbackOnly;
}
@Override
public boolean isRollbackOnlyLast() {
- return privateExtension.isRollbackOnlyLast();
+ return rollbackOnlyLast;
}
@Override
public void setRollbackOnlyLast(boolean rollbackOnlyLast) {
- privateExtension.setRollbackOnlyLast(rollbackOnlyLast);
+ this.rollbackOnlyLast = rollbackOnlyLast;
}
@Override
public UnitOfWork getUnitOfWork() {
- return privateExtension.getUnitOfWork();
+ return unitOfWork;
}
/**
@@ -698,21 +703,6 @@ abstract class AbstractExchange implements Exchange {
}
}
- void copyInternalProperties(Exchange target) {
- ((AbstractExchange)
target).internalProperties.putAll(internalProperties);
- }
-
- Map<String, Object> getInternalProperties() {
- Map<String, Object> map = new HashMap<>();
- for (ExchangePropertyKey key : ExchangePropertyKey.values()) {
- Object value = internalProperties.get(key);
- if (value != null) {
- map.put(key.getName(), value);
- }
- }
- return map;
- }
-
protected String createExchangeId() {
return context.getUuidGenerator().generateExchangeUuid();
}
@@ -727,36 +717,342 @@ abstract class AbstractExchange implements Exchange {
}
}
- void setSafeCopyProperty(String key, SafeCopyProperty value) {
+ @Override
+ public ExchangeExtension getExchangeExtension() {
+ return this;
+ }
+
+ // --- ExchangeExtension implementation (inlined from
ExtendedExchangeExtension) ---
+
+ @Override
+ public void setFromEndpoint(Endpoint fromEndpoint) {
+ this.fromEndpoint = fromEndpoint;
+ }
+
+ @Override
+ public void setFromRouteId(String fromRouteId) {
+ this.fromRouteId = fromRouteId;
+ }
+
+ @Override
+ public boolean isStreamCacheDisabled() {
+ return streamCacheDisabled;
+ }
+
+ @Override
+ public void setStreamCacheDisabled(boolean streamCacheDisabled) {
+ this.streamCacheDisabled = streamCacheDisabled;
+ }
+
+ @Override
+ public void addOnCompletion(Synchronization onCompletion) {
+ if (unitOfWork == null) {
+ if (onCompletions == null) {
+ onCompletions = new ArrayList<>();
+ }
+ onCompletions.add(onCompletion);
+ } else {
+ unitOfWork.addSynchronization(onCompletion);
+ }
+ }
+
+ @Override
+ public boolean isErrorHandlerHandledSet() {
+ return errorHandlerHandled != null;
+ }
+
+ @Override
+ public Boolean getErrorHandlerHandled() {
+ return errorHandlerHandled;
+ }
+
+ @Override
+ public void setErrorHandlerHandled(Boolean errorHandlerHandled) {
+ this.errorHandlerHandled = errorHandlerHandled;
+ }
+
+ @Override
+ public boolean isErrorHandlerHandled() {
+ return errorHandlerHandled;
+ }
+
+ @Override
+ public boolean isRedeliveryExhausted() {
+ return redeliveryExhausted;
+ }
+
+ @Override
+ public void setRedeliveryExhausted(boolean redeliveryExhausted) {
+ this.redeliveryExhausted = redeliveryExhausted;
+ }
+
+ @Override
+ public int getRedeliveryCounter() {
+ return redeliveryCounter;
+ }
+
+ @Override
+ public void setRedeliveryCounter(int redeliveryCounter) {
+ this.redeliveryCounter = redeliveryCounter;
+ }
+
+ @Override
+ public int getRedeliveryMaxCounter() {
+ return redeliveryMaxCounter;
+ }
+
+ @Override
+ public void setRedeliveryMaxCounter(int redeliveryMaxCounter) {
+ this.redeliveryMaxCounter = redeliveryMaxCounter;
+ }
+
+ @Override
+ public String getSagaLongRunningAction() {
+ return sagaLongRunningAction;
+ }
+
+ @Override
+ public void setSagaLongRunningAction(String sagaLongRunningAction) {
+ this.sagaLongRunningAction = sagaLongRunningAction;
+ }
+
+ @Override
+ public void handoverCompletions(Exchange target) {
+ if (onCompletions != null) {
+ for (Synchronization onCompletion : onCompletions) {
+ target.getExchangeExtension().addOnCompletion(onCompletion);
+ }
+ onCompletions.clear();
+ onCompletions = null;
+ } else if (unitOfWork != null) {
+ unitOfWork.handoverSynchronization(target);
+ }
+ }
+
+ @Override
+ public List<Synchronization> handoverCompletions() {
+ List<Synchronization> answer = null;
+ if (onCompletions != null) {
+ answer = new ArrayList<>(onCompletions);
+ onCompletions.clear();
+ onCompletions = null;
+ }
+ return answer;
+ }
+
+ @Override
+ public void setUnitOfWork(UnitOfWork unitOfWork) {
+ this.unitOfWork = unitOfWork;
+ if (unitOfWork != null && onCompletions != null) {
+ for (Synchronization onCompletion : onCompletions) {
+ unitOfWork.addSynchronization(onCompletion);
+ }
+ onCompletions.clear();
+ onCompletions = null;
+ }
+ }
+
+ @Override
+ public void setHistoryNodeId(String historyNodeId) {
+ this.historyNodeId = historyNodeId;
+ }
+
+ @Override
+ public String getHistoryNodeId() {
+ return historyNodeId;
+ }
+
+ @Override
+ public String getHistoryNodeSource() {
+ return historyNodeSource;
+ }
+
+ @Override
+ public void setHistoryNodeSource(String historyNodeSource) {
+ this.historyNodeSource = historyNodeSource;
+ }
+
+ @Override
+ public String getHistoryNodeLabel() {
+ return historyNodeLabel;
+ }
+
+ @Override
+ public void setHistoryNodeLabel(String historyNodeLabel) {
+ this.historyNodeLabel = historyNodeLabel;
+ }
+
+ @Override
+ public boolean isNotifyEvent() {
+ return notifyEvent;
+ }
+
+ @Override
+ public void setNotifyEvent(boolean notifyEvent) {
+ this.notifyEvent = notifyEvent;
+ }
+
+ @Override
+ public boolean containsOnCompletion(Synchronization onCompletion) {
+ if (unitOfWork != null) {
+ return unitOfWork.containsSynchronization(onCompletion);
+ } else {
+ return onCompletions != null &&
onCompletions.contains(onCompletion);
+ }
+ }
+
+ @Override
+ public void setTransacted(boolean transacted) {
+ this.transacted = transacted;
+ }
+
+ @Override
+ public void setInterruptable(boolean interruptable) {
+ this.interruptable = interruptable;
+ }
+
+ @Override
+ public boolean isInterrupted() {
+ return interrupted;
+ }
+
+ @Override
+ public void setInterrupted(boolean interrupted) {
+ if (interruptable) {
+ this.interrupted = interrupted;
+ }
+ }
+
+ @Override
+ public <T> T getInOrNull(Class<T> type) {
+ if (in == null) {
+ return null;
+ }
+ if (type.isInstance(in)) {
+ return type.cast(in);
+ }
+ return null;
+ }
+
+ @Override
+ public AsyncCallback getDefaultConsumerCallback() {
+ return defaultConsumerCallback;
+ }
+
+ @Override
+ public void setDefaultConsumerCallback(AsyncCallback callback) {
+ this.defaultConsumerCallback = callback;
+ }
+
+ @Override
+ public void setSafeCopyProperty(String key, SafeCopyProperty value) {
if (value != null) {
- // avoid the NullPointException
if (safeCopyProperties == null) {
this.safeCopyProperties = new ConcurrentHashMap<>(2);
}
safeCopyProperties.put(key, value);
} else if (safeCopyProperties != null) {
- // if the value is null, we just remove the key from the map
safeCopyProperties.remove(key);
}
-
}
- @SuppressWarnings("unchecked")
- <T> T getSafeCopyProperty(String key, Class<T> type) {
+ @Override
+ public <T> T getSafeCopyProperty(String key, Class<T> type) {
if (!hasSafeCopyProperties()) {
return null;
}
Object value = safeCopyProperties.get(key);
-
if (type.isInstance(value)) {
return (T) value;
}
-
return ExchangeHelper.convertToType(this, type, value);
}
- public ExtendedExchangeExtension getExchangeExtension() {
- return privateExtension;
+ @Override
+ public void copySafeCopyPropertiesTo(ExchangeExtension target) {
+ if (safeCopyProperties != null && !safeCopyProperties.isEmpty()) {
+ safeCopyProperties.forEach((k, v) -> target.setSafeCopyProperty(k,
v.safeCopy()));
+ }
+ }
+
+ @Override
+ public boolean isFailureHandled() {
+ return failureHandled;
+ }
+
+ @Override
+ public void setFailureHandled(boolean failureHandled) {
+ this.failureHandled = failureHandled;
+ }
+
+ @Override
+ public boolean isExternalRedelivered(Message message) {
+ if (externalRedelivered ==
RedeliveryTraitPayload.UNDEFINED_REDELIVERY) {
+ externalRedelivered = (RedeliveryTraitPayload)
message.getPayloadForTrait(MessageTrait.REDELIVERY);
+ }
+ return externalRedelivered == RedeliveryTraitPayload.IS_REDELIVERY;
+ }
+
+ @Override
+ public void copyInternalProperties(Exchange target) {
+ ((AbstractExchange)
target).internalProperties.putAll(internalProperties);
+ }
+
+ @Override
+ public void setProperties(Map<String, Object> properties) {
+ if (this.properties == null) {
+ this.properties = new ConcurrentHashMap<>(8);
+ } else {
+ this.properties.clear();
+ }
+ this.properties.putAll(properties);
+ }
+
+ @Override
+ public Map<String, Object> getInternalProperties() {
+ Map<String, Object> map = new HashMap<>();
+ for (ExchangePropertyKey key : ExchangePropertyKey.values()) {
+ Object value = internalProperties.get(key);
+ if (value != null) {
+ map.put(key.getName(), value);
+ }
+ }
+ return map;
+ }
+
+ @Override
+ public Exchange createCopyWithProperties(CamelContext context) {
+ DefaultExchange answer = new DefaultExchange(context,
internalProperties, properties);
+ answer.setPattern(pattern);
+ return answer;
+ }
+
+ public void resetExtension() {
+ if (unitOfWork != null) {
+ unitOfWork.reset();
+ }
+ if (onCompletions != null) {
+ onCompletions.clear();
+ }
+ if (variableRepository != null) {
+ variableRepository.clear();
+ }
+ externalRedelivered = RedeliveryTraitPayload.UNDEFINED_REDELIVERY;
+ historyNodeId = null;
+ historyNodeLabel = null;
+ transacted = false;
+ notifyEvent = false;
+ interrupted = false;
+ interruptable = true;
+ redeliveryExhausted = false;
+ redeliveryCounter = -1;
+ redeliveryMaxCounter = -1;
+ sagaLongRunningAction = null;
+ errorHandlerHandled = null;
+ streamCacheDisabled = false;
+ rollbackOnly = false;
+ rollbackOnlyLast = false;
+ routeStop = false;
}
private static Map<String, Object> safeCopyProperties(Map<String, Object>
properties) {
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
index 528075c8e1b6..551559912734 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
@@ -120,7 +120,7 @@ public final class DefaultPooledExchange extends
AbstractExchange implements Poo
this.pattern = originalPattern;
// do not reset endpoint/fromRouteId as it would be the same
consumer/endpoint again
- getExchangeExtension().reset();
+ resetExtension();
if (onDone != null) {
onDone.onDone(this);