This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
commit c335c109b8a5b4217925354d70ccdde14ac4b264 Author: Claus Ibsen <[email protected]> AuthorDate: Wed Feb 17 11:02:33 2021 +0100 CAMEL-16222: camel-core - ExchangeFactory SPI --- .../org/apache/camel/ExtendedCamelContext.java | 11 +++++ .../java/org/apache/camel/spi/ExchangeFactory.java | 50 +++++++++++++++++++++ .../camel/impl/engine/AbstractCamelContext.java | 22 +++++++++ .../camel/impl/engine/DefaultExchangeFactory.java | 52 ++++++++++++++++++++++ .../camel/impl/engine/SimpleCamelContext.java | 12 +++++ .../camel/impl/ExtendedCamelContextConfigurer.java | 6 +++ .../camel/impl/lw/LightweightCamelContext.java | 11 +++++ .../impl/lw/LightweightRuntimeCamelContext.java | 13 ++++++ .../org/apache/camel/builder/ExchangeBuilder.java | 4 +- .../apache/camel/processor/WireTapProcessor.java | 7 ++- .../org/apache/camel/support/DefaultEndpoint.java | 14 +++++- 11 files changed, 196 insertions(+), 6 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java index 830a460..6cc0fc5 100644 --- a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java +++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java @@ -36,6 +36,7 @@ import org.apache.camel.spi.DataFormatResolver; import org.apache.camel.spi.DeferServiceFactory; import org.apache.camel.spi.EndpointStrategy; import org.apache.camel.spi.EndpointUriFactory; +import org.apache.camel.spi.ExchangeFactory; import org.apache.camel.spi.FactoryFinder; import org.apache.camel.spi.FactoryFinderResolver; import org.apache.camel.spi.HeadersMapFactory; @@ -214,6 +215,16 @@ public interface ExtendedCamelContext extends CamelContext { List<Service> getServices(); /** + * Gets the exchange factory to use. + */ + ExchangeFactory getExchangeFactory(); + + /** + * Sets a custom exchange factory to use. + */ + void setExchangeFactory(ExchangeFactory exchangeFactory); + + /** * Returns the bean post processor used to do any bean customization. * * @return the bean post processor. diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java new file mode 100644 index 0000000..f1eefd0 --- /dev/null +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spi; + +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; + +/** + * Factory for creating {@link Exchange}. + * + * The factory is pluggable which allows to use different strategies. The default factory will create a new + * {@link Exchange} instance, and the pooled factory will pool and reuse exchanges. + */ +public interface ExchangeFactory { + + /** + * Service factory key. + */ + String FACTORY = "exchange-factory"; + + /** + * Gets a new {@link Exchange} + */ + Exchange create(); + + /** + * Gets a new {@link Exchange} + * + * @param fromEndpoint the from endpoint + */ + Exchange create(Endpoint fromEndpoint); + + default void release(Exchange exchange) { + // noop + } +} diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java index 9ae5702..3e1451f 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java @@ -101,6 +101,7 @@ import org.apache.camel.spi.EndpointRegistry; import org.apache.camel.spi.EndpointStrategy; import org.apache.camel.spi.EndpointUriFactory; import org.apache.camel.spi.EventNotifier; +import org.apache.camel.spi.ExchangeFactory; import org.apache.camel.spi.ExecutorServiceManager; import org.apache.camel.spi.FactoryFinder; import org.apache.camel.spi.FactoryFinderResolver; @@ -264,6 +265,7 @@ public abstract class AbstractCamelContext extends BaseService private volatile String version; private volatile PropertiesComponent propertiesComponent; private volatile CamelContextNameStrategy nameStrategy; + private volatile ExchangeFactory exchangeFactory; private volatile ReactiveExecutor reactiveExecutor; private volatile ManagementNameStrategy managementNameStrategy; private volatile Registry registry; @@ -3680,6 +3682,7 @@ public abstract class AbstractCamelContext extends BaseService typeConverter = null; reactiveExecutor = null; asyncProcessorAwaitManager = null; + exchangeFactory = null; } /** @@ -4640,6 +4643,23 @@ public abstract class AbstractCamelContext extends BaseService } @Override + public ExchangeFactory getExchangeFactory() { + if (exchangeFactory == null) { + synchronized (lock) { + if (exchangeFactory == null) { + setExchangeFactory(createExchangeFactory()); + } + } + } + return exchangeFactory; + } + + @Override + public void setExchangeFactory(ExchangeFactory exchangeFactory) { + this.exchangeFactory = doAddService(exchangeFactory); + } + + @Override public ReactiveExecutor getReactiveExecutor() { if (reactiveExecutor == null) { synchronized (lock) { @@ -4731,6 +4751,8 @@ public abstract class AbstractCamelContext extends BaseService return "CamelContext(" + getName() + ")"; } + protected abstract ExchangeFactory createExchangeFactory(); + protected abstract HealthCheckRegistry createHealthCheckRegistry(); protected abstract ReactiveExecutor createReactiveExecutor(); diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java new file mode 100644 index 0000000..fedae54 --- /dev/null +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.engine; + +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.spi.ExchangeFactory; +import org.apache.camel.support.DefaultExchange; + +/** + * Default {@link ExchangeFactory} that creates a new {@link Exchange} instance. + */ +public class DefaultExchangeFactory implements ExchangeFactory, CamelContextAware { + + private CamelContext camelContext; + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + public Exchange create() { + return new DefaultExchange(camelContext); + } + + @Override + public Exchange create(Endpoint fromEndpoint) { + return new DefaultExchange(fromEndpoint); + } +} diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java index 92b95f7..b91710c 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java @@ -41,6 +41,7 @@ import org.apache.camel.spi.ConfigurerResolver; import org.apache.camel.spi.DataFormatResolver; import org.apache.camel.spi.DeferServiceFactory; import org.apache.camel.spi.EndpointRegistry; +import org.apache.camel.spi.ExchangeFactory; import org.apache.camel.spi.ExecutorServiceManager; import org.apache.camel.spi.FactoryFinder; import org.apache.camel.spi.FactoryFinderResolver; @@ -543,6 +544,17 @@ public class SimpleCamelContext extends AbstractCamelContext { } @Override + protected ExchangeFactory createExchangeFactory() { + Optional<ExchangeFactory> result = ResolverHelper.resolveService( + getCamelContextReference(), + getBootstrapFactoryFinder(), + ExchangeFactory.FACTORY, + ExchangeFactory.class); + + return result.orElseGet(DefaultExchangeFactory::new); + } + + @Override protected ReactiveExecutor createReactiveExecutor() { Optional<ReactiveExecutor> result = ResolverHelper.resolveService( getCamelContextReference(), diff --git a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java index 2310e2e..b6e2a47 100644 --- a/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java +++ b/core/camel-core-engine/src/generated/java/org/apache/camel/impl/ExtendedCamelContextConfigurer.java @@ -65,6 +65,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com case "ErrorHandlerFactory": target.setErrorHandlerFactory(property(camelContext, org.apache.camel.ErrorHandlerFactory.class, value)); return true; case "eventnotificationapplicable": case "EventNotificationApplicable": target.setEventNotificationApplicable(property(camelContext, boolean.class, value)); return true; + case "exchangefactory": + case "ExchangeFactory": target.setExchangeFactory(property(camelContext, org.apache.camel.spi.ExchangeFactory.class, value)); return true; case "executorservicemanager": case "ExecutorServiceManager": target.setExecutorServiceManager(property(camelContext, org.apache.camel.spi.ExecutorServiceManager.class, value)); return true; case "factoryfinderresolver": @@ -232,6 +234,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com case "ErrorHandlerFactory": return org.apache.camel.ErrorHandlerFactory.class; case "eventnotificationapplicable": case "EventNotificationApplicable": return boolean.class; + case "exchangefactory": + case "ExchangeFactory": return org.apache.camel.spi.ExchangeFactory.class; case "executorservicemanager": case "ExecutorServiceManager": return org.apache.camel.spi.ExecutorServiceManager.class; case "factoryfinderresolver": @@ -400,6 +404,8 @@ public class ExtendedCamelContextConfigurer extends org.apache.camel.support.com case "ErrorHandlerFactory": return target.getErrorHandlerFactory(); case "eventnotificationapplicable": case "EventNotificationApplicable": return target.isEventNotificationApplicable(); + case "exchangefactory": + case "ExchangeFactory": return target.getExchangeFactory(); case "executorservicemanager": case "ExecutorServiceManager": return target.getExecutorServiceManager(); case "factoryfinderresolver": diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java index 033417a..7732b81 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightCamelContext.java @@ -88,6 +88,7 @@ import org.apache.camel.spi.DeferServiceFactory; import org.apache.camel.spi.EndpointRegistry; import org.apache.camel.spi.EndpointStrategy; import org.apache.camel.spi.EndpointUriFactory; +import org.apache.camel.spi.ExchangeFactory; import org.apache.camel.spi.ExecutorServiceManager; import org.apache.camel.spi.FactoryFinder; import org.apache.camel.spi.FactoryFinderResolver; @@ -1441,6 +1442,16 @@ public class LightweightCamelContext implements ExtendedCamelContext, CatalogCam } @Override + public ExchangeFactory getExchangeFactory() { + return getExtendedCamelContext().getExchangeFactory(); + } + + @Override + public void setExchangeFactory(ExchangeFactory exchangeFactory) { + getExtendedCamelContext().setExchangeFactory(exchangeFactory); + } + + @Override public ReactiveExecutor getReactiveExecutor() { return getExtendedCamelContext().getReactiveExecutor(); } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java index 8ff2277..cc96226 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/lw/LightweightRuntimeCamelContext.java @@ -85,6 +85,7 @@ import org.apache.camel.spi.DeferServiceFactory; import org.apache.camel.spi.EndpointRegistry; import org.apache.camel.spi.EndpointStrategy; import org.apache.camel.spi.EndpointUriFactory; +import org.apache.camel.spi.ExchangeFactory; import org.apache.camel.spi.ExecutorServiceManager; import org.apache.camel.spi.FactoryFinder; import org.apache.camel.spi.FactoryFinderResolver; @@ -167,6 +168,7 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat private final PropertiesComponent propertiesComponent; private final BeanIntrospection beanIntrospection; private final HeadersMapFactory headersMapFactory; + private final ExchangeFactory exchangeFactory; private final ReactiveExecutor reactiveExecutor; private final AsyncProcessorAwaitManager asyncProcessorAwaitManager; private final ExecutorServiceManager executorServiceManager; @@ -211,6 +213,7 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat propertiesComponent = context.getPropertiesComponent(); beanIntrospection = context.adapt(ExtendedCamelContext.class).getBeanIntrospection(); headersMapFactory = context.adapt(ExtendedCamelContext.class).getHeadersMapFactory(); + exchangeFactory = context.adapt(ExtendedCamelContext.class).getExchangeFactory(); reactiveExecutor = context.adapt(ExtendedCamelContext.class).getReactiveExecutor(); asyncProcessorAwaitManager = context.adapt(ExtendedCamelContext.class).getAsyncProcessorAwaitManager(); executorServiceManager = context.getExecutorServiceManager(); @@ -1559,6 +1562,16 @@ public class LightweightRuntimeCamelContext implements ExtendedCamelContext, Cat } @Override + public ExchangeFactory getExchangeFactory() { + return exchangeFactory; + } + + @Override + public void setExchangeFactory(ExchangeFactory exchangeFactory) { + throw new UnsupportedOperationException(); + } + + @Override public ReactiveExecutor getReactiveExecutor() { return reactiveExecutor; } diff --git a/core/camel-core-model/src/main/java/org/apache/camel/builder/ExchangeBuilder.java b/core/camel-core-model/src/main/java/org/apache/camel/builder/ExchangeBuilder.java index 8594bfa..1088cdc 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/builder/ExchangeBuilder.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/builder/ExchangeBuilder.java @@ -22,8 +22,8 @@ import java.util.Map; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.Message; -import org.apache.camel.support.DefaultExchange; /** * Builder to create {@link Exchange} and add headers and set body on the Exchange {@link Message}. @@ -103,7 +103,7 @@ public final class ExchangeBuilder { * @return exchange */ public Exchange build() { - Exchange exchange = new DefaultExchange(context); + Exchange exchange = context.adapt(ExtendedCamelContext.class).getExchangeFactory().create(); Message message = exchange.getIn(); message.setBody(body); if (headers.size() > 0) { diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java index 9ca24e6..4ce6bb5 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java @@ -29,6 +29,7 @@ import org.apache.camel.CamelContextAware; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Expression; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; @@ -41,7 +42,6 @@ import org.apache.camel.spi.RouteIdAware; import org.apache.camel.spi.ShutdownAware; import org.apache.camel.support.AsyncProcessorConverterHelper; import org.apache.camel.support.AsyncProcessorSupport; -import org.apache.camel.support.DefaultExchange; import org.apache.camel.support.ExchangeHelper; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.ObjectHelper; @@ -257,7 +257,10 @@ public class WireTapProcessor extends AsyncProcessorSupport } private Exchange configureNewExchange(Exchange exchange) { - return new DefaultExchange(exchange.getFromEndpoint(), ExchangePattern.InOnly); + Exchange answer + = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().create(exchange.getFromEndpoint()); + answer.setPattern(ExchangePattern.InOnly); + return answer; } public List<Processor> getNewExchangeProcessors() { diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java index fc5a21d..ae48c58 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java @@ -27,8 +27,10 @@ import org.apache.camel.Consumer; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; +import org.apache.camel.ExtendedCamelContext; import org.apache.camel.PollingConsumer; import org.apache.camel.spi.ExceptionHandler; +import org.apache.camel.spi.ExchangeFactory; import org.apache.camel.spi.HasId; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.PropertyConfigurer; @@ -54,6 +56,7 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint private volatile String endpointUri; private CamelContext camelContext; private Component component; + private ExchangeFactory exchangeFactory; @Metadata(label = "advanced", defaultValue = "true", description = "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired)" @@ -96,6 +99,7 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint protected DefaultEndpoint(String endpointUri, Component component) { this.camelContext = component == null ? null : component.getCamelContext(); this.component = component; + this.exchangeFactory = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory(); this.setEndpointUri(endpointUri); } @@ -226,12 +230,14 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint @Override public Exchange createExchange() { - return new DefaultExchange(this, getExchangePattern()); + return createExchange(getExchangePattern()); } @Override public Exchange createExchange(ExchangePattern pattern) { - return new DefaultExchange(this, pattern); + Exchange exchange = exchangeFactory.create(this); + exchange.setPattern(pattern); + return exchange; } /** @@ -480,6 +486,10 @@ public abstract class DefaultEndpoint extends ServiceSupport implements Endpoint protected void doInit() throws Exception { ObjectHelper.notNull(getCamelContext(), "camelContext"); + if (exchangeFactory == null) { + exchangeFactory = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory(); + } + if (autowiredEnabled && getComponent() != null) { PropertyConfigurer configurer = getComponent().getEndpointPropertyConfigurer(); if (configurer instanceof PropertyConfigurerGetter) {
