This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/exchange-factory by this push:
new 2c0da01 CAMEL-16222: PooledExchangeFactory experiment
2c0da01 is described below
commit 2c0da01eb58a4831273b55030b72f50dcf8c95c1
Author: Claus Ibsen <[email protected]>
AuthorDate: Sun Feb 21 18:53:43 2021 +0100
CAMEL-16222: PooledExchangeFactory experiment
---
.../java/org/apache/camel/component/ehcache/EhcacheConsumer.java | 4 +++-
.../java/org/apache/camel/component/etcd/EtcdStatsConsumer.java | 2 +-
.../java/org/apache/camel/component/etcd/EtcdWatchConsumer.java | 6 +++++-
.../java/org/apache/camel/component/facebook/FacebookConsumer.java | 6 +++---
.../org/apache/camel/component/file/watch/FileWatchConsumer.java | 2 +-
.../java/org/apache/camel/component/flatpack/FlatpackEndpoint.java | 4 +++-
6 files changed, 16 insertions(+), 8 deletions(-)
diff --git a/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/EhcacheConsumer.java b/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/EhcacheConsumer.java
index 7bf5f70..e784460 100644
--- a/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/EhcacheConsumer.java
+++ b/components/camel-ehcache/src/main/java/org/apache/camel/component/ehcache/EhcacheConsumer.java
@@ -72,7 +72,7 @@ public class EhcacheConsumer extends DefaultConsumer implements CacheEventListen
@Override
public void onEvent(CacheEvent<?, ?> event) {
if (isRunAllowed()) {
- final Exchange exchange = getEndpoint().createExchange();
+ final Exchange exchange = createExchange(false);
final Message message = exchange.getIn();
message.setHeader(EhcacheConstants.KEY, event.getKey());
@@ -84,6 +84,8 @@ public class EhcacheConsumer extends DefaultConsumer implements CacheEventListen
getProcessor().process(exchange);
} catch (Exception e) {
getExceptionHandler().handleException("Error processing
exchange", exchange, e);
+ } finally {
+ releaseExchange(exchange, false);
}
}
}
diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdStatsConsumer.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdStatsConsumer.java
index c5e1b8c..e47c97e 100644
--- a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdStatsConsumer.java
+++ b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdStatsConsumer.java
@@ -35,7 +35,7 @@ public class EtcdStatsConsumer extends AbstractEtcdPollingConsumer {
Object answer = endpoint.getStats(getClient());
if (answer != null) {
- Exchange exchange = endpoint.createExchange();
+ Exchange exchange = createExchange(true);
exchange.getIn().setHeader(EtcdConstants.ETCD_NAMESPACE, "stats");
exchange.getIn().setHeader(EtcdConstants.ETCD_PATH,
endpoint.getPath());
exchange.getIn().setBody(answer);
diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdWatchConsumer.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdWatchConsumer.java
index 6689ed6..2795129 100644
--- a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdWatchConsumer.java
+++ b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdWatchConsumer.java
@@ -93,7 +93,7 @@ public class EtcdWatchConsumer extends AbstractEtcdConsumer
try {
EtcdKeysResponse response = promise.get();
- exchange = endpoint.createExchange();
+ exchange = createExchange(false);
exchange.getIn().setHeader(EtcdConstants.ETCD_NAMESPACE,
"watch");
exchange.getIn().setHeader(EtcdConstants.ETCD_PATH,
response.node.key);
exchange.getIn().setBody(response);
@@ -127,6 +127,10 @@ public class EtcdWatchConsumer extends AbstractEtcdConsumer
}
}
+ if (exchange != null) {
+ releaseExchange(exchange, false);
+ }
+
if (throwable != null) {
handleException("Error processing etcd response", throwable);
}
diff --git a/components/camel-facebook/src/main/java/org/apache/camel/component/facebook/FacebookConsumer.java b/components/camel-facebook/src/main/java/org/apache/camel/component/facebook/FacebookConsumer.java
index 343c4fe..f14f873 100644
--- a/components/camel-facebook/src/main/java/org/apache/camel/component/facebook/FacebookConsumer.java
+++ b/components/camel-facebook/src/main/java/org/apache/camel/component/facebook/FacebookConsumer.java
@@ -184,7 +184,7 @@ public class FacebookConsumer extends ScheduledPollConsumer {
}
private void processResult(Object result, String rawJSON) throws Exception
{
- Exchange exchange = endpoint.createExchange();
+ Exchange exchange = createExchange(false);
exchange.getIn().setBody(result);
if (rawJSON != null) {
exchange.getIn().setHeader(FacebookConstants.RAW_JSON_HEADER,
rawJSON);
@@ -197,6 +197,7 @@ public class FacebookConsumer extends ScheduledPollConsumer {
if (exchange.getException() != null) {
getExceptionHandler().handleException("Error processing
exchange", exchange, exchange.getException());
}
+ releaseExchange(exchange, false);
}
}
@@ -237,8 +238,7 @@ public class FacebookConsumer extends ScheduledPollConsumer {
if (this.sinceTime == null) {
// first poll, set this to (current time - initial poll delay)
final Date startTime = new Date(
- currentMillis
- -
TimeUnit.MILLISECONDS.convert(getInitialDelay(), getTimeUnit()));
+ currentMillis -
TimeUnit.MILLISECONDS.convert(getInitialDelay(), getTimeUnit()));
this.sinceTime = dateFormat.format(startTime);
} else if (this.untilTime != null) {
// use the last 'until' time
diff --git a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchConsumer.java b/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchConsumer.java
index c1c8f77..e8b64ba 100644
--- a/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchConsumer.java
+++ b/components/camel-file-watch/src/main/java/org/apache/camel/component/file/watch/FileWatchConsumer.java
@@ -139,7 +139,7 @@ public class FileWatchConsumer extends DefaultConsumer {
}
private Exchange prepareExchange(FileEvent event) {
- Exchange exchange = getEndpoint().createExchange();
+ Exchange exchange = createExchange(true);
File file = event.getEventPath().toFile();
Message message = exchange.getIn();
message.setBody(file);
diff --git a/components/camel-flatpack/src/main/java/org/apache/camel/component/flatpack/FlatpackEndpoint.java b/components/camel-flatpack/src/main/java/org/apache/camel/component/flatpack/FlatpackEndpoint.java
index c9a9829..8b5d315 100644
--- a/components/camel-flatpack/src/main/java/org/apache/camel/component/flatpack/FlatpackEndpoint.java
+++ b/components/camel-flatpack/src/main/java/org/apache/camel/component/flatpack/FlatpackEndpoint.java
@@ -92,7 +92,9 @@ public class FlatpackEndpoint extends DefaultPollingEndpoint {
@Override
public Consumer createConsumer(Processor processor) throws Exception {
- return new FlatpackConsumer(this, processor, loadBalancer);
+ FlatpackConsumer consumer = new FlatpackConsumer(this, processor,
loadBalancer);
+ configureConsumer(consumer);
+ return consumer;
}
public void processDataSet(Exchange originalExchange, DataSet dataSet, int
counter) throws Exception {