This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/exchange-factory by this push:
new 8b92a7f CAMEL-16222: PooledExchangeFactory experiment
8b92a7f is described below
commit 8b92a7f340a335c2968fea9179eab386d22c8b75
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Feb 18 14:59:18 2021 +0100
CAMEL-16222: PooledExchangeFactory experiment
---
.../camel/catalog/components/vertx-http.json | 2 +
.../camel/component/dataset/DataSetConsumer.java | 30 ++++++++++---
.../apache/camel/component/file/FileConsumer.java | 9 ++++
.../camel/component/file/GenericFileConsumer.java | 8 +++-
.../component/file/remote/RemoteFileConsumer.java | 9 ++++
.../apache/camel/component/http/HttpProducer.java | 2 +-
.../component/scheduler/SchedulerConsumer.java | 19 ++++++++-
.../camel/component/timer/TimerConsumer.java | 14 +++++--
.../src/main/java/org/apache/camel/Consumer.java | 17 ++++++++
.../src/main/java/org/apache/camel/Endpoint.java | 7 ++++
.../java/org/apache/camel/ExtendedExchange.java | 2 +
.../java/org/apache/camel/spi/ExchangeFactory.java | 12 +++++-
.../main/java/org/apache/camel/spi/UnitOfWork.java | 7 ++++
.../camel/impl/engine/DefaultExchangeFactory.java | 5 ++-
.../camel/impl/engine/DefaultUnitOfWork.java | 12 ++++++
.../camel/impl/engine/PooledExchangeFactory.java | 49 +++++++++++++++-------
.../org/apache/camel/builder/ExchangeBuilder.java | 4 +-
.../apache/camel/processor/WireTapProcessor.java | 7 +---
.../component/dataset/DataSetTestEndpointTest.java | 12 +++++-
.../org/apache/camel/support/DefaultConsumer.java | 18 ++++++++
.../org/apache/camel/support/DefaultEndpoint.java | 23 ++++------
.../org/apache/camel/support/DefaultExchange.java | 16 +++++--
.../support/DefaultInterceptSendToEndpoint.java | 5 +++
.../camel/support/PollingConsumerSupport.java | 14 +++++--
24 files changed, 244 insertions(+), 59 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/vertx-http.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/vertx-http.json
index 852e081..069eb8c 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/vertx-http.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/vertx-http.json
@@ -23,6 +23,7 @@
},
"componentProperties": {
"lazyStartProducer": { "kind": "property", "displayName": "Lazy Start
Producer", "group": "producer", "label": "producer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Whether the producer
should be started lazy (on the first message). By starting lazy you can use
this to allow CamelContext and routes to startup in situations where a producer
may otherwise fail during star [...]
+ "responsePayloadAsByteArray": { "kind": "property", "displayName":
"Response Payload As Byte Array", "group": "producer", "label": "producer",
"required": false, "type": "boolean", "javaType": "boolean", "deprecated":
false, "autowired": false, "secret": false, "defaultValue": true,
"description": "Whether the response body should be byte or as
io.vertx.core.buffer.Buffer" },
"allowJavaSerializedObject": { "kind": "property", "displayName": "Allow
Java Serialized Object", "group": "advanced", "label": "advanced", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": false, "description":
"Whether to allow java serialization when a request has the Content-Type
application\/x-java-serialized-object This is disabled by default. If you
enable this, be aware that Java will deseria [...]
"autowiredEnabled": { "kind": "property", "displayName": "Autowired
Enabled", "group": "advanced", "label": "advanced", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": true, "description": "Whether autowiring is
enabled. This is used for automatic autowiring options (the option must be
marked as autowired) by looking up in the registry to find if there is a single
instance of matching type, which t [...]
"vertx": { "kind": "property", "displayName": "Vertx", "group":
"advanced", "label": "advanced", "required": false, "type": "object",
"javaType": "io.vertx.core.Vertx", "deprecated": false, "autowired": false,
"secret": false, "description": "To use an existing vertx instead of creating a
new instance" },
@@ -48,6 +49,7 @@
"httpMethod": { "kind": "parameter", "displayName": "Http Method",
"group": "producer", "label": "producer", "required": false, "type": "object",
"javaType": "io.vertx.core.http.HttpMethod", "enum": [ "OPTIONS", "GET",
"HEAD", "POST", "PUT", "DELETE", "TRACE", "CONNECT", "PATCH", "OTHER" ],
"deprecated": false, "autowired": false, "secret": false, "configurationClass":
"org.apache.camel.component.vertx.http.VertxHttpConfiguration",
"configurationField": "configuration", "description" [...]
"lazyStartProducer": { "kind": "parameter", "displayName": "Lazy Start
Producer", "group": "producer", "label": "producer", "required": false, "type":
"boolean", "javaType": "boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Whether the producer
should be started lazy (on the first message). By starting lazy you can use
this to allow CamelContext and routes to startup in situations where a producer
may otherwise fail during sta [...]
"okStatusCodeRange": { "kind": "parameter", "displayName": "Ok Status Code
Range", "group": "producer", "label": "producer", "required": false, "type":
"string", "javaType": "java.lang.String", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": "200-299", "configurationClass":
"org.apache.camel.component.vertx.http.VertxHttpConfiguration",
"configurationField": "configuration", "description": "The status codes which
are considered a success response. The values [...]
+ "responsePayloadAsByteArray": { "kind": "parameter", "displayName":
"Response Payload As Byte Array", "group": "producer", "label": "producer",
"required": false, "type": "boolean", "javaType": "boolean", "deprecated":
false, "autowired": false, "secret": false, "defaultValue": true,
"configurationClass":
"org.apache.camel.component.vertx.http.VertxHttpConfiguration",
"configurationField": "configuration", "description": "Whether the response
body should be byte or as io.vertx.core.b [...]
"sessionManagement": { "kind": "parameter", "displayName": "Session
Management", "group": "producer", "label": "producer", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false, "autowired":
false, "secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.vertx.http.VertxHttpConfiguration",
"configurationField": "configuration", "description": "Enables session
management via WebClientSession. By default the client is configur [...]
"throwExceptionOnFailure": { "kind": "parameter", "displayName": "Throw
Exception On Failure", "group": "producer", "label": "producer", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": true,
"configurationClass":
"org.apache.camel.component.vertx.http.VertxHttpConfiguration",
"configurationField": "configuration", "description": "Disable throwing
HttpOperationFailedException in case of failed respo [...]
"timeout": { "kind": "parameter", "displayName": "Timeout", "group":
"producer", "label": "producer", "required": false, "type": "integer",
"javaType": "long", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": -1, "configurationClass":
"org.apache.camel.component.vertx.http.VertxHttpConfiguration",
"configurationField": "configuration", "description": "The amount of time in
milliseconds after which if the request does not return any data within the
timeout per [...]
diff --git
a/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
b/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
index 43fbb4b..81fd019 100644
---
a/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
+++
b/components/camel-dataset/src/main/java/org/apache/camel/component/dataset/DataSetConsumer.java
@@ -20,6 +20,7 @@ import java.util.concurrent.ExecutorService;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.spi.CamelLogger;
import org.apache.camel.support.DefaultConsumer;
@@ -86,10 +87,27 @@ public class DataSetConsumer extends DefaultConsumer {
}
}
+ /**
+ * Creates a message exchange for the given index in the {@link DataSet}
+ */
+ protected Exchange createExchange(long messageIndex) throws Exception {
+ Exchange exchange = createExchange(false);
+
+ endpoint.getDataSet().populateMessage(exchange, messageIndex);
+
+ if (!endpoint.getDataSetIndex().equals("off")) {
+ Message in = exchange.getIn();
+ in.setHeader(Exchange.DATASET_INDEX, messageIndex);
+ }
+
+ return exchange;
+ }
+
protected void sendMessages(long startIndex, long endIndex) {
- try {
- for (long i = startIndex; i < endIndex; i++) {
- Exchange exchange = endpoint.createExchange(i);
+ for (long i = startIndex; i < endIndex; i++) {
+ Exchange exchange = null;
+ try {
+ exchange = createExchange(i);
getProcessor().process(exchange);
try {
@@ -104,9 +122,11 @@ public class DataSetConsumer extends DefaultConsumer {
if (reporter != null) {
reporter.process(exchange);
}
+ } catch (Exception e) {
+ handleException(e);
+ } finally {
+ releaseExchange(exchange);
}
- } catch (Exception e) {
- handleException(e);
}
}
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
index 7b9ee71..50087fb 100644
---
a/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/FileConsumer.java
@@ -57,6 +57,15 @@ public class FileConsumer extends GenericFileConsumer<File> {
}
@Override
+ protected Exchange createExchange(GenericFile<File> file) {
+ Exchange exchange = createExchange(true);
+ if (file != null) {
+ file.bindToExchange(exchange, getEndpoint().isProbeContentType());
+ }
+ return exchange;
+ }
+
+ @Override
protected boolean pollDirectory(String fileName, List<GenericFile<File>>
fileList, int depth) {
LOG.trace("pollDirectory from fileName: {}", fileName);
diff --git
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
index 3e6a7b4..7aa46b3 100644
---
a/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
+++
b/components/camel-file/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
@@ -102,6 +102,11 @@ public abstract class GenericFileConsumer<T> extends
ScheduledBatchPollingConsum
}
/**
+ * Creates the exchange from the polled file
+ */
+ protected abstract Exchange createExchange(GenericFile<T> file);
+
+ /**
* Poll for files
*/
@Override
@@ -164,7 +169,7 @@ public abstract class GenericFileConsumer<T> extends
ScheduledBatchPollingConsum
// use a linked list so we can dequeue the exchanges
LinkedList<Exchange> exchanges = new LinkedList<>();
for (GenericFile<T> file : files) {
- Exchange exchange = endpoint.createExchange(file);
+ Exchange exchange = createExchange(file);
endpoint.configureExchange(exchange);
endpoint.configureMessage(file, exchange.getIn());
exchanges.add(exchange);
@@ -267,6 +272,7 @@ public abstract class GenericFileConsumer<T> extends
ScheduledBatchPollingConsum
GenericFile<?> file =
exchange.getProperty(FileComponent.FILE_EXCHANGE_FILE, GenericFile.class);
String key = file.getAbsoluteFilePath();
endpoint.getInProgressRepository().remove(key);
+ releaseExchange(exchange);
}
}
diff --git
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
index d428e24..83e2292 100644
---
a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
+++
b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
@@ -59,6 +59,15 @@ public abstract class RemoteFileConsumer<T> extends
GenericFileConsumer<T> {
}
@Override
+ protected Exchange createExchange(GenericFile<T> file) {
+ Exchange answer = createExchange(true);
+ if (file != null) {
+ file.bindToExchange(answer);
+ }
+ return answer;
+ }
+
+ @Override
protected boolean prePollCheck() throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("prePollCheck on {}",
getEndpoint().getConfiguration().remoteServerInformation());
diff --git
a/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
b/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
index 41d5205..af521da 100644
---
a/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
+++
b/components/camel-http/src/main/java/org/apache/camel/component/http/HttpProducer.java
@@ -623,7 +623,7 @@ public class HttpProducer extends DefaultProducer {
try {
if (body == null) {
return null;
- // special optimized for using these 3 type converters for common
message payload types
+ // special optimized for using these 3 type converters for
common message payload types
} else if (body instanceof byte[]) {
answer = HttpEntityConverter.toHttpEntity((byte[]) body,
exchange);
} else if (body instanceof InputStream) {
diff --git
a/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerConsumer.java
b/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerConsumer.java
index 240e673..04ab2b2 100644
---
a/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerConsumer.java
+++
b/components/camel-scheduler/src/main/java/org/apache/camel/component/scheduler/SchedulerConsumer.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.scheduler;
import java.util.Date;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
@@ -44,7 +45,7 @@ public class SchedulerConsumer extends ScheduledPollConsumer {
}
protected int sendTimerExchange() {
- final Exchange exchange = getEndpoint().createExchange();
+ final Exchange exchange = createExchange(false);
exchange.setProperty(Exchange.TIMER_NAME, getEndpoint().getName());
Date now = new Date();
@@ -55,15 +56,28 @@ public class SchedulerConsumer extends
ScheduledPollConsumer {
}
if (!getEndpoint().isSynchronous()) {
- getAsyncProcessor().process(exchange, new AsyncCallback() {
+ final AtomicBoolean polled = new AtomicBoolean(true);
+ boolean doneSync = getAsyncProcessor().process(exchange, new
AsyncCallback() {
@Override
public void done(boolean doneSync) {
// handle any thrown exception
if (exchange.getException() != null) {
getExceptionHandler().handleException("Error
processing exchange", exchange, exchange.getException());
}
+ boolean wasPolled =
exchange.getProperty(Exchange.SCHEDULER_POLLED_MESSAGES, true, boolean.class);
+ if (!wasPolled) {
+ polled.set(false);
+ }
+
+ // sync will release outside this callback
+ if (!doneSync) {
+ releaseExchange(exchange);
+ }
}
});
+ if (!doneSync) {
+ return polled.get() ? 1 : 0;
+ }
} else {
try {
getProcessor().process(exchange);
@@ -81,6 +95,7 @@ public class SchedulerConsumer extends ScheduledPollConsumer {
// for example to overrule and indicate no message was polled, which
can affect the scheduler
// to leverage backoff on idle etc.
boolean polled =
exchange.getProperty(Exchange.SCHEDULER_POLLED_MESSAGES, true, boolean.class);
+ releaseExchange(exchange);
return polled ? 1 : 0;
}
diff --git
a/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
b/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
index 8cc80c4..40b44c8 100644
---
a/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
+++
b/components/camel-timer/src/main/java/org/apache/camel/component/timer/TimerConsumer.java
@@ -185,7 +185,7 @@ public class TimerConsumer extends DefaultConsumer
implements StartupListener, S
}
protected void sendTimerExchange(long counter) {
- final Exchange exchange = endpoint.createExchange();
+ final Exchange exchange = createExchange(false);
if (endpoint.isIncludeMetadata()) {
exchange.setProperty(Exchange.TIMER_COUNTER, counter);
@@ -211,6 +211,10 @@ public class TimerConsumer extends DefaultConsumer
implements StartupListener, S
if (exchange.getException() != null) {
getExceptionHandler().handleException("Error
processing exchange", exchange, exchange.getException());
}
+ // sync will release outside this callback
+ if (!doneSync) {
+ releaseExchange(exchange);
+ }
}
});
} else {
@@ -221,8 +225,12 @@ public class TimerConsumer extends DefaultConsumer
implements StartupListener, S
}
// handle any thrown exception
- if (exchange.getException() != null) {
- getExceptionHandler().handleException("Error processing
exchange", exchange, exchange.getException());
+ try {
+ if (exchange.getException() != null) {
+ getExceptionHandler().handleException("Error processing
exchange", exchange, exchange.getException());
+ }
+ } finally {
+ releaseExchange(exchange);
}
}
}
diff --git a/core/camel-api/src/main/java/org/apache/camel/Consumer.java
b/core/camel-api/src/main/java/org/apache/camel/Consumer.java
index 0f3ec2e..2360be5 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Consumer.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Consumer.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel;
+import org.apache.camel.spi.UnitOfWork;
+
/**
* A consumer of message exchanges from an {@link Endpoint}.
* <p/>
@@ -25,6 +27,21 @@ package org.apache.camel;
*/
public interface Consumer extends Service, EndpointAware {
+ /**
+ * The processor that will process the {@link Exchange} that was consumed.
+ */
Processor getProcessor();
+ /**
+ * Creates an {@link Exchange} that was consumed.
+ *
+ * @param autoRelease whether to auto release the exchange when routing is
complete via {@link UnitOfWork}
+ */
+ Exchange createExchange(boolean autoRelease);
+
+ /**
+ * Releases the {@link Exchange} when it has completed processing and is no
longer needed.
+ */
+ void releaseExchange(Exchange exchange);
+
}
diff --git a/core/camel-api/src/main/java/org/apache/camel/Endpoint.java
b/core/camel-api/src/main/java/org/apache/camel/Endpoint.java
index 10bfd5e..cacdf51 100644
--- a/core/camel-api/src/main/java/org/apache/camel/Endpoint.java
+++ b/core/camel-api/src/main/java/org/apache/camel/Endpoint.java
@@ -79,6 +79,13 @@ public interface Endpoint extends IsSingleton, Service {
Exchange createExchange(ExchangePattern pattern);
/**
+ * Configures a newly created {@link Exchange}.
+ *
+ * @param exchange the new exchange
+ */
+ void configureExchange(Exchange exchange);
+
+ /**
* Returns the context which created the endpoint
*
* @return the context which created the endpoint
diff --git
a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
index c2a1ffd..578a3ee 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
@@ -30,6 +30,8 @@ public interface ExtendedExchange extends Exchange {
/**
* Clears the exchange from user data so it may be reused.
+ * <p/>
+ * <b>Important:</b> This API is NOT intended for Camel end users, but
used internally by Camel itself.
*/
void reset();
diff --git
a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
index f1eefd0..95586c4 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
@@ -27,6 +27,11 @@ import org.apache.camel.Exchange;
*/
public interface ExchangeFactory {
+ // TODO: new factory per consumer so there is no single race bottleneck
+ // TODO: only use factory on route consumer to limit its scope to most
significant impact
+ // TODO: release from extended exchange without onCompletion (overhead)
+ // TODO: reuse unit of work (expensive to create)
+
/**
* Service factory key.
*/
@@ -34,15 +39,18 @@ public interface ExchangeFactory {
/**
* Gets a new {@link Exchange}
+ *
+ * @param autoRelease whether to auto release the exchange when routing is
complete via {@link UnitOfWork}
*/
- Exchange create();
+ Exchange create(boolean autoRelease);
/**
* Gets a new {@link Exchange}
*
+ * @param autoRelease whether to auto release the exchange when routing
is complete via {@link UnitOfWork}
* @param fromEndpoint the from endpoint
*/
- Exchange create(Endpoint fromEndpoint);
+ Exchange create(Endpoint fromEndpoint, boolean autoRelease);
default void release(Exchange exchange) {
// noop
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java
b/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java
index 062f22d..20284fa 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java
@@ -41,6 +41,13 @@ public interface UnitOfWork extends Service {
String MDC_TRANSACTION_KEY = "camel.transactionKey";
/**
+ * Clears the unit of work from user data so it may be reused.
+ * <p/>
+ * <b>Important:</b> This API is NOT intended for Camel end users, but
used internally by Camel itself.
+ */
+ void reset();
+
+ /**
* Adds a synchronization hook
*
* @param synchronization the hook
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
index fedae54..ec17772 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultExchangeFactory.java
@@ -41,12 +41,13 @@ public class DefaultExchangeFactory implements
ExchangeFactory, CamelContextAwar
}
@Override
- public Exchange create() {
+ public Exchange create(boolean autoRelease) {
return new DefaultExchange(camelContext);
}
@Override
- public Exchange create(Endpoint fromEndpoint) {
+ public Exchange create(Endpoint fromEndpoint, boolean autoRelease) {
return new DefaultExchange(fromEndpoint);
}
+
}
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
index f8576b8..975b8a0 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
@@ -132,6 +132,18 @@ public class DefaultUnitOfWork implements UnitOfWork,
Service {
}
@Override
+ public void reset() {
+ routes.clear();
+ if (synchronizations != null) {
+ synchronizations.clear();
+ }
+ originalInMessage = null;
+ if (transactedBy != null) {
+ transactedBy.clear();
+ }
+ }
+
+ @Override
public void setParentUnitOfWork(UnitOfWork parentUnitOfWork) {
}
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index 88142c9..30094cd 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -48,6 +48,7 @@ public class PooledExchangeFactory extends ServiceSupport
private final AtomicLong acquired = new AtomicLong();
private final AtomicLong created = new AtomicLong();
private final AtomicLong released = new AtomicLong();
+ private final AtomicLong discarded = new AtomicLong();
private CamelContext camelContext;
private boolean statisticsEnabled = true;
@@ -71,7 +72,7 @@ public class PooledExchangeFactory extends ServiceSupport
}
@Override
- public Exchange create() {
+ public Exchange create(boolean autoRelease) {
Exchange exchange = pool.poll();
if (exchange == null) {
if (statisticsEnabled) {
@@ -83,17 +84,16 @@ public class PooledExchangeFactory extends ServiceSupport
if (statisticsEnabled) {
acquired.incrementAndGet();
}
- // reset exchange before we use it
- ExtendedExchange ee = exchange.adapt(ExtendedExchange.class);
- ee.reset();
}
- // add on completion which will return the exchange when done
- exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
+ if (autoRelease) {
+ // add on completion which will return the exchange when done
+
exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
+ }
return exchange;
}
@Override
- public Exchange create(Endpoint fromEndpoint) {
+ public Exchange create(Endpoint fromEndpoint, boolean autoRelease) {
Exchange exchange = pool.poll();
if (exchange == null) {
if (statisticsEnabled) {
@@ -108,17 +108,33 @@ public class PooledExchangeFactory extends ServiceSupport
// need to mark this exchange from the given endpoint
exchange.adapt(ExtendedExchange.class).setFromEndpoint(fromEndpoint);
}
- // add on completion which will return the exchange when done
- exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
+ if (autoRelease) {
+ // add on completion which will return the exchange when done
+
exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
+ }
return exchange;
}
@Override
public void release(Exchange exchange) {
- if (statisticsEnabled) {
- released.incrementAndGet();
+ // reset exchange before returning to pool
+ try {
+ // TODO: reset on pool as this then update created to be up-to-date
+ ExtendedExchange ee = exchange.adapt(ExtendedExchange.class);
+ ee.reset();
+
+ // only release back in pool if reset was success
+ if (statisticsEnabled) {
+ released.incrementAndGet();
+ }
+ pool.offer(exchange);
+ } catch (Exception e) {
+ if (statisticsEnabled) {
+ discarded.incrementAndGet();
+ }
+ // ignore
+ LOG.debug("Error resetting exchange: {}. This exchange is
discarded.", exchange);
}
- pool.offer(exchange);
}
@Override
@@ -126,13 +142,14 @@ public class PooledExchangeFactory extends ServiceSupport
pool.clear();
if (statisticsEnabled) {
- LOG.info("PooledExchangeFactory usage [created: {}, acquired: {},
released: {}]", created.get(), acquired.get(),
- released.get());
+ LOG.info("PooledExchangeFactory usage [created: {}, acquired: {},
released: {}, discarded: {}]",
+ created.get(), acquired.get(), released.get(),
discarded.get());
}
created.set(0);
acquired.set(0);
released.set(0);
+ discarded.set(0);
}
private final class ReleaseOnCompletion extends SynchronizationAdapter {
@@ -145,7 +162,9 @@ public class PooledExchangeFactory extends ServiceSupport
@Override
public void onDone(Exchange exchange) {
- release(exchange);
+ if (exchange != null) {
+ release(exchange);
+ }
}
}
diff --git
a/core/camel-core-model/src/main/java/org/apache/camel/builder/ExchangeBuilder.java
b/core/camel-core-model/src/main/java/org/apache/camel/builder/ExchangeBuilder.java
index 1088cdc..8594bfa 100644
---
a/core/camel-core-model/src/main/java/org/apache/camel/builder/ExchangeBuilder.java
+++
b/core/camel-core-model/src/main/java/org/apache/camel/builder/ExchangeBuilder.java
@@ -22,8 +22,8 @@ import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
-import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Message;
+import org.apache.camel.support.DefaultExchange;
/**
* Builder to create {@link Exchange} and add headers and set body on the
Exchange {@link Message}.
@@ -103,7 +103,7 @@ public final class ExchangeBuilder {
* @return exchange
*/
public Exchange build() {
- Exchange exchange =
context.adapt(ExtendedCamelContext.class).getExchangeFactory().create();
+ Exchange exchange = new DefaultExchange(context);
Message message = exchange.getIn();
message.setBody(body);
if (headers.size() > 0) {
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
index 4ce6bb5..9ca24e6 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/WireTapProcessor.java
@@ -29,7 +29,6 @@ import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Expression;
-import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
@@ -42,6 +41,7 @@ import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.AsyncProcessorSupport;
+import org.apache.camel.support.DefaultExchange;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.ObjectHelper;
@@ -257,10 +257,7 @@ public class WireTapProcessor extends AsyncProcessorSupport
}
private Exchange configureNewExchange(Exchange exchange) {
- Exchange answer
- =
camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory().create(exchange.getFromEndpoint());
- answer.setPattern(ExchangePattern.InOnly);
- return answer;
+ return new DefaultExchange(exchange.getFromEndpoint(),
ExchangePattern.InOnly);
}
public List<Processor> getNewExchangeProcessors() {
diff --git
a/core/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTestEndpointTest.java
b/core/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTestEndpointTest.java
index bd09c43..0050d05 100644
---
a/core/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTestEndpointTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/component/dataset/DataSetTestEndpointTest.java
@@ -81,9 +81,19 @@ public class DataSetTestEndpointTest extends
ContextTestSupport {
}
@Override
+ public Exchange createExchange(boolean autoRelease) {
+ return new DefaultExchange(getEndpoint());
+ }
+
+ @Override
+ public void releaseExchange(Exchange exchange) {
+ // noop
+ }
+
+ @Override
public void start() {
// when starting then send a message to the processor
- Exchange exchange = new DefaultExchange(getEndpoint());
+ Exchange exchange = createExchange(false);
exchange.getIn().setBody(expectedBody);
try {
processor.process(exchange);
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
index 71d4265..c8e594f 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
@@ -26,6 +26,7 @@ import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.RouteAware;
import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.spi.ExchangeFactory;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.support.service.ServiceHelper;
@@ -45,6 +46,7 @@ public class DefaultConsumer extends ServiceSupport
implements Consumer, RouteAw
private final Endpoint endpoint;
private final Processor processor;
private final AsyncProcessor asyncProcessor;
+ private final ExchangeFactory exchangeFactory;
private ExceptionHandler exceptionHandler;
private Route route;
private String routeId;
@@ -54,6 +56,7 @@ public class DefaultConsumer extends ServiceSupport
implements Consumer, RouteAw
this.processor = processor;
this.asyncProcessor = AsyncProcessorConverterHelper.convert(processor);
this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
+ this.exchangeFactory = endpoint.getCamelContext().adapt(ExtendedCamelContext.class).getExchangeFactory();
}
@Override
@@ -121,6 +124,21 @@ public class DefaultConsumer extends ServiceSupport
implements Consumer, RouteAw
}
@Override
+ public Exchange createExchange(boolean autoRelease) {
+ Exchange answer = exchangeFactory.create(getEndpoint(), autoRelease);
+ endpoint.configureExchange(answer);
+ answer.adapt(ExtendedExchange.class).setFromRouteId(routeId);
+ return answer;
+ }
+
+ @Override
+ public void releaseExchange(Exchange exchange) {
+ if (exchange != null) {
+ exchangeFactory.release(exchange);
+ }
+ }
+
+ @Override
public Endpoint getEndpoint() {
return endpoint;
}
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java
index aa777e9..70d9aaf 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultEndpoint.java
@@ -27,10 +27,8 @@ import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
-import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.PollingConsumer;
import org.apache.camel.spi.ExceptionHandler;
-import org.apache.camel.spi.ExchangeFactory;
import org.apache.camel.spi.HasId;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.PropertyConfigurer;
@@ -56,7 +54,6 @@ public abstract class DefaultEndpoint extends ServiceSupport
implements Endpoint
private volatile String endpointUri;
private CamelContext camelContext;
private Component component;
- private ExchangeFactory exchangeFactory;
@Metadata(label = "advanced", defaultValue = "true",
description = "Whether autowiring is enabled. This is used for automatic autowiring options (the option must be marked as autowired)"
@@ -101,9 +98,6 @@ public abstract class DefaultEndpoint extends ServiceSupport
implements Endpoint
this.setEndpointUri(endpointUri);
if (component != null) {
this.camelContext = component.getCamelContext();
- if (this.camelContext != null) {
- this.exchangeFactory = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory();
- }
}
}
@@ -234,14 +228,19 @@ public abstract class DefaultEndpoint extends
ServiceSupport implements Endpoint
@Override
public Exchange createExchange() {
- return createExchange(getExchangePattern());
+ return createExchange(exchangePattern);
}
@Override
public Exchange createExchange(ExchangePattern pattern) {
- Exchange exchange = exchangeFactory.create(this);
- exchange.setPattern(pattern);
- return exchange;
+ Exchange answer = new DefaultExchange(this, pattern);
+ configureExchange(answer);
+ return answer;
+ }
+
+ @Override
+ public void configureExchange(Exchange exchange) {
+ // noop
}
/**
@@ -490,10 +489,6 @@ public abstract class DefaultEndpoint extends
ServiceSupport implements Endpoint
protected void doInit() throws Exception {
ObjectHelper.notNull(getCamelContext(), "camelContext");
- if (exchangeFactory == null) {
- exchangeFactory = camelContext.adapt(ExtendedCamelContext.class).getExchangeFactory();
- }
-
if (autowiredEnabled && getComponent() != null) {
PropertyConfigurer configurer = getComponent().getEndpointPropertyConfigurer();
if (configurer instanceof PropertyConfigurerGetter) {
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
index fca25fa..5ff30c9 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
@@ -53,6 +53,7 @@ public final class DefaultExchange implements
ExtendedExchange {
private Exception exception;
private String exchangeId;
private UnitOfWork unitOfWork;
+ private ExchangePattern originalPattern;
private ExchangePattern pattern;
private Endpoint fromEndpoint;
private String fromRouteId;
@@ -74,12 +75,14 @@ public final class DefaultExchange implements
ExtendedExchange {
this.context = context;
this.pattern = ExchangePattern.InOnly;
this.created = System.currentTimeMillis();
+ this.originalPattern = this.pattern;
}
public DefaultExchange(CamelContext context, ExchangePattern pattern) {
this.context = context;
this.pattern = pattern;
this.created = System.currentTimeMillis();
+ this.originalPattern = this.pattern;
}
public DefaultExchange(Exchange parent) {
@@ -89,6 +92,7 @@ public final class DefaultExchange implements
ExtendedExchange {
this.fromEndpoint = parent.getFromEndpoint();
this.fromRouteId = parent.getFromRouteId();
this.unitOfWork = parent.getUnitOfWork();
+ this.originalPattern = this.pattern;
}
public DefaultExchange(Endpoint fromEndpoint) {
@@ -96,6 +100,7 @@ public final class DefaultExchange implements
ExtendedExchange {
this.pattern = ExchangePattern.InOnly;
this.created = System.currentTimeMillis();
this.fromEndpoint = fromEndpoint;
+ this.originalPattern = this.pattern;
}
public DefaultExchange(Endpoint fromEndpoint, ExchangePattern pattern) {
@@ -103,6 +108,7 @@ public final class DefaultExchange implements
ExtendedExchange {
this.pattern = pattern;
this.created = System.currentTimeMillis();
this.fromEndpoint = fromEndpoint;
+ this.originalPattern = this.pattern;
}
@Override
@@ -118,13 +124,17 @@ public final class DefaultExchange implements
ExtendedExchange {
public void reset() {
this.properties.clear();
this.exchangeId = null;
+ // TODO: This is reset time
this.created = System.currentTimeMillis();
+ this.in = null;
this.out = null;
this.exception = null;
this.unitOfWork = null;
- this.pattern = null;
- this.fromEndpoint = null;
- this.fromRouteId = null;
+ // reset pattern to original
+ this.pattern = originalPattern;
+ // do not reset endpoint as it would be the same consumer/endpoint again
+ // this.fromEndpoint = null;
+ // this.fromRouteId = null;
if (this.onCompletions != null) {
this.onCompletions.clear();
}
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java
index 74725c9..99692e0 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultInterceptSendToEndpoint.java
@@ -113,6 +113,11 @@ public class DefaultInterceptSendToEndpoint implements
InterceptSendToEndpoint,
}
@Override
+ public void configureExchange(Exchange exchange) {
+ delegate.configureExchange(exchange);
+ }
+
+ @Override
public CamelContext getCamelContext() {
return delegate.getCamelContext();
}
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java
b/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java
index be1759c..ca4930f 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java
@@ -16,9 +16,7 @@
*/
package org.apache.camel.support;
-import org.apache.camel.Endpoint;
-import org.apache.camel.PollingConsumer;
-import org.apache.camel.Processor;
+import org.apache.camel.*;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.support.service.ServiceSupport;
@@ -50,6 +48,16 @@ public abstract class PollingConsumerSupport extends
ServiceSupport implements P
return null;
}
+ @Override
+ public Exchange createExchange(boolean autoRelease) {
+ throw new UnsupportedOperationException("Not supported on PollingConsumer");
+ }
+
+ @Override
+ public void releaseExchange(Exchange exchange) {
+ throw new UnsupportedOperationException("Not supported on PollingConsumer");
+ }
+
public ExceptionHandler getExceptionHandler() {
return exceptionHandler;
}