This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 31ba781 CAMEL-16279: camel-core - Optimize core to reduce object
allocations … (#5183)
31ba781 is described below
commit 31ba781997e413bf4e5a57653f47c8b6af6271b3
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Mar 8 16:48:38 2021 +0100
CAMEL-16279: camel-core - Optimize core to reduce object allocations …
(#5183)
CAMEL-16279: camel-core - Optimize core to reduce object allocations by
pooling reusable tasks in the routing engine.
CAMEL-16314: camel-core - Some components do not work well with pooled
exchanges
---
.../KinesisConsumerClosedShardWithSilentTest.java | 5 +-
.../camel/component/hdfs/HdfsConsumerTest.java | 6 +
.../component/jetty/CamelContinuationServlet.java | 4 +-
.../component/jms/EndpointMessageListener.java | 24 +--
.../camel/component/jpa/AbstractJpaMethodTest.java | 7 +-
.../camel/component/jpa/JpaWithNamedQueryTest.java | 3 +-
.../component/master/EndpointUriEncodingTest.java | 3 +-
.../http/handlers/HttpServerChannelHandler.java | 3 +-
.../camel/component/netty/http/BaseNettyTest.java | 1 +
.../netty/handlers/ServerChannelHandler.java | 5 +-
.../camel/component/netty/BaseNettyTest.java | 1 +
.../component/quickfixj/QuickfixjEndpoint.java | 3 +
.../reactive/streams/ReactiveStreamsConsumer.java | 4 +-
.../engine/ReactorStreamsServiceTestSupport.java | 5 +
.../engine/RxJavaStreamsServiceTestSupport.java | 5 +
.../sjms/consumer/EndpointMessageListener.java | 25 +--
.../springrabbit/EndpointMessageListener.java | 24 +--
.../yammer/YammerMessageAndUserRouteTest.java | 7 -
.../java/org/apache/camel/spi/ExchangeFactory.java | 86 +---------
.../apache/camel/spi/InternalProcessorFactory.java | 4 +-
.../org/apache/camel/spi/PooledObjectFactory.java | 125 +++++++++++++++
.../camel/impl/engine/AbstractCamelContext.java | 3 +
.../camel/impl/engine/CamelInternalProcessor.java | 1 +
.../camel/impl/engine/PooledExchangeFactory.java | 17 +-
.../impl/engine/PrototypeExchangeFactory.java | 111 ++-----------
.../camel/impl/engine/SimpleCamelContext.java | 6 +-
.../InterceptSendToEndpointProcessor.java | 55 ++++---
.../java/org/apache/camel/processor/Pipeline.java | 61 +++++--
.../apache/camel/processor/PooledExchangeTask.java | 41 +++++
.../camel/processor/PooledExchangeTaskFactory.java | 56 +++++++
.../apache/camel/processor/PooledTaskFactory.java | 72 +++++++++
.../camel/processor/PrototypeTaskFactory.java | 48 ++++++
.../errorhandler/RedeliveryErrorHandler.java | 175 ++++++++++++++++-----
.../camel/support/PooledObjectFactorySupport.java | 152 ++++++++++++++++++
.../support/PrototypeObjectFactorySupport.java | 139 ++++++++++++++++
35 files changed, 931 insertions(+), 356 deletions(-)
diff --git
a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
index d43f23f..ce1ebac 100644
---
a/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
+++
b/components/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
@@ -67,7 +67,7 @@ public class KinesisConsumerClosedShardWithSilentTest {
configuration.setIteratorType(ShardIteratorType.LATEST);
configuration.setShardClosed(Kinesis2ShardClosedStrategyEnum.silent);
configuration.setStreamName("streamName");
- Kinesis2Endpoint endpoint = new Kinesis2Endpoint(null, configuration,
component);
+ Kinesis2Endpoint endpoint = new Kinesis2Endpoint("aws2-kinesis:foo",
configuration, component);
endpoint.start();
undertest = new Kinesis2Consumer(endpoint, processor);
@@ -84,6 +84,9 @@ public class KinesisConsumerClosedShardWithSilentTest {
.streamDescription(StreamDescription.builder().shards(shardList).build()).build());
when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class)))
.thenReturn(GetShardIteratorResponse.builder().shardIterator("shardIterator").build());
+
+ context.start();
+ undertest.start();
}
@Test
diff --git
a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
index b1ca851..1681487 100644
---
a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
+++
b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
@@ -21,8 +21,10 @@ import java.io.InputStream;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.engine.PrototypeExchangeFactory;
import org.apache.camel.support.DefaultExchange;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -83,6 +85,8 @@ public class HdfsConsumerTest {
when(endpointConfig.getOpenedSuffix()).thenReturn(DEFAULT_OPENED_SUFFIX);
context = new DefaultCamelContext();
+ // this test is mocking and its easier to test with prototype scoped
+ context.adapt(ExtendedCamelContext.class).setExchangeFactory(new
PrototypeExchangeFactory());
}
@Test
@@ -159,6 +163,7 @@ public class HdfsConsumerTest {
ArgumentCaptor<Exchange> exchangeCaptor =
ArgumentCaptor.forClass(Exchange.class);
underTest = new HdfsConsumer(endpoint, processor, endpointConfig,
hdfsInfoFactory, new StringBuilder(hdfsPath));
+ underTest.start();
// when
int actual = underTest.doPoll();
@@ -211,6 +216,7 @@ public class HdfsConsumerTest {
ArgumentCaptor<Exchange> exchangeCaptor =
ArgumentCaptor.forClass(Exchange.class);
underTest = new HdfsConsumer(endpoint, processor, endpointConfig,
hdfsInfoFactory, new StringBuilder(hdfsPath));
+ underTest.start();
// when
int actual = underTest.doPoll();
diff --git
a/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
b/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
index fdd4974..65db856 100644
---
a/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
+++
b/components/camel-jetty-common/src/main/java/org/apache/camel/component/jetty/CamelContinuationServlet.java
@@ -181,7 +181,8 @@ public class CamelContinuationServlet extends CamelServlet {
}
// a new request so create an exchange
- final Exchange exchange = consumer.createExchange(false);
+ // must be prototype scoped (not pooled) so we create the exchange
via endpoint
+ final Exchange exchange = endpoint.createExchange();
exchange.setPattern(ExchangePattern.InOut);
if (consumer.getEndpoint().isBridgeEndpoint()) {
@@ -269,7 +270,6 @@ public class CamelContinuationServlet extends CamelServlet {
throw new ServletException(e);
} finally {
consumer.doneUoW(result);
- consumer.releaseExchange(result, false);
}
}
diff --git
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
index 872b2af..5bb1f9f 100644
---
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
+++
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
@@ -26,7 +26,6 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
-import org.apache.camel.ExtendedExchange;
import org.apache.camel.Processor;
import org.apache.camel.RollbackExchangeException;
import org.apache.camel.RuntimeCamelException;
@@ -150,9 +149,6 @@ public class EndpointMessageListener implements
SessionAwareMessageListener {
// if we failed processed the exchange from the async callback
task, then grab the exception
rce = exchange.getException(RuntimeCamelException.class);
- // the exchange is now done so release it
- consumer.releaseExchange(exchange, false);
-
} catch (Exception e) {
rce = wrapRuntimeCamelException(e);
}
@@ -256,31 +252,15 @@ public class EndpointMessageListener implements
SessionAwareMessageListener {
}
}
}
-
- // if we completed from async processing then we should release
the exchange
- // the sync processing will release the exchange outside this
callback
- if (!doneSync) {
- consumer.releaseExchange(exchange, false);
- }
}
}
public Exchange createExchange(Message message, Session session, Object
replyDestination) {
- Exchange exchange = consumer.createExchange(false);
+ // must be prototype scoped (not pooled) so we create the exchange via
endpoint
+ Exchange exchange = endpoint.createExchange(message, session);
JmsBinding binding = getBinding();
exchange.setProperty(Exchange.BINDING, binding);
- // optimize: either create a new JmsMessage or reuse existing if exists
- JmsMessage msg =
exchange.adapt(ExtendedExchange.class).getInOrNull(JmsMessage.class);
- if (msg == null) {
- msg = new JmsMessage(exchange, message, session, binding);
- exchange.setIn(msg);
- } else {
- msg.setJmsMessage(message);
- msg.setJmsSession(session);
- msg.setBinding(binding);
- }
-
// lets set to an InOut if we have some kind of reply-to destination
if (replyDestination != null && !disableReplyTo) {
// only change pattern if not already out capable
diff --git
a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java
b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java
index 5bcf4f6..154dd41 100644
---
a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java
+++
b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/AbstractJpaMethodTest.java
@@ -45,7 +45,7 @@ public abstract class AbstractJpaMethodTest extends
CamelTestSupport {
protected EntityManager entityManager;
protected TransactionTemplate transactionTemplate;
protected Consumer consumer;
- protected Exchange receivedExchange;
+ protected Customer receivedCustomer;
abstract boolean usePersist();
@@ -123,7 +123,7 @@ public abstract class AbstractJpaMethodTest extends
CamelTestSupport {
consumer = endpoint.createConsumer(new Processor() {
public void process(Exchange e) {
- receivedExchange = e;
+ receivedCustomer = e.getIn().getBody(Customer.class);
assertNotNull(e.getIn().getHeader(JpaConstants.ENTITY_MANAGER,
EntityManager.class));
latch.countDown();
}
@@ -135,8 +135,7 @@ public abstract class AbstractJpaMethodTest extends
CamelTestSupport {
consumer.stop();
Thread.sleep(1000);
- assertNotNull(receivedExchange);
- Customer receivedCustomer =
receivedExchange.getIn().getBody(Customer.class);
+ assertNotNull(receivedCustomer);
assertEquals(customer.getName(), receivedCustomer.getName());
assertEquals(customer.getId(), receivedCustomer.getId());
assertEquals(customer.getAddress().getAddressLine1(),
receivedCustomer.getAddress().getAddressLine1());
diff --git
a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryTest.java
b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryTest.java
index 55c35db..04bc452 100644
---
a/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryTest.java
+++
b/components/camel-jpa/src/test/java/org/apache/camel/component/jpa/JpaWithNamedQueryTest.java
@@ -95,7 +95,8 @@ public class JpaWithNamedQueryTest {
consumer = endpoint.createConsumer(new Processor() {
public void process(Exchange e) {
LOG.info("Received exchange: " + e.getIn());
- receivedExchange = e;
+ // make defensive copy
+ receivedExchange = e.copy();
latch.countDown();
}
});
diff --git
a/components/camel-master/src/test/java/org/apache/camel/component/master/EndpointUriEncodingTest.java
b/components/camel-master/src/test/java/org/apache/camel/component/master/EndpointUriEncodingTest.java
index c78a06c..9ec78a6 100644
---
a/components/camel-master/src/test/java/org/apache/camel/component/master/EndpointUriEncodingTest.java
+++
b/components/camel-master/src/test/java/org/apache/camel/component/master/EndpointUriEncodingTest.java
@@ -85,7 +85,8 @@ public class EndpointUriEncodingTest extends CamelTestSupport
{
public Consumer createConsumer(Processor processor) {
return new DefaultConsumer(this, processor) {
@Override
- public void start() {
+ protected void doStart() throws Exception {
+ super.doStart();
Exchange exchange = createExchange(true);
exchange.getMessage().setHeader("foo", foo);
exchange.getMessage().setHeader("bar", bar);
diff --git
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
index 1831d64..d6873a6 100644
---
a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
+++
b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
@@ -343,7 +343,8 @@ public class HttpServerChannelHandler extends
ServerChannelHandler {
@Override
protected Exchange createExchange(ChannelHandlerContext ctx, Object
message) throws Exception {
- Exchange exchange = consumer.createExchange(false);
+ // must be prototype scoped (not pooled) so we create the exchange via
endpoint
+ Exchange exchange = consumer.getEndpoint().createExchange();
// create a new IN message as we cannot reuse with netty
Message in;
diff --git
a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/BaseNettyTest.java
b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/BaseNettyTest.java
index adafadd..d2b41c4 100644
---
a/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/BaseNettyTest.java
+++
b/components/camel-netty-http/src/test/java/org/apache/camel/component/netty/http/BaseNettyTest.java
@@ -48,6 +48,7 @@ public class BaseNettyTest extends CamelTestSupport {
public static void startLeakDetection() {
System.setProperty("io.netty.leakDetection.maxRecords", "100");
System.setProperty("io.netty.leakDetection.acquireAndReleaseOnly",
"true");
+ System.setProperty("io.netty.leakDetection.targetRecords", "100");
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
}
diff --git
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
index 7c0eaed..3a6a796 100644
---
a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
+++
b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
@@ -113,7 +113,8 @@ public class ServerChannelHandler extends
SimpleChannelInboundHandler<Object> {
}
protected Exchange createExchange(ChannelHandlerContext ctx, Object
message) throws Exception {
- Exchange exchange = consumer.createExchange(false);
+ // must be prototype scoped (not pooled) so we create the exchange via
endpoint
+ Exchange exchange = consumer.getEndpoint().createExchange();
consumer.getEndpoint().updateMessageHeader(exchange.getIn(), ctx);
NettyPayloadHelper.setIn(exchange, message);
return exchange;
@@ -140,7 +141,6 @@ public class ServerChannelHandler extends
SimpleChannelInboundHandler<Object> {
consumer.getExceptionHandler().handleException(e);
} finally {
consumer.doneUoW(exchange);
- consumer.releaseExchange(exchange, false);
}
}
@@ -155,7 +155,6 @@ public class ServerChannelHandler extends
SimpleChannelInboundHandler<Object> {
consumer.getExceptionHandler().handleException(e);
} finally {
consumer.doneUoW(exchange);
- consumer.releaseExchange(exchange, false);
}
});
}
diff --git
a/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java
b/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java
index b98e501..8d40832 100644
---
a/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java
+++
b/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java
@@ -49,6 +49,7 @@ public class BaseNettyTest extends CamelTestSupport {
public static void startLeakDetection() {
System.setProperty("io.netty.leakDetection.maxRecords", "100");
System.setProperty("io.netty.leakDetection.acquireAndReleaseOnly",
"true");
+ System.setProperty("io.netty.leakDetection.targetRecords", "100");
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
}
diff --git
a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
index 0f45d55..b83f0bc 100644
---
a/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
+++
b/components/camel-quickfix/src/main/java/org/apache/camel/component/quickfixj/QuickfixjEndpoint.java
@@ -33,6 +33,7 @@ import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
import org.apache.camel.support.DefaultEndpoint;
+import org.apache.camel.support.service.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import quickfix.Message;
@@ -132,6 +133,8 @@ public class QuickfixjEndpoint extends DefaultEndpoint
implements QuickfixjEvent
public void onEvent(QuickfixjEventCategory eventCategory, SessionID
sessionID, Message message) throws Exception {
if (this.sessionID == null || isMatching(sessionID)) {
for (QuickfixjConsumer consumer : consumers) {
+ // ensure consumer is started
+ ServiceHelper.startService(consumer);
Exchange exchange
= QuickfixjConverters.toExchange(consumer, sessionID,
message, eventCategory, getExchangePattern());
try {
diff --git
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
index 815c3e2..989b901 100644
---
a/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
+++
b/components/camel-reactive-streams/src/main/java/org/apache/camel/component/reactive/streams/ReactiveStreamsConsumer.java
@@ -75,7 +75,7 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
public void onComplete() {
if (endpoint.isForwardOnComplete()) {
- Exchange exchange = createExchange(true);
+ Exchange exchange = endpoint.createExchange();
exchange.getIn().setHeader(ReactiveStreamsConstants.REACTIVE_STREAMS_EVENT_TYPE,
"onComplete");
doSend(exchange, done -> {
@@ -85,7 +85,7 @@ public class ReactiveStreamsConsumer extends DefaultConsumer {
public void onError(Throwable error) {
if (endpoint.isForwardOnError()) {
- Exchange exchange = createExchange(true);
+ Exchange exchange = endpoint.createExchange();
exchange.getIn().setHeader(ReactiveStreamsConstants.REACTIVE_STREAMS_EVENT_TYPE,
"onError");
exchange.getIn().setBody(error);
diff --git
a/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTestSupport.java
b/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTestSupport.java
index 2a86a93..f8723b3 100644
---
a/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTestSupport.java
+++
b/components/camel-reactor/src/test/java/org/apache/camel/component/reactor/engine/ReactorStreamsServiceTestSupport.java
@@ -17,10 +17,12 @@
package org.apache.camel.component.reactor.engine;
import org.apache.camel.CamelContext;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
import org.apache.camel.component.reactive.streams.ReactiveStreamsConstants;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
import
org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.impl.engine.PrototypeExchangeFactory;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.apache.camel.util.ObjectHelper;
@@ -31,6 +33,9 @@ class ReactorStreamsServiceTestSupport extends
CamelTestSupport {
protected CamelContext createCamelContext() throws Exception {
CamelContext context = super.createCamelContext();
+ // camel-reactor does not work with pooled exchanges
+ context.adapt(ExtendedCamelContext.class).setExchangeFactory(new
PrototypeExchangeFactory());
+
context.addComponent(
ReactiveStreamsConstants.SCHEME,
ReactiveStreamsComponent.withServiceType(ReactorStreamsConstants.SERVICE_NAME));
diff --git
a/components/camel-rxjava/src/test/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsServiceTestSupport.java
b/components/camel-rxjava/src/test/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsServiceTestSupport.java
index 93519fd..81d8549 100644
---
a/components/camel-rxjava/src/test/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsServiceTestSupport.java
+++
b/components/camel-rxjava/src/test/java/org/apache/camel/component/rxjava/engine/RxJavaStreamsServiceTestSupport.java
@@ -17,10 +17,12 @@
package org.apache.camel.component.rxjava.engine;
import org.apache.camel.CamelContext;
+import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.component.reactive.streams.ReactiveStreamsComponent;
import org.apache.camel.component.reactive.streams.ReactiveStreamsConstants;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
import
org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
+import org.apache.camel.impl.engine.PrototypeExchangeFactory;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.apache.camel.util.ObjectHelper;
@@ -31,6 +33,9 @@ class RxJavaStreamsServiceTestSupport extends
CamelTestSupport {
protected CamelContext createCamelContext() throws Exception {
CamelContext context = super.createCamelContext();
+ // camel-rxjava does not work with pooled exchanges
+ context.adapt(ExtendedCamelContext.class).setExchangeFactory(new
PrototypeExchangeFactory());
+
context.addComponent(
ReactiveStreamsConstants.SCHEME,
ReactiveStreamsComponent.withServiceType(RxJavaStreamsConstants.SERVICE_NAME));
diff --git
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
index b2c9515..99ae442 100644
---
a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
+++
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
@@ -29,7 +29,6 @@ import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
-import org.apache.camel.ExtendedExchange;
import org.apache.camel.Processor;
import org.apache.camel.RollbackExchangeException;
import org.apache.camel.RuntimeCamelException;
@@ -38,7 +37,6 @@ import org.apache.camel.component.sjms.SessionMessageListener;
import org.apache.camel.component.sjms.SjmsConstants;
import org.apache.camel.component.sjms.SjmsConsumer;
import org.apache.camel.component.sjms.SjmsEndpoint;
-import org.apache.camel.component.sjms.SjmsMessage;
import org.apache.camel.component.sjms.SjmsTemplate;
import org.apache.camel.component.sjms.jms.JmsMessageHelper;
import org.apache.camel.support.AsyncProcessorConverterHelper;
@@ -212,9 +210,6 @@ public class EndpointMessageListener implements
SessionMessageListener {
// if we failed processed the exchange from the async callback
task, then grab the exception
rce = exchange.getException(RuntimeCamelException.class);
- // the exchange is now done so release it
- consumer.releaseExchange(exchange, false);
-
} catch (Exception e) {
rce = wrapRuntimeCamelException(e);
}
@@ -242,18 +237,8 @@ public class EndpointMessageListener implements
SessionMessageListener {
}
public Exchange createExchange(Message message, Session session, Object
replyDestination) {
- Exchange exchange = consumer.createExchange(false);
-
- // optimize: either create a new SjmsMessage or reuse existing if
exists
- SjmsMessage msg =
exchange.adapt(ExtendedExchange.class).getInOrNull(SjmsMessage.class);
- if (msg == null) {
- msg = new SjmsMessage(exchange, message, session,
endpoint.getBinding());
- exchange.setIn(msg);
- } else {
- msg.setJmsMessage(message);
- msg.setJmsSession(session);
- msg.setBinding(endpoint.getBinding());
- }
+ // must be prototype scoped (not pooled) so we create the exchange via
endpoint
+ Exchange exchange = endpoint.createExchange(message, session);
// lets set to an InOut if we have some kind of reply-to destination
if (replyDestination != null && !disableReplyTo) {
@@ -474,12 +459,6 @@ public class EndpointMessageListener implements
SessionMessageListener {
}
}
}
-
- // if we completed from async processing then we should release
the exchange
- // the sync processing will release the exchange outside this
callback
- if (!doneSync) {
- consumer.releaseExchange(exchange, false);
- }
}
}
diff --git
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java
index 39a7f02..0766bc1 100644
---
a/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java
+++
b/components/camel-spring-rabbitmq/src/main/java/org/apache/camel/component/springrabbit/EndpointMessageListener.java
@@ -16,8 +16,6 @@
*/
package org.apache.camel.component.springrabbit;
-import java.util.Map;
-
import com.rabbitmq.client.Channel;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
@@ -141,9 +139,6 @@ public class EndpointMessageListener implements
ChannelAwareMessageListener {
// if we failed processed the exchange from the async callback
task, then grab the exception
rce = exchange.getException(RuntimeCamelException.class);
- // the exchange is now done so release it
- consumer.releaseExchange(exchange, false);
-
} catch (Exception e) {
rce = wrapRuntimeCamelException(e);
}
@@ -160,18 +155,7 @@ public class EndpointMessageListener implements
ChannelAwareMessageListener {
}
protected Exchange createExchange(Message message, Channel channel, Object
replyDestination) {
- Exchange exchange = consumer.createExchange(false);
-
- Object body = messageConverter.fromMessage(message);
- exchange.getMessage().setBody(body);
-
- // TODO: optimize to use existing headers map
- Map<String, Object> headers
- =
messagePropertiesConverter.fromMessageProperties(message.getMessageProperties(),
exchange);
- if (!headers.isEmpty()) {
- exchange.getMessage().setHeaders(headers);
- }
-
+ Exchange exchange = endpoint.createExchange(message);
exchange.setProperty(SpringRabbitMQConstants.CHANNEL, channel);
// lets set to an InOut if we have some kind of reply-to destination
@@ -261,12 +245,6 @@ public class EndpointMessageListener implements
ChannelAwareMessageListener {
}
}
}
-
- // if we completed from async processing then we should release
the exchange
- // the sync processing will release the exchange outside this
callback
- if (!doneSync) {
- consumer.releaseExchange(exchange, false);
- }
}
private void sendReply(Address replyDestination, Message message,
Exchange exchange, org.apache.camel.Message out) {
diff --git
a/components/camel-yammer/src/test/java/org/apache/camel/component/yammer/YammerMessageAndUserRouteTest.java
b/components/camel-yammer/src/test/java/org/apache/camel/component/yammer/YammerMessageAndUserRouteTest.java
index eadb46d..056f558 100644
---
a/components/camel-yammer/src/test/java/org/apache/camel/component/yammer/YammerMessageAndUserRouteTest.java
+++
b/components/camel-yammer/src/test/java/org/apache/camel/component/yammer/YammerMessageAndUserRouteTest.java
@@ -21,7 +21,6 @@ import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.component.yammer.model.Messages;
-import org.apache.camel.component.yammer.model.User;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.junit.jupiter.api.Test;
@@ -70,12 +69,6 @@ public class YammerMessageAndUserRouteTest extends
CamelTestSupport {
template.sendBody("direct:start", "overwrite me");
userMock.assertIsSatisfied();
-
- exchange = userMock.getExchanges().get(0);
- User user = exchange.getIn().getBody(User.class);
-
- assertEquals("Joe Camel", user.getFullName());
- assertEquals("[email protected]",
user.getContact().getEmailAddresses().get(0).getAddress());
}
@Override
diff --git
a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
index fcedd54..29b3fdb 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
@@ -16,12 +16,10 @@
*/
package org.apache.camel.spi;
-import org.apache.camel.CamelContextAware;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.NonManagedService;
-import org.apache.camel.Service;
/**
* Factory used by {@link Consumer} to create Camel {@link Exchange} holding
the incoming message received by the
@@ -36,50 +34,7 @@ import org.apache.camel.Service;
* The factory is pluggable which allows to use different strategies. The
default factory will create a new
* {@link Exchange} instance, and the pooled factory will pool and reuse
exchanges.
*/
-public interface ExchangeFactory extends Service, CamelContextAware,
NonManagedService, RouteIdAware {
-
- /**
- * Utilization statistics of the this factory.
- */
- interface Statistics {
-
- /**
- * Number of new exchanges created.
- */
- long getCreatedCounter();
-
- /**
- * Number of exchanges acquired (reused) when using pooled factory.
- */
- long getAcquiredCounter();
-
- /**
- * Number of exchanges released back to pool
- */
- long getReleasedCounter();
-
- /**
- * Number of exchanges discarded (thrown away) such as if no space in
cache pool.
- */
- long getDiscardedCounter();
-
- /**
- * Reset the counters
- */
- void reset();
-
- /**
- * Whether statistics is enabled.
- */
- boolean isStatisticsEnabled();
-
- /**
- * Sets whether statistics is enabled.
- *
- * @param statisticsEnabled <tt>true</tt> to enable
- */
- void setStatisticsEnabled(boolean statisticsEnabled);
- }
+public interface ExchangeFactory extends PooledObjectFactory<Exchange>,
NonManagedService, RouteIdAware {
/**
* Service factory key.
@@ -125,43 +80,8 @@ public interface ExchangeFactory extends Service,
CamelContextAware, NonManagedS
}
/**
- * The capacity the pool (for each consumer) uses for storing exchanges.
The default capacity is 100.
- */
- int getCapacity();
-
- /**
- * The current number of exchanges in the pool
- */
- int getSize();
-
- /**
- * The capacity the pool (for each consumer) uses for storing exchanges.
The default capacity is 100.
- */
- void setCapacity(int capacity);
-
- /**
- * Whether statistics is enabled.
- */
- boolean isStatisticsEnabled();
-
- /**
- * Whether statistics is enabled.
- */
- void setStatisticsEnabled(boolean statisticsEnabled);
-
- /**
- * Reset the statistics
- */
- void resetStatistics();
-
- /**
- * Purges the internal cache (if pooled)
- */
- void purge();
-
- /**
- * Gets the usage statistics
+ * Whether the factory is pooled.
*/
- Statistics getStatistics();
+ boolean isPooled();
}
diff --git
a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
index b3f8868..8a56f21 100644
---
a/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
+++
b/core/camel-api/src/main/java/org/apache/camel/spi/InternalProcessorFactory.java
@@ -27,7 +27,9 @@ import org.apache.camel.Route;
/**
* A factory used internally by Camel to create {@link Processor} and other
internal building blocks. This factory is
- * used to have loose coupling between the modules in core. Camel user user
should only use {@link ProcessorFactory}.
+ * used to have loose coupling between the modules in core.
+ *
+ * Camel end users should NOT use this, but use {@link ProcessorFactory}
instead.
*
* @see ProcessorFactory
*/
diff --git
a/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java
b/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java
new file mode 100644
index 0000000..db4c0d1
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/PooledObjectFactory.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Service;
+
+/**
+ * Factory for pooled objects or tasks.
+ */
+public interface PooledObjectFactory<T> extends Service, CamelContextAware {
+
+ /**
+ * Utilization statistics of this factory.
+ */
+ interface Statistics {
+
+ /**
+ * Number of new objects created.
+ */
+ long getCreatedCounter();
+
+ /**
+ * Number of objects acquired (reused) when using pooled factory.
+ */
+ long getAcquiredCounter();
+
+ /**
+ * Number of objects released back to the pool
+ */
+ long getReleasedCounter();
+
+ /**
+ * Number of objects discarded (thrown away) such as if no space in
the cache pool.
+ */
+ long getDiscardedCounter();
+
+ /**
+ * Reset the counters
+ */
+ void reset();
+
+ /**
+ * Whether statistics is enabled.
+ */
+ boolean isStatisticsEnabled();
+
+ /**
+ * Sets whether statistics is enabled.
+ *
+ * @param statisticsEnabled <tt>true</tt> to enable
+ */
+ void setStatisticsEnabled(boolean statisticsEnabled);
+ }
+
+ /**
+ * The current number of objects in the pool
+ */
+ int getSize();
+
+ /**
+ * The capacity the pool uses for storing objects. The default capacity is
100.
+ */
+ int getCapacity();
+
+ /**
+ * Sets the capacity the pool uses for storing objects. The default capacity is
100.
+ */
+ void setCapacity(int capacity);
+
+ /**
+ * Whether statistics is enabled.
+ */
+ boolean isStatisticsEnabled();
+
+ /**
+ * Sets whether statistics is enabled.
+ */
+ void setStatisticsEnabled(boolean statisticsEnabled);
+
+ /**
+ * Reset the statistics
+ */
+ void resetStatistics();
+
+ /**
+ * Purges the internal cache (if pooled)
+ */
+ void purge();
+
+ /**
+ * Gets the usage statistics
+ */
+ Statistics getStatistics();
+
+ /**
+ * Acquires an object from the pool (if any)
+ *
+ * @return the object or <tt>null</tt> if the pool is empty
+ */
+ T acquire();
+
+ /**
+ * Releases the object back to the pool
+ *
+ * @param t the object
+ * @return true if released into the pool, or false if something went
wrong and the object was discarded
+ */
+ boolean release(T t);
+
+}
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
index d7e6dd4..45fcf6d 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java
@@ -2882,6 +2882,9 @@ public abstract class AbstractCamelContext extends
BaseService
}
bootstraps.clear();
+ if (adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled())
{
+ LOG.info("Pooled mode enabled. Camel pools and reuses objects to
reduce JVM object allocations.");
+ }
if (isLightweight()) {
LOG.info("Lightweight mode enabled. Performing optimizations and
memory reduction.");
ReifierStrategy.clearReifiers();
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 9696d1b..9e847fe 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -271,6 +271,7 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor implements In
}
// create internal callback which will execute the advices in reverse
order when done
+ // TODO: pool this task, and the states array
AsyncCallback callback = new AsyncAfterTask(states, exchange,
originalCallback);
if (exchange.isTransacted()) {
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index 4079a46..bd40516 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -168,11 +168,20 @@ public final class PooledExchangeFactory extends
PrototypeExchangeFactory {
}
@Override
+ public boolean isPooled() {
+ return true;
+ }
+
+ @Override
protected void doStop() throws Exception {
- exchangeFactoryManager.removeExchangeFactory(this);
- logUsageSummary(LOG, "PooledExchangeFactory", pool.size());
- statistics.reset();
- pool.clear();
+ if (exchangeFactoryManager != null) {
+ exchangeFactoryManager.removeExchangeFactory(this);
+ }
+ if (pool != null) {
+ logUsageSummary(LOG, "PooledExchangeFactory", pool.size());
+ statistics.reset();
+ pool.clear();
+ }
// do not call super
}
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java
index fb17cb0..eef1ea7 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PrototypeExchangeFactory.java
@@ -16,9 +16,6 @@
*/
package org.apache.camel.impl.engine;
-import java.util.concurrent.atomic.LongAdder;
-
-import org.apache.camel.CamelContext;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
@@ -26,7 +23,7 @@ import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.spi.ExchangeFactory;
import org.apache.camel.spi.ExchangeFactoryManager;
import org.apache.camel.support.DefaultExchange;
-import org.apache.camel.support.service.ServiceSupport;
+import org.apache.camel.support.PooledObjectFactorySupport;
import org.apache.camel.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,13 +31,11 @@ import org.slf4j.LoggerFactory;
/**
* {@link ExchangeFactory} that creates a new {@link Exchange} instance.
*/
-public class PrototypeExchangeFactory extends ServiceSupport implements
ExchangeFactory {
+public class PrototypeExchangeFactory extends
PooledObjectFactorySupport<Exchange> implements ExchangeFactory {
private static final Logger LOG =
LoggerFactory.getLogger(PrototypeExchangeFactory.class);
- final UtilizationStatistics statistics = new UtilizationStatistics();
final Consumer consumer;
- CamelContext camelContext;
ExchangeFactoryManager exchangeFactoryManager;
String routeId;
@@ -54,6 +49,7 @@ public class PrototypeExchangeFactory extends ServiceSupport
implements Exchange
@Override
protected void doBuild() throws Exception {
+ super.doBuild();
this.exchangeFactoryManager =
camelContext.adapt(ExtendedCamelContext.class).getExchangeFactoryManager();
}
@@ -73,16 +69,6 @@ public class PrototypeExchangeFactory extends ServiceSupport
implements Exchange
}
@Override
- public CamelContext getCamelContext() {
- return camelContext;
- }
-
- @Override
- public void setCamelContext(CamelContext camelContext) {
- this.camelContext = camelContext;
- }
-
- @Override
public ExchangeFactory newExchangeFactory(Consumer consumer) {
PrototypeExchangeFactory answer = new
PrototypeExchangeFactory(consumer);
answer.setStatisticsEnabled(statistics.isStatisticsEnabled());
@@ -92,6 +78,11 @@ public class PrototypeExchangeFactory extends ServiceSupport
implements Exchange
}
@Override
+ public Exchange acquire() {
+ throw new UnsupportedOperationException("Not in use");
+ }
+
+ @Override
public Exchange create(boolean autoRelease) {
if (statistics.isStatisticsEnabled()) {
statistics.created.increment();
@@ -116,47 +107,18 @@ public class PrototypeExchangeFactory extends
ServiceSupport implements Exchange
}
@Override
- public boolean isStatisticsEnabled() {
- return statistics.isStatisticsEnabled();
- }
-
- @Override
- public void setStatisticsEnabled(boolean statisticsEnabled) {
- statistics.setStatisticsEnabled(statisticsEnabled);
- }
-
- @Override
- public int getCapacity() {
- return 0;
- }
-
- @Override
- public int getSize() {
- return 0;
- }
-
- @Override
- public void setCapacity(int capacity) {
- // not in use
- }
-
- @Override
public void resetStatistics() {
statistics.reset();
}
@Override
- public void purge() {
- // not in use
- }
-
- @Override
- public Statistics getStatistics() {
- return statistics;
+ public boolean isPooled() {
+ return false;
}
@Override
protected void doStart() throws Exception {
+ super.doStart();
if (exchangeFactoryManager != null) {
exchangeFactoryManager.addExchangeFactory(this);
}
@@ -164,6 +126,7 @@ public class PrototypeExchangeFactory extends
ServiceSupport implements Exchange
@Override
protected void doStop() throws Exception {
+ super.doStop();
if (exchangeFactoryManager != null) {
exchangeFactoryManager.removeExchangeFactory(this);
}
@@ -190,54 +153,4 @@ public class PrototypeExchangeFactory extends
ServiceSupport implements Exchange
}
}
- /**
- * Represents utilization statistics
- */
- final class UtilizationStatistics implements ExchangeFactory.Statistics {
-
- boolean statisticsEnabled;
- final LongAdder created = new LongAdder();
- final LongAdder acquired = new LongAdder();
- final LongAdder released = new LongAdder();
- final LongAdder discarded = new LongAdder();
-
- @Override
- public void reset() {
- created.reset();
- acquired.reset();
- released.reset();
- discarded.reset();
- }
-
- @Override
- public long getCreatedCounter() {
- return created.longValue();
- }
-
- @Override
- public long getAcquiredCounter() {
- return acquired.longValue();
- }
-
- @Override
- public long getReleasedCounter() {
- return released.longValue();
- }
-
- @Override
- public long getDiscardedCounter() {
- return discarded.longValue();
- }
-
- @Override
- public boolean isStatisticsEnabled() {
- return statisticsEnabled;
- }
-
- @Override
- public void setStatisticsEnabled(boolean statisticsEnabled) {
- this.statisticsEnabled = statisticsEnabled;
- }
- }
-
}
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
index 057cd5a..efe5d39 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
@@ -16,9 +16,6 @@
*/
package org.apache.camel.impl.engine;
-import java.util.Map;
-import java.util.Optional;
-
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
@@ -85,6 +82,9 @@ import org.apache.camel.support.ResolverHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+import java.util.Optional;
+
/**
* Represents the context used to configure routes and the policies to use.
*/
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java
index 859a48d..275fa6a 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java
@@ -46,6 +46,7 @@ public class InterceptSendToEndpointProcessor extends
DefaultAsyncProducer {
private final Endpoint delegate;
private final AsyncProducer producer;
private final boolean skip;
+ private AsyncProcessor pipeline;
public InterceptSendToEndpointProcessor(InterceptSendToEndpoint endpoint,
Endpoint delegate, AsyncProducer producer,
boolean skip) {
@@ -71,24 +72,9 @@ public class InterceptSendToEndpointProcessor extends
DefaultAsyncProducer {
// add header with the real endpoint uri
exchange.getIn().setHeader(Exchange.INTERCEPTED_ENDPOINT,
delegate.getEndpointUri());
- if (endpoint.getBefore() != null || endpoint.getAfter() != null) {
- // detour the exchange using synchronous processing
- AsyncProcessor before = null;
- if (endpoint.getBefore() != null) {
- before =
AsyncProcessorConverterHelper.convert(endpoint.getBefore());
- }
- AsyncProcessor ascb = new AsyncProcessorSupport() {
- @Override
- public boolean process(Exchange exchange, AsyncCallback
callback) {
- return callback(exchange, callback, true);
- }
- };
- AsyncProcessor after = null;
- if (endpoint.getAfter() != null) {
- after =
AsyncProcessorConverterHelper.convert(endpoint.getAfter());
- }
-
- return new Pipeline(exchange.getContext(), Arrays.asList(before,
ascb, after)).process(exchange, callback);
+ if (pipeline != null) {
+ // detour the exchange with the pipeline that has before and after
included
+ return pipeline.process(exchange, callback);
}
return callback(exchange, callback, true);
@@ -140,19 +126,38 @@ public class InterceptSendToEndpointProcessor extends
DefaultAsyncProducer {
@Override
protected void doBuild() throws Exception {
- ServiceHelper.buildService(producer);
+ // build pipeline with before/after processors
+ if (endpoint.getBefore() != null || endpoint.getAfter() != null) {
+ // detour the exchange using synchronous processing
+ AsyncProcessor before = null;
+ if (endpoint.getBefore() != null) {
+ before =
AsyncProcessorConverterHelper.convert(endpoint.getBefore());
+ }
+ AsyncProcessor ascb = new AsyncProcessorSupport() {
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback
callback) {
+ return callback(exchange, callback, true);
+ }
+ };
+ AsyncProcessor after = null;
+ if (endpoint.getAfter() != null) {
+ after =
AsyncProcessorConverterHelper.convert(endpoint.getAfter());
+ }
+
+ pipeline = new Pipeline(getEndpoint().getCamelContext(),
Arrays.asList(before, ascb, after));
+ }
+
+ ServiceHelper.buildService(producer, pipeline);
}
@Override
protected void doInit() throws Exception {
- ServiceHelper.initService(producer);
+ ServiceHelper.initService(producer, pipeline);
}
@Override
protected void doStart() throws Exception {
- ServiceHelper.startService(endpoint.getBefore(), endpoint.getAfter());
- // here we also need to start the producer
- ServiceHelper.startService(producer);
+ ServiceHelper.startService(producer, pipeline);
}
@Override
@@ -162,4 +167,8 @@ public class InterceptSendToEndpointProcessor extends
DefaultAsyncProducer {
ServiceHelper.stopService(producer);
}
+ @Override
+ protected void doShutdown() throws Exception {
+ ServiceHelper.stopAndShutdownServices(producer, pipeline);
+ }
}
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java
index 3bbbf14..6d6b3d1 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -53,18 +53,32 @@ public class Pipeline extends AsyncProcessorSupport
implements Navigate<Processo
private final ReactiveExecutor reactiveExecutor;
private final List<AsyncProcessor> processors;
private final int size;
+ private PooledExchangeTaskFactory taskFactory;
+
private String id;
private String routeId;
- private final class PipelineTask implements Runnable, AsyncCallback {
+ private final class PipelineTask implements PooledExchangeTask,
AsyncCallback {
- private final Exchange exchange;
- private final AsyncCallback callback;
+ private Exchange exchange;
+ private AsyncCallback callback;
private int index;
- PipelineTask(Exchange exchange, AsyncCallback callback) {
+ PipelineTask() {
+ }
+
+ @Override
+ public void prepare(Exchange exchange, AsyncCallback callback) {
this.exchange = exchange;
this.callback = callback;
+ this.index = 0;
+ }
+
+ @Override
+ public void reset() {
+ this.exchange = null;
+ this.callback = null;
+ this.index = 0;
}
@Override
@@ -101,7 +115,9 @@ public class Pipeline extends AsyncProcessorSupport
implements Navigate<Processo
LOG.trace("Processing complete for exchangeId: {} >>> {}",
exchange.getExchangeId(), exchange);
}
- reactiveExecutor.schedule(callback);
+ AsyncCallback cb = callback;
+ taskFactory.release(this);
+ reactiveExecutor.schedule(cb);
}
}
}
@@ -142,7 +158,7 @@ public class Pipeline extends AsyncProcessorSupport
implements Navigate<Processo
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
// create task which has state used during routing
- PipelineTask task = new PipelineTask(exchange, callback);
+ PooledExchangeTask task = taskFactory.acquire(exchange, callback);
if (exchange.isTransacted()) {
reactiveExecutor.scheduleSync(task);
@@ -154,22 +170,47 @@ public class Pipeline extends AsyncProcessorSupport
implements Navigate<Processo
@Override
protected void doBuild() throws Exception {
- ServiceHelper.buildService(processors);
+ boolean pooled =
camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled();
+ if (pooled) {
+ taskFactory = new PooledTaskFactory() {
+ @Override
+ public PooledExchangeTask create(Exchange exchange,
AsyncCallback callback) {
+ return new PipelineTask();
+ }
+ };
+ int capacity =
camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity();
+ taskFactory.setCapacity(capacity);
+ } else {
+ taskFactory = new PrototypeTaskFactory() {
+ @Override
+ public PooledExchangeTask create(Exchange exchange,
AsyncCallback callback) {
+ return new PipelineTask();
+ }
+ };
+ }
+ LOG.trace("Using TaskFactory: {}", taskFactory);
+
+ ServiceHelper.buildService(taskFactory, processors);
}
@Override
protected void doInit() throws Exception {
- ServiceHelper.initService(processors);
+ ServiceHelper.initService(taskFactory, processors);
}
@Override
protected void doStart() throws Exception {
- ServiceHelper.startService(processors);
+ ServiceHelper.startService(taskFactory, processors);
}
@Override
protected void doStop() throws Exception {
- ServiceHelper.stopService(processors);
+ ServiceHelper.stopService(taskFactory, processors);
+ }
+
+ @Override
+ protected void doShutdown() throws Exception {
+ ServiceHelper.stopAndShutdownServices(taskFactory, processors);
}
@Override
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTask.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTask.java
new file mode 100644
index 0000000..d4e0226
--- /dev/null
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTask.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+
+/**
+ * A task that EIPs and the internal routing engine use to store state when
processing an {@link Exchange}.
+ *
+ * @see org.apache.camel.processor.PooledExchangeTaskFactory
+ */
+public interface PooledExchangeTask extends Runnable {
+
+ /**
+ * Prepares the task for the given exchange and its callback
+ *
+ * @param exchange the exchange
+ * @param callback the callback
+ */
+ void prepare(Exchange exchange, AsyncCallback callback);
+
+ /**
+ * Resets the task after it is done, so it can be reused for another exchange.
+ */
+ void reset();
+}
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTaskFactory.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTaskFactory.java
new file mode 100644
index 0000000..b90e9f5
--- /dev/null
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledExchangeTaskFactory.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.PooledObjectFactory;
+
+/**
+ * Factory to create {@link PooledExchangeTask}.
+ *
+ * @see PooledExchangeTask
+ */
+public interface PooledExchangeTaskFactory extends
PooledObjectFactory<PooledExchangeTask> {
+
+ /**
+ * Creates a new task to use for processing the exchange.
+ *
+ * @param exchange the current exchange
+ * @param callback the callback for the exchange
+ * @return the task
+ */
+ PooledExchangeTask create(Exchange exchange, AsyncCallback callback);
+
+ /**
+ * Attempts to acquire a pooled task to use for processing the exchange;
if that is not possible, a new task is created.
+ *
+ * @param exchange the current exchange
+ * @param callback the callback for the exchange
+ * @return the task
+ */
+ PooledExchangeTask acquire(Exchange exchange, AsyncCallback callback);
+
+ /**
+ * Releases the task after it is done being used
+ *
+ * @param task the task
+ * @return true if the task was released, and false if the task
failed to be released or no space in pool, and
+ * the task was discarded.
+ */
+ boolean release(PooledExchangeTask task);
+}
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java
new file mode 100644
index 0000000..3775032
--- /dev/null
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PooledTaskFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.PooledObjectFactorySupport;
+
+public abstract class PooledTaskFactory extends
PooledObjectFactorySupport<PooledExchangeTask>
+ implements PooledExchangeTaskFactory {
+
+ @Override
+ public PooledExchangeTask acquire() {
+ return pool.poll();
+ }
+
+ public PooledExchangeTask acquire(Exchange exchange, AsyncCallback
callback) {
+ PooledExchangeTask task = acquire();
+ if (task == null) {
+ if (statistics.isStatisticsEnabled()) {
+ statistics.created.increment();
+ }
+ task = create(exchange, callback);
+ } else {
+ if (statistics.isStatisticsEnabled()) {
+ statistics.acquired.increment();
+ }
+ }
+ task.prepare(exchange, callback);
+ return task;
+ }
+
+ @Override
+ public boolean release(PooledExchangeTask task) {
+ try {
+ task.reset();
+ boolean inserted = pool.offer(task);
+ if (statistics.isStatisticsEnabled()) {
+ if (inserted) {
+ statistics.released.increment();
+ } else {
+ statistics.discarded.increment();
+ }
+ }
+ return inserted;
+ } catch (Throwable e) {
+ if (statistics.isStatisticsEnabled()) {
+ statistics.discarded.increment();
+ }
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "PooledTaskFactory[capacity: " + getCapacity() + "]";
+ }
+}
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/PrototypeTaskFactory.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PrototypeTaskFactory.java
new file mode 100644
index 0000000..bd113d5
--- /dev/null
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/PrototypeTaskFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.support.PrototypeObjectFactorySupport;
+
+public abstract class PrototypeTaskFactory extends
PrototypeObjectFactorySupport<PooledExchangeTask>
+ implements PooledExchangeTaskFactory {
+
+ @Override
+ public PooledExchangeTask acquire(Exchange exchange, AsyncCallback
callback) {
+ PooledExchangeTask task = create(exchange, callback);
+ task.prepare(exchange, callback);
+ return task;
+ }
+
+ @Override
+ public PooledExchangeTask acquire() {
+ throw new UnsupportedOperationException("Not in use");
+ }
+
+ @Override
+ public boolean release(PooledExchangeTask task) {
+ // not pooled so no need to reset task
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "PrototypeTaskFactory";
+ }
+}
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
index 3c0eaf3..ad0ac3f 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/errorhandler/RedeliveryErrorHandler.java
@@ -38,6 +38,10 @@ import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.processor.PooledExchangeTask;
+import org.apache.camel.processor.PooledExchangeTaskFactory;
+import org.apache.camel.processor.PooledTaskFactory;
+import org.apache.camel.processor.PrototypeTaskFactory;
import org.apache.camel.spi.AsyncProcessorAwaitManager;
import org.apache.camel.spi.CamelLogger;
import org.apache.camel.spi.ErrorHandlerRedeliveryCustomizer;
@@ -69,6 +73,9 @@ public abstract class RedeliveryErrorHandler extends
ErrorHandlerSupport
private static final Logger LOG =
LoggerFactory.getLogger(RedeliveryErrorHandler.class);
+ // factory
+ protected PooledExchangeTaskFactory taskFactory;
+
// state
protected final AtomicInteger redeliverySleepCounter = new AtomicInteger();
protected ScheduledExecutorService executorService;
@@ -169,12 +176,8 @@ public abstract class RedeliveryErrorHandler extends
ErrorHandlerSupport
@Override
public boolean process(final Exchange exchange, final AsyncCallback
callback) {
// Create the redelivery task object for this exchange (optimize to
only create task can do redelivery or not)
- Runnable task;
- if (simpleTask) {
- task = new SimpleTask(exchange, callback);
- } else {
- task = new RedeliveryTask(exchange, callback);
- }
+ Runnable task = taskFactory.acquire(exchange, callback);
+
// Run it
if (exchange.isTransacted()) {
reactiveExecutor.scheduleSync(task);
@@ -345,14 +348,18 @@ public abstract class RedeliveryErrorHandler extends
ErrorHandlerSupport
/**
* Simple task that calls the processor, with no redelivery support
*/
- protected class SimpleTask implements Runnable, AsyncCallback {
- private final ExtendedExchange exchange;
- private final AsyncCallback callback;
- private boolean first = true;
+ protected class SimpleTask implements PooledExchangeTask, Runnable,
AsyncCallback {
+ private ExtendedExchange exchange;
+ private AsyncCallback callback;
+ private boolean first;
- SimpleTask(Exchange exchange, AsyncCallback callback) {
+ public SimpleTask() {
+ }
+
+ public void prepare(Exchange exchange, AsyncCallback callback) {
this.exchange = (ExtendedExchange) exchange;
this.callback = callback;
+ this.first = true;
}
@Override
@@ -360,6 +367,12 @@ public abstract class RedeliveryErrorHandler extends
ErrorHandlerSupport
return "SimpleTask";
}
+ public void reset() {
+ this.exchange = null;
+ this.callback = null;
+ this.first = true;
+ }
+
@Override
public void done(boolean doneSync) {
// the run method decides what to do when we are done
@@ -385,7 +398,9 @@ public abstract class RedeliveryErrorHandler extends
ErrorHandlerSupport
if (exchange.getException() == null) {
exchange.setException(new RejectedExecutionException());
}
- callback.done(false);
+ AsyncCallback cb = callback;
+ taskFactory.release(this);
+ cb.done(false);
return;
}
if (exchange.isInterrupted()) {
@@ -396,7 +411,9 @@ public abstract class RedeliveryErrorHandler extends
ErrorHandlerSupport
}
exchange.setRouteStop(true);
// we should not continue routing so call callback
- callback.done(false);
+ AsyncCallback cb = callback;
+ taskFactory.release(this);
+ cb.done(false);
return;
}
@@ -413,14 +430,18 @@ public abstract class RedeliveryErrorHandler extends
ErrorHandlerSupport
onExceptionOccurred();
prepareExchangeAfterFailure(exchange);
// we do not support redelivery so continue callback
- reactiveExecutor.schedule(callback);
+ AsyncCallback cb = callback;
+ taskFactory.release(this);
+ reactiveExecutor.schedule(cb);
} else if (first) {
// first time call the target processor
first = false;
outputAsync.process(exchange, this);
} else {
// we are done so continue callback
- reactiveExecutor.schedule(callback);
+ AsyncCallback cb = callback;
+ taskFactory.release(this);
+ reactiveExecutor.schedule(cb);
}
}
@@ -585,15 +606,16 @@ public abstract class RedeliveryErrorHandler extends
ErrorHandlerSupport
/**
* Task that calls the processor and handles redelivery if it
fails (more advanced than ProcessTask)
*/
- protected class RedeliveryTask implements Runnable {
- private final Exchange original;
- private final ExtendedExchange exchange;
- private final AsyncCallback callback;
+ protected class RedeliveryTask implements PooledExchangeTask, Runnable {
+ // state
+ private Exchange original;
+ private ExtendedExchange exchange;
+ private AsyncCallback callback;
private int redeliveryCounter;
private long redeliveryDelay;
- private Predicate retryWhilePredicate;
// default behavior which can be overloaded on a per exception basis
+ private Predicate retryWhilePredicate;
private RedeliveryPolicy currentRedeliveryPolicy;
private Processor failureProcessor;
private Processor onRedeliveryProcessor;
@@ -603,7 +625,16 @@ public abstract class RedeliveryErrorHandler extends
ErrorHandlerSupport
private boolean useOriginalInMessage;
private boolean useOriginalInBody;
- public RedeliveryTask(Exchange exchange, AsyncCallback callback) {
+ public RedeliveryTask() {
+ }
+
+ @Override
+ public String toString() {
+ return "RedeliveryTask";
+ }
+
+ @Override
+ public void prepare(Exchange exchange, AsyncCallback callback) {
this.retryWhilePredicate = retryWhilePolicy;
this.currentRedeliveryPolicy = redeliveryPolicy;
this.handledPredicate = getDefaultHandledPredicate();
@@ -611,7 +642,6 @@ public abstract class RedeliveryErrorHandler extends
ErrorHandlerSupport
this.useOriginalInBody = useOriginalBodyPolicy;
this.onRedeliveryProcessor = redeliveryProcessor;
this.onExceptionProcessor =
RedeliveryErrorHandler.this.onExceptionProcessor;
-
// do a defensive copy of the original Exchange, which is needed
for redelivery so we can ensure the
// original Exchange is being redelivered, and not a mutated
Exchange
this.original = redeliveryEnabled ?
defensiveCopyExchangeIfNeeded(exchange) : null;
@@ -620,8 +650,20 @@ public abstract class RedeliveryErrorHandler extends
ErrorHandlerSupport
}
@Override
- public String toString() {
- return "RedeliveryTask";
+ public void reset() {
+ this.retryWhilePredicate = null;
+ this.currentRedeliveryPolicy = null;
+ this.handledPredicate = null;
+ this.continuedPredicate = null;
+ this.useOriginalInMessage = false;
+ this.useOriginalInBody = false;
+ this.onRedeliveryProcessor = null;
+ this.onExceptionProcessor = null;
+ this.original = null;
+ this.exchange = null;
+ this.callback = null;
+ this.redeliveryCounter = 0;
+ this.redeliveryDelay = 0;
}
/**
@@ -635,14 +677,17 @@ public abstract class RedeliveryErrorHandler extends
ErrorHandlerSupport
if (exchange.getException() == null) {
exchange.setException(new RejectedExecutionException());
}
- callback.done(false);
+ AsyncCallback cb = callback;
+ taskFactory.release(this);
+ cb.done(false);
return;
}
try {
doRun();
} catch (Throwable e) {
- // unexpected exception during running so break out
+ // unexpected exception during running so set exception and
trigger callback
+ // (do not do taskFactory.release as that happens later)
exchange.setException(e);
callback.done(false);
}
@@ -804,7 +849,9 @@ public abstract class RedeliveryErrorHandler extends
ErrorHandlerSupport
// only process if the exchange hasn't failed
// and it has not been handled by the error processor
if (isDone(exchange)) {
- reactiveExecutor.schedule(callback);
+ AsyncCallback cb = callback;
+ taskFactory.release(this);
+ reactiveExecutor.schedule(cb);
} else {
// error occurred so loop back around which we do by
invoking the processAsyncErrorHandler
reactiveExecutor.schedule(this);
@@ -1043,6 +1090,7 @@ public abstract class RedeliveryErrorHandler extends
ErrorHandlerSupport
// because you can continue and still let the failure processor do
some routing
// before continue in the main route.
boolean allowFailureProcessor = !shouldContinue ||
!isDeadLetterChannel;
+ final boolean fHandled = handled;
if (allowFailureProcessor && processor != null) {
@@ -1108,6 +1156,23 @@ public abstract class RedeliveryErrorHandler extends
ErrorHandlerSupport
} finally {
// if the fault was handled asynchronously, this
should be reflected in the callback as well
reactiveExecutor.schedule(callback);
+
+ // create log message
+ String msg = "Failed delivery for " +
ExchangeHelper.logIds(exchange);
+ msg = msg + ". Exhausted after delivery attempt: " +
redeliveryCounter + " caught: " + caught;
+ if (processor != null) {
+ if (isDeadLetterChannel && deadLetterUri != null) {
+ msg = msg + ". Handled by DeadLetterChannel:
[" + URISupport.sanitizeUri(deadLetterUri) + "]";
+ } else {
+ msg = msg + ". Processed by failure processor:
" + processor;
+ }
+ }
+
+ // log that we failed delivery as we are exhausted
+ logFailedDelivery(false, false, fHandled, false,
isDeadLetterChannel, exchange, msg, null);
+
+ // we are done so we can release the task
+ taskFactory.release(this);
}
});
} else {
@@ -1127,22 +1192,25 @@ public abstract class RedeliveryErrorHandler extends
ErrorHandlerSupport
} finally {
// callback we are done
reactiveExecutor.schedule(callback);
- }
- }
- // create log message
- String msg = "Failed delivery for " +
ExchangeHelper.logIds(exchange);
- msg = msg + ". Exhausted after delivery attempt: " +
redeliveryCounter + " caught: " + caught;
- if (processor != null) {
- if (isDeadLetterChannel && deadLetterUri != null) {
- msg = msg + ". Handled by DeadLetterChannel: [" +
URISupport.sanitizeUri(deadLetterUri) + "]";
- } else {
- msg = msg + ". Processed by failure processor: " +
processor;
+ // create log message
+ String msg = "Failed delivery for " +
ExchangeHelper.logIds(exchange);
+ msg = msg + ". Exhausted after delivery attempt: " +
redeliveryCounter + " caught: " + caught;
+ if (processor != null) {
+ if (isDeadLetterChannel && deadLetterUri != null) {
+ msg = msg + ". Handled by DeadLetterChannel: [" +
URISupport.sanitizeUri(deadLetterUri) + "]";
+ } else {
+ msg = msg + ". Processed by failure processor: " +
processor;
+ }
+ }
+
+ // log that we failed delivery as we are exhausted
+ logFailedDelivery(false, false, fHandled, false,
isDeadLetterChannel, exchange, msg, null);
+
+ // we are done so we can release the task
+ taskFactory.release(this);
}
}
-
- // log that we failed delivery as we are exhausted
- logFailedDelivery(false, false, handled, false,
isDeadLetterChannel, exchange, msg, null);
}
protected void prepareExchangeAfterFailure(
@@ -1503,8 +1571,6 @@ public abstract class RedeliveryErrorHandler extends
ErrorHandlerSupport
@Override
protected void doStart() throws Exception {
- ServiceHelper.startService(output, outputAsync, deadLetter);
-
// determine if redeliver is enabled or not
redeliveryEnabled = determineIfRedeliveryIsEnabled();
if (LOG.isTraceEnabled()) {
@@ -1531,6 +1597,28 @@ public abstract class RedeliveryErrorHandler extends
ErrorHandlerSupport
// however if we don't then there is less memory overhead (and a bit less
cpu) when using the simple task
simpleTask = deadLetter == null && !redeliveryEnabled &&
(exceptionPolicies == null || exceptionPolicies.isEmpty())
&& onPrepareProcessor == null;
+
+ boolean pooled =
camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().isPooled();
+ if (pooled) {
+ taskFactory = new PooledTaskFactory() {
+ @Override
+ public PooledExchangeTask create(Exchange exchange,
AsyncCallback callback) {
+ return simpleTask ? new SimpleTask() : new
RedeliveryTask();
+ }
+ };
+ int capacity =
camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().getCapacity();
+ taskFactory.setCapacity(capacity);
+ } else {
+ taskFactory = new PrototypeTaskFactory() {
+ @Override
+ public PooledExchangeTask create(Exchange exchange,
AsyncCallback callback) {
+ return simpleTask ? new SimpleTask() : new
RedeliveryTask();
+ }
+ };
+ }
+ LOG.trace("Using TaskFactory: {}", taskFactory);
+
+ ServiceHelper.startService(taskFactory, output, outputAsync,
deadLetter);
}
@Override
@@ -1542,6 +1630,7 @@ public abstract class RedeliveryErrorHandler extends
ErrorHandlerSupport
@Override
protected void doShutdown() throws Exception {
- ServiceHelper.stopAndShutdownServices(deadLetter, output, outputAsync);
+ ServiceHelper.stopAndShutdownServices(deadLetter, output, outputAsync,
taskFactory);
}
+
}
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java
b/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java
new file mode 100644
index 0000000..64867b3
--- /dev/null
+++
b/core/camel-support/src/main/java/org/apache/camel/support/PooledObjectFactorySupport.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.PooledObjectFactory;
+import org.apache.camel.support.service.ServiceSupport;
+
+public abstract class PooledObjectFactorySupport<T> extends ServiceSupport
implements PooledObjectFactory<T> {
+
+ protected final UtilizationStatistics statistics = new
UtilizationStatistics();
+
+ protected CamelContext camelContext;
+ protected BlockingQueue<T> pool;
+ protected int capacity = 100;
+
+ @Override
+ protected void doBuild() throws Exception {
+ super.doBuild();
+ this.pool = new ArrayBlockingQueue<>(capacity);
+ }
+
+ @Override
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ @Override
+ public boolean isStatisticsEnabled() {
+ return statistics.isStatisticsEnabled();
+ }
+
+ @Override
+ public void setStatisticsEnabled(boolean statisticsEnabled) {
+ statistics.setStatisticsEnabled(statisticsEnabled);
+ }
+
+ @Override
+ public int getSize() {
+ if (pool != null) {
+ return pool.size();
+ } else {
+ return 0;
+ }
+ }
+
+ @Override
+ public int getCapacity() {
+ return capacity;
+ }
+
+ @Override
+ public void setCapacity(int capacity) {
+ this.capacity = capacity;
+ }
+
+ @Override
+ public void resetStatistics() {
+ statistics.reset();
+ }
+
+ @Override
+ public void purge() {
+ pool.clear();
+ }
+
+ @Override
+ public Statistics getStatistics() {
+ return statistics;
+ }
+
+ @Override
+ protected void doShutdown() throws Exception {
+ super.doShutdown();
+ statistics.reset();
+ pool.clear();
+ }
+
+ /**
+ * Represents utilization statistics
+ */
+ protected final class UtilizationStatistics implements
PooledObjectFactory.Statistics {
+
+ public final LongAdder created = new LongAdder();
+ public final LongAdder acquired = new LongAdder();
+ public final LongAdder released = new LongAdder();
+ public final LongAdder discarded = new LongAdder();
+ private boolean statisticsEnabled;
+
+ @Override
+ public void reset() {
+ created.reset();
+ acquired.reset();
+ released.reset();
+ discarded.reset();
+ }
+
+ @Override
+ public long getCreatedCounter() {
+ return created.longValue();
+ }
+
+ @Override
+ public long getAcquiredCounter() {
+ return acquired.longValue();
+ }
+
+ @Override
+ public long getReleasedCounter() {
+ return released.longValue();
+ }
+
+ @Override
+ public long getDiscardedCounter() {
+ return discarded.longValue();
+ }
+
+ @Override
+ public boolean isStatisticsEnabled() {
+ return statisticsEnabled;
+ }
+
+ @Override
+ public void setStatisticsEnabled(boolean statisticsEnabled) {
+ this.statisticsEnabled = statisticsEnabled;
+ }
+ }
+
+}
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java
b/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java
new file mode 100644
index 0000000..8c038e4
--- /dev/null
+++
b/core/camel-support/src/main/java/org/apache/camel/support/PrototypeObjectFactorySupport.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support;
+
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.PooledObjectFactory;
+import org.apache.camel.support.service.ServiceSupport;
+
+/**
+ * {@link org.apache.camel.spi.PooledObjectFactory} that creates a new
instance (does not pool).
+ */
+public abstract class PrototypeObjectFactorySupport<T> extends ServiceSupport
implements PooledObjectFactory<T> {
+
+ protected final UtilizationStatistics statistics = new
UtilizationStatistics();
+ private CamelContext camelContext;
+
+ @Override
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ @Override
+ public boolean isStatisticsEnabled() {
+ return statistics.isStatisticsEnabled();
+ }
+
+ @Override
+ public void setStatisticsEnabled(boolean statisticsEnabled) {
+ statistics.setStatisticsEnabled(statisticsEnabled);
+ }
+
+ @Override
+ public int getSize() {
+ return 0;
+ }
+
+ @Override
+ public int getCapacity() {
+ return 0;
+ }
+
+ @Override
+ public void setCapacity(int capacity) {
+ // not in use
+ }
+
+ @Override
+ public void resetStatistics() {
+ statistics.reset();
+ }
+
+ @Override
+ public void purge() {
+ // not in use
+ }
+
+ @Override
+ public Statistics getStatistics() {
+ return statistics;
+ }
+
+ @Override
+ protected void doShutdown() throws Exception {
+ super.doShutdown();
+ statistics.reset();
+ }
+
+ /**
+ * Represents utilization statistics
+ */
+ protected final class UtilizationStatistics implements Statistics {
+
+ public final LongAdder created = new LongAdder();
+ public final LongAdder acquired = new LongAdder();
+ public final LongAdder released = new LongAdder();
+ public final LongAdder discarded = new LongAdder();
+ private boolean statisticsEnabled;
+
+ @Override
+ public void reset() {
+ created.reset();
+ acquired.reset();
+ released.reset();
+ discarded.reset();
+ }
+
+ @Override
+ public long getCreatedCounter() {
+ return created.longValue();
+ }
+
+ @Override
+ public long getAcquiredCounter() {
+ return acquired.longValue();
+ }
+
+ @Override
+ public long getReleasedCounter() {
+ return released.longValue();
+ }
+
+ @Override
+ public long getDiscardedCounter() {
+ return discarded.longValue();
+ }
+
+ @Override
+ public boolean isStatisticsEnabled() {
+ return statisticsEnabled;
+ }
+
+ @Override
+ public void setStatisticsEnabled(boolean statisticsEnabled) {
+ this.statisticsEnabled = statisticsEnabled;
+ }
+ }
+
+}