This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/exchange-factory by this push:
new d03e874 CAMEL-16222: PooledExchangeFactory experiment
d03e874 is described below
commit d03e87461f1ffcc346cfc8fb56bfc28ccf37adb6
Author: Claus Ibsen <[email protected]>
AuthorDate: Sun Feb 21 19:54:21 2021 +0100
CAMEL-16222: PooledExchangeFactory experiment
---
.../instance/HazelcastInstanceConsumer.java | 4 +-
.../hazelcast/list/HazelcastListConsumer.java | 6 ---
.../hazelcast/listener/CamelListener.java | 4 +-
.../hazelcast/map/HazelcastMapConsumer.java | 6 ---
.../multimap/HazelcastMultimapConsumer.java | 6 ---
.../hazelcast/queue/HazelcastQueueConsumer.java | 4 +-
.../HazelcastReplicatedmapConsumer.java | 6 ---
.../hazelcast/seda/HazelcastSedaConsumer.java | 32 ++++++++--------
.../hazelcast/set/HazelcastSetConsumer.java | 6 ---
.../hazelcast/topic/HazelcastTopicConsumer.java | 6 ---
.../camel/component/hbase/HBaseConsumer.java | 5 ++-
.../apache/camel/component/hdfs/HdfsConsumer.java | 44 ++++++++++++----------
12 files changed, 51 insertions(+), 78 deletions(-)
diff --git
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/instance/HazelcastInstanceConsumer.java
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/instance/HazelcastInstanceConsumer.java
index 3dc0450..6a9b392 100644
---
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/instance/HazelcastInstanceConsumer.java
+++
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/instance/HazelcastInstanceConsumer.java
@@ -49,7 +49,7 @@ public class HazelcastInstanceConsumer extends
DefaultConsumer {
}
private void sendExchange(MembershipEvent event, String action) {
- Exchange exchange = getEndpoint().createExchange();
+ Exchange exchange = createExchange(false);
HazelcastComponentHelper.setListenerHeaders(exchange,
HazelcastConstants.INSTANCE_LISTENER, action);
@@ -71,6 +71,8 @@ public class HazelcastInstanceConsumer extends DefaultConsumer {
"Error processing exchange for Hazelcast consumer on your Hazelcast cluster.", exchange,
exchange.getException());
}
+
+ releaseExchange(exchange, false);
}
}
diff --git
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListConsumer.java
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListConsumer.java
index f22a3bb..6178333 100644
---
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListConsumer.java
+++
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListConsumer.java
@@ -42,9 +42,6 @@ public class HazelcastListConsumer extends
HazelcastDefaultConsumer {
queue = hazelcastInstance.getList(cacheName);
}
- /**
- * @see org.apache.camel.support.DefaultConsumer#doStart()
- */
@Override
protected void doStart() throws Exception {
super.doStart();
@@ -52,9 +49,6 @@ public class HazelcastListConsumer extends
HazelcastDefaultConsumer {
listener = queue.addItemListener(new CamelItemListener(this,
cacheName), true);
}
- /**
- * @see org.apache.camel.support.DefaultConsumer#doStop()
- */
@Override
protected void doStop() throws Exception {
queue.removeItemListener(listener);
diff --git
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/listener/CamelListener.java
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/listener/CamelListener.java
index 9e330be..d7cdffa 100644
---
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/listener/CamelListener.java
+++
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/listener/CamelListener.java
@@ -32,7 +32,7 @@ public class CamelListener {
}
protected void sendExchange(String operation, Object key, Object value) {
- Exchange exchange = consumer.getEndpoint().createExchange();
+ Exchange exchange = consumer.createExchange(false);
// set object to body
exchange.getIn().setBody(value);
@@ -57,6 +57,8 @@ public class CamelListener {
exchange,
exchange.getException());
}
+
+ consumer.releaseExchange(exchange, false);
}
public String getCacheName() {
diff --git
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapConsumer.java
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapConsumer.java
index 1e8fbff..2d16262 100644
---
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapConsumer.java
+++
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapConsumer.java
@@ -37,9 +37,6 @@ public class HazelcastMapConsumer extends
HazelcastDefaultConsumer {
cache = hazelcastInstance.getMap(cacheName);
}
- /**
- * @see org.apache.camel.support.DefaultConsumer#doStart()
- */
@Override
protected void doStart() throws Exception {
super.doStart();
@@ -47,9 +44,6 @@ public class HazelcastMapConsumer extends
HazelcastDefaultConsumer {
listener = cache.addEntryListener(new CamelMapListener(this,
cacheName), true);
}
- /**
- * @see org.apache.camel.support.DefaultConsumer#doStop()
- */
@Override
protected void doStop() throws Exception {
cache.removeEntryListener(listener);
diff --git
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapConsumer.java
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapConsumer.java
index 29c0d6a..a506fcb 100644
---
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapConsumer.java
+++
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapConsumer.java
@@ -38,9 +38,6 @@ public class HazelcastMultimapConsumer extends
HazelcastDefaultConsumer {
cache = hazelcastInstance.getMultiMap(cacheName);
}
- /**
- * @see org.apache.camel.support.DefaultConsumer#doStart()
- */
@Override
protected void doStart() throws Exception {
super.doStart();
@@ -48,9 +45,6 @@ public class HazelcastMultimapConsumer extends
HazelcastDefaultConsumer {
listener = cache.addEntryListener(new CamelEntryListener(this,
cacheName), true);
}
- /**
- * @see org.apache.camel.support.DefaultConsumer#doStop()
- */
@Override
protected void doStop() throws Exception {
cache.removeEntryListener(listener);
diff --git
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
index 7794bda..e177126 100644
---
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
+++
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
@@ -86,12 +86,14 @@ public class HazelcastQueueConsumer extends
HazelcastDefaultConsumer {
final Object body =
queue.poll(config.getPollingTimeout(), TimeUnit.MILLISECONDS);
// CAMEL-16035 - If the polling timeout is exceeded with nothing to poll from the queue, the queue.poll() method return NULL
if (body != null) {
- Exchange exchange = getEndpoint().createExchange();
+ Exchange exchange = createExchange(false);
exchange.getIn().setBody(body);
try {
processor.process(exchange);
} catch (Exception e) {
getExceptionHandler().handleException("Error during processing", exchange, e);
+ } finally {
+ releaseExchange(exchange, false);
}
}
} catch (InterruptedException e) {
diff --git
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java
index 2a215fd..380def8 100644
---
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java
+++
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java
@@ -38,9 +38,6 @@ public class HazelcastReplicatedmapConsumer extends
HazelcastDefaultConsumer {
cache = hazelcastInstance.getReplicatedMap(cacheName);
}
- /**
- * @see org.apache.camel.support.DefaultConsumer#doStart()
- */
@Override
protected void doStart() throws Exception {
super.doStart();
@@ -48,9 +45,6 @@ public class HazelcastReplicatedmapConsumer extends
HazelcastDefaultConsumer {
listener = cache.addEntryListener(new CamelEntryListener(this,
cacheName));
}
- /**
- * @see org.apache.camel.support.DefaultConsumer#doStop()
- */
@Override
protected void doStop() throws Exception {
cache.removeEntryListener(listener);
diff --git
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
index a73aafe..c58db47 100644
---
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
+++
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
@@ -76,7 +76,7 @@ public class HazelcastSedaConsumer extends DefaultConsumer
implements Runnable {
BaseQueue<?> queue =
endpoint.getHazelcastInstance().getQueue(endpoint.getConfiguration().getQueueName());
while (queue != null && isRunAllowed()) {
- final Exchange exchange = this.getEndpoint().createExchange();
+ final Exchange exchange = createExchange(true);
TransactionContext transactionCtx = null;
try {
@@ -100,22 +100,25 @@ public class HazelcastSedaConsumer extends
DefaultConsumer implements Runnable {
exchange.getIn().setBody(body);
}
try {
+ final TransactionContext txc = transactionCtx;
// process using the asynchronous routing engine
processor.process(exchange, new AsyncCallback() {
public void done(boolean asyncDone) {
- // noop
+ if (exchange.getException() != null) {
+ // Rollback
+ if (txc != null) {
+ txc.rollbackTransaction();
+ }
+ getExceptionHandler().handleException("Error processing exchange", exchange,
+ exchange.getException());
+ }
+ // It's OK, I commit
+ if (exchange.getException() == null && txc !=
null) {
+ LOG.trace("Commit transaction: {}",
txc.getTxnId());
+ txc.commitTransaction();
+ }
}
});
-
- if (exchange.getException() != null) {
- // Rollback
- if (transactionCtx != null) {
- transactionCtx.rollbackTransaction();
- }
- getExceptionHandler().handleException("Error processing exchange", exchange,
- exchange.getException());
- }
-
} catch (Exception e) {
LOG.error("Hzlq Exception caught: {}", e, e);
// Rollback
@@ -125,11 +128,6 @@ public class HazelcastSedaConsumer extends DefaultConsumer
implements Runnable {
}
}
}
- // It's OK, I commit
- if (exchange.getException() == null && transactionCtx != null) {
- LOG.trace("Commit transaction: {}", transactionCtx.getTxnId());
- transactionCtx.commitTransaction();
- }
} catch (InterruptedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Hzlq Consumer Interrupted: {}", e, e);
diff --git
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetConsumer.java
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetConsumer.java
index 8a17cf9..4cf75e3 100644
---
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetConsumer.java
+++
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetConsumer.java
@@ -40,9 +40,6 @@ public class HazelcastSetConsumer extends
HazelcastDefaultConsumer {
set = hazelcastInstance.getSet(cacheName);
}
- /**
- * @see org.apache.camel.support.DefaultConsumer#doStart()
- */
@Override
protected void doStart() throws Exception {
super.doStart();
@@ -50,9 +47,6 @@ public class HazelcastSetConsumer extends
HazelcastDefaultConsumer {
listener = set.addItemListener(new CamelItemListener(this, cacheName),
true);
}
- /**
- * @see org.apache.camel.support.DefaultConsumer#doStop()
- */
@Override
protected void doStop() throws Exception {
set.removeItemListener(listener);
diff --git
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConsumer.java
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConsumer.java
index 1c816c6..67a1a2a 100644
---
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConsumer.java
+++
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConsumer.java
@@ -25,9 +25,6 @@ import org.apache.camel.Processor;
import org.apache.camel.component.hazelcast.HazelcastDefaultConsumer;
import org.apache.camel.component.hazelcast.listener.CamelMessageListener;
-/**
- *
- */
public class HazelcastTopicConsumer extends HazelcastDefaultConsumer {
private ITopic<Object> topic;
@@ -44,9 +41,6 @@ public class HazelcastTopicConsumer extends
HazelcastDefaultConsumer {
}
}
- /**
- * @see org.apache.camel.support.DefaultConsumer#doStart()
- */
@Override
protected void doStart() throws Exception {
super.doStart();
diff --git
a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java
b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java
index 59efd5a..d2c3867 100644
---
a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java
+++
b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java
@@ -29,6 +29,7 @@ import
org.apache.camel.component.hbase.mapping.CellMappingStrategyFactory;
import org.apache.camel.component.hbase.model.HBaseCell;
import org.apache.camel.component.hbase.model.HBaseData;
import org.apache.camel.component.hbase.model.HBaseRow;
+import org.apache.camel.support.DefaultExchange;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
@@ -94,7 +95,7 @@ public class HBaseConsumer extends
ScheduledBatchPollingConsumer {
ResultScanner scanner = table.getScanner(scan);
int exchangeCount = 0;
// The next three statements are used just to get a reference to the BodyCellMappingStrategy instance.
- Exchange exchange = endpoint.createExchange();
+ Exchange exchange = new DefaultExchange(endpoint);
exchange.getIn().setHeader(CellMappingStrategyFactory.STRATEGY,
CellMappingStrategyFactory.BODY);
CellMappingStrategy mappingStrategy =
endpoint.getCellMappingStrategyFactory().getStrategy(exchange.getIn());
for (Result result = scanner.next();
@@ -137,7 +138,7 @@ public class HBaseConsumer extends
ScheduledBatchPollingConsumer {
}
data.getRows().add(resultRow);
- exchange = endpoint.createExchange();
+ exchange = createExchange(true);
// Probably overkill but kept it here for consistency.
exchange.getIn().setHeader(CellMappingStrategyFactory.STRATEGY,
CellMappingStrategyFactory.BODY);
mappingStrategy.applyScanResults(exchange.getIn(), data);
diff --git
a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
index e25ac4e..7e197a4 100644
---
a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
+++
b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
@@ -179,36 +179,40 @@ public final class HdfsConsumer extends
ScheduledPollConsumer {
private void processHdfsInputStream(
HdfsInputStream hdfsFile, Holder<Object> key, Holder<Object>
value, AtomicInteger messageCount,
AtomicInteger totalMessageCount) {
- Exchange exchange = this.getEndpoint().createExchange();
- Message message = exchange.getIn();
- String fileName =
StringUtils.substringAfterLast(hdfsFile.getActualPath(), "/");
- message.setHeader(Exchange.FILE_NAME, fileName);
- message.setHeader(Exchange.FILE_NAME_CONSUMED, fileName);
- message.setHeader("CamelFileAbsolutePath", hdfsFile.getActualPath());
- if (key.getValue() != null) {
- message.setHeader(HdfsHeader.KEY.name(), key.getValue());
- }
+ Exchange exchange = createExchange(false);
+ try {
+ Message message = exchange.getIn();
+ String fileName =
StringUtils.substringAfterLast(hdfsFile.getActualPath(), "/");
+ message.setHeader(Exchange.FILE_NAME, fileName);
+ message.setHeader(Exchange.FILE_NAME_CONSUMED, fileName);
+ message.setHeader("CamelFileAbsolutePath",
hdfsFile.getActualPath());
+ if (key.getValue() != null) {
+ message.setHeader(HdfsHeader.KEY.name(), key.getValue());
+ }
- if (hdfsFile.getNumOfReadBytes() >= 0) {
- message.setHeader(Exchange.FILE_LENGTH,
hdfsFile.getNumOfReadBytes());
- }
+ if (hdfsFile.getNumOfReadBytes() >= 0) {
+ message.setHeader(Exchange.FILE_LENGTH,
hdfsFile.getNumOfReadBytes());
+ }
- message.setBody(value.getValue());
+ message.setBody(value.getValue());
- updateNewExchange(exchange, messageCount.get(), hdfsFile);
+ updateNewExchange(exchange, messageCount.get(), hdfsFile);
+
+ LOG.debug("Processing file [{}]", fileName);
- LOG.debug("Processing file [{}]", fileName);
- try {
processor.process(exchange);
totalMessageCount.incrementAndGet();
+
} catch (Exception e) {
exchange.setException(e);
+ } finally {
+ // in case of unhandled exceptions then let the exception handler handle them
+ if (exchange.getException() != null) {
+ getExceptionHandler().handleException(exchange.getException());
+ }
+ releaseExchange(exchange, false);
}
- // in case of unhandled exceptions then let the exception handler handle them
- if (exchange.getException() != null) {
- getExceptionHandler().handleException(exchange.getException());
- }
}
private boolean normalFileIsDirectoryHasSuccessFile(FileStatus fileStatus,
HdfsInfo info) {