This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/exchange-factory by this push:
     new d03e874  CAMEL-16222: PooledExchangeFactory experiment
d03e874 is described below

commit d03e87461f1ffcc346cfc8fb56bfc28ccf37adb6
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Sun Feb 21 19:54:21 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../instance/HazelcastInstanceConsumer.java        |  4 +-
 .../hazelcast/list/HazelcastListConsumer.java      |  6 ---
 .../hazelcast/listener/CamelListener.java          |  4 +-
 .../hazelcast/map/HazelcastMapConsumer.java        |  6 ---
 .../multimap/HazelcastMultimapConsumer.java        |  6 ---
 .../hazelcast/queue/HazelcastQueueConsumer.java    |  4 +-
 .../HazelcastReplicatedmapConsumer.java            |  6 ---
 .../hazelcast/seda/HazelcastSedaConsumer.java      | 32 ++++++++--------
 .../hazelcast/set/HazelcastSetConsumer.java        |  6 ---
 .../hazelcast/topic/HazelcastTopicConsumer.java    |  6 ---
 .../camel/component/hbase/HBaseConsumer.java       |  5 ++-
 .../apache/camel/component/hdfs/HdfsConsumer.java  | 44 ++++++++++++----------
 12 files changed, 51 insertions(+), 78 deletions(-)

diff --git 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/instance/HazelcastInstanceConsumer.java
 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/instance/HazelcastInstanceConsumer.java
index 3dc0450..6a9b392 100644
--- 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/instance/HazelcastInstanceConsumer.java
+++ 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/instance/HazelcastInstanceConsumer.java
@@ -49,7 +49,7 @@ public class HazelcastInstanceConsumer extends 
DefaultConsumer {
         }
 
         private void sendExchange(MembershipEvent event, String action) {
-            Exchange exchange = getEndpoint().createExchange();
+            Exchange exchange = createExchange(false);
 
             HazelcastComponentHelper.setListenerHeaders(exchange, 
HazelcastConstants.INSTANCE_LISTENER, action);
 
@@ -71,6 +71,8 @@ public class HazelcastInstanceConsumer extends 
DefaultConsumer {
                         "Error processing exchange for Hazelcast consumer on 
your Hazelcast cluster.", exchange,
                         exchange.getException());
             }
+
+            releaseExchange(exchange, false);
         }
 
     }
diff --git 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListConsumer.java
 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListConsumer.java
index f22a3bb..6178333 100644
--- 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListConsumer.java
+++ 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/list/HazelcastListConsumer.java
@@ -42,9 +42,6 @@ public class HazelcastListConsumer extends 
HazelcastDefaultConsumer {
         queue = hazelcastInstance.getList(cacheName);
     }
 
-    /**
-     * @see org.apache.camel.support.DefaultConsumer#doStart()
-     */
     @Override
     protected void doStart() throws Exception {
         super.doStart();
@@ -52,9 +49,6 @@ public class HazelcastListConsumer extends 
HazelcastDefaultConsumer {
         listener = queue.addItemListener(new CamelItemListener(this, 
cacheName), true);
     }
 
-    /**
-     * @see org.apache.camel.support.DefaultConsumer#doStop()
-     */
     @Override
     protected void doStop() throws Exception {
         queue.removeItemListener(listener);
diff --git 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/listener/CamelListener.java
 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/listener/CamelListener.java
index 9e330be..d7cdffa 100644
--- 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/listener/CamelListener.java
+++ 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/listener/CamelListener.java
@@ -32,7 +32,7 @@ public class CamelListener {
     }
 
     protected void sendExchange(String operation, Object key, Object value) {
-        Exchange exchange = consumer.getEndpoint().createExchange();
+        Exchange exchange = consumer.createExchange(false);
 
         // set object to body
         exchange.getIn().setBody(value);
@@ -57,6 +57,8 @@ public class CamelListener {
                     exchange,
                     exchange.getException());
         }
+
+        consumer.releaseExchange(exchange, false);
     }
 
     public String getCacheName() {
diff --git 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapConsumer.java
 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapConsumer.java
index 1e8fbff..2d16262 100644
--- 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapConsumer.java
+++ 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/map/HazelcastMapConsumer.java
@@ -37,9 +37,6 @@ public class HazelcastMapConsumer extends 
HazelcastDefaultConsumer {
         cache = hazelcastInstance.getMap(cacheName);
     }
 
-    /**
-     * @see org.apache.camel.support.DefaultConsumer#doStart()
-     */
     @Override
     protected void doStart() throws Exception {
         super.doStart();
@@ -47,9 +44,6 @@ public class HazelcastMapConsumer extends 
HazelcastDefaultConsumer {
         listener = cache.addEntryListener(new CamelMapListener(this, 
cacheName), true);
     }
 
-    /**
-     * @see org.apache.camel.support.DefaultConsumer#doStop()
-     */
     @Override
     protected void doStop() throws Exception {
         cache.removeEntryListener(listener);
diff --git 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapConsumer.java
 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapConsumer.java
index 29c0d6a..a506fcb 100644
--- 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapConsumer.java
+++ 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/multimap/HazelcastMultimapConsumer.java
@@ -38,9 +38,6 @@ public class HazelcastMultimapConsumer extends 
HazelcastDefaultConsumer {
         cache = hazelcastInstance.getMultiMap(cacheName);
     }
 
-    /**
-     * @see org.apache.camel.support.DefaultConsumer#doStart()
-     */
     @Override
     protected void doStart() throws Exception {
         super.doStart();
@@ -48,9 +45,6 @@ public class HazelcastMultimapConsumer extends 
HazelcastDefaultConsumer {
         listener = cache.addEntryListener(new CamelEntryListener(this, 
cacheName), true);
     }
 
-    /**
-     * @see org.apache.camel.support.DefaultConsumer#doStop()
-     */
     @Override
     protected void doStop() throws Exception {
         cache.removeEntryListener(listener);
diff --git 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
index 7794bda..e177126 100644
--- 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
+++ 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/queue/HazelcastQueueConsumer.java
@@ -86,12 +86,14 @@ public class HazelcastQueueConsumer extends 
HazelcastDefaultConsumer {
                         final Object body = 
queue.poll(config.getPollingTimeout(), TimeUnit.MILLISECONDS);
                         // CAMEL-16035 - If the polling timeout is exceeded 
with nothing to poll from the queue, the queue.poll() method return NULL
                         if (body != null) {
-                            Exchange exchange = getEndpoint().createExchange();
+                            Exchange exchange = createExchange(false);
                             exchange.getIn().setBody(body);
                             try {
                                 processor.process(exchange);
                             } catch (Exception e) {
                                 getExceptionHandler().handleException("Error 
during processing", exchange, e);
+                            } finally {
+                                releaseExchange(exchange, false);
                             }
                         }
                     } catch (InterruptedException e) {
diff --git 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java
 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java
index 2a215fd..380def8 100644
--- 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java
+++ 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/replicatedmap/HazelcastReplicatedmapConsumer.java
@@ -38,9 +38,6 @@ public class HazelcastReplicatedmapConsumer extends 
HazelcastDefaultConsumer {
         cache = hazelcastInstance.getReplicatedMap(cacheName);
     }
 
-    /**
-     * @see org.apache.camel.support.DefaultConsumer#doStart()
-     */
     @Override
     protected void doStart() throws Exception {
         super.doStart();
@@ -48,9 +45,6 @@ public class HazelcastReplicatedmapConsumer extends 
HazelcastDefaultConsumer {
         listener = cache.addEntryListener(new CamelEntryListener(this, 
cacheName));
     }
 
-    /**
-     * @see org.apache.camel.support.DefaultConsumer#doStop()
-     */
     @Override
     protected void doStop() throws Exception {
         cache.removeEntryListener(listener);
diff --git 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
index a73aafe..c58db47 100644
--- 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
+++ 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/seda/HazelcastSedaConsumer.java
@@ -76,7 +76,7 @@ public class HazelcastSedaConsumer extends DefaultConsumer 
implements Runnable {
         BaseQueue<?> queue = 
endpoint.getHazelcastInstance().getQueue(endpoint.getConfiguration().getQueueName());
 
         while (queue != null && isRunAllowed()) {
-            final Exchange exchange = this.getEndpoint().createExchange();
+            final Exchange exchange = createExchange(true);
 
             TransactionContext transactionCtx = null;
             try {
@@ -100,22 +100,25 @@ public class HazelcastSedaConsumer extends 
DefaultConsumer implements Runnable {
                         exchange.getIn().setBody(body);
                     }
                     try {
+                        final TransactionContext txc = transactionCtx;
                         // process using the asynchronous routing engine
                         processor.process(exchange, new AsyncCallback() {
                             public void done(boolean asyncDone) {
-                                // noop
+                                if (exchange.getException() != null) {
+                                    // Rollback
+                                    if (txc != null) {
+                                        txc.rollbackTransaction();
+                                    }
+                                    
getExceptionHandler().handleException("Error processing exchange", exchange,
+                                            exchange.getException());
+                                }
+                                // It's OK, I commit
+                                if (exchange.getException() == null && txc != 
null) {
+                                    LOG.trace("Commit transaction: {}", 
txc.getTxnId());
+                                    txc.commitTransaction();
+                                }
                             }
                         });
-
-                        if (exchange.getException() != null) {
-                            // Rollback
-                            if (transactionCtx != null) {
-                                transactionCtx.rollbackTransaction();
-                            }
-                            getExceptionHandler().handleException("Error 
processing exchange", exchange,
-                                    exchange.getException());
-                        }
-
                     } catch (Exception e) {
                         LOG.error("Hzlq Exception caught: {}", e, e);
                         // Rollback
@@ -125,11 +128,6 @@ public class HazelcastSedaConsumer extends DefaultConsumer 
implements Runnable {
                         }
                     }
                 }
-                // It's OK, I commit
-                if (exchange.getException() == null && transactionCtx != null) 
{
-                    LOG.trace("Commit transaction: {}", 
transactionCtx.getTxnId());
-                    transactionCtx.commitTransaction();
-                }
             } catch (InterruptedException e) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Hzlq Consumer Interrupted: {}", e, e);
diff --git 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetConsumer.java
 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetConsumer.java
index 8a17cf9..4cf75e3 100644
--- 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetConsumer.java
+++ 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/set/HazelcastSetConsumer.java
@@ -40,9 +40,6 @@ public class HazelcastSetConsumer extends 
HazelcastDefaultConsumer {
         set = hazelcastInstance.getSet(cacheName);
     }
 
-    /**
-     * @see org.apache.camel.support.DefaultConsumer#doStart()
-     */
     @Override
     protected void doStart() throws Exception {
         super.doStart();
@@ -50,9 +47,6 @@ public class HazelcastSetConsumer extends 
HazelcastDefaultConsumer {
         listener = set.addItemListener(new CamelItemListener(this, cacheName), 
true);
     }
 
-    /**
-     * @see org.apache.camel.support.DefaultConsumer#doStop()
-     */
     @Override
     protected void doStop() throws Exception {
         set.removeItemListener(listener);
diff --git 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConsumer.java
 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConsumer.java
index 1c816c6..67a1a2a 100644
--- 
a/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConsumer.java
+++ 
b/components/camel-hazelcast/src/main/java/org/apache/camel/component/hazelcast/topic/HazelcastTopicConsumer.java
@@ -25,9 +25,6 @@ import org.apache.camel.Processor;
 import org.apache.camel.component.hazelcast.HazelcastDefaultConsumer;
 import org.apache.camel.component.hazelcast.listener.CamelMessageListener;
 
-/**
- *
- */
 public class HazelcastTopicConsumer extends HazelcastDefaultConsumer {
 
     private ITopic<Object> topic;
@@ -44,9 +41,6 @@ public class HazelcastTopicConsumer extends 
HazelcastDefaultConsumer {
         }
     }
 
-    /**
-     * @see org.apache.camel.support.DefaultConsumer#doStart()
-     */
     @Override
     protected void doStart() throws Exception {
         super.doStart();
diff --git 
a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java
 
b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java
index 59efd5a..d2c3867 100644
--- 
a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java
+++ 
b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java
@@ -29,6 +29,7 @@ import 
org.apache.camel.component.hbase.mapping.CellMappingStrategyFactory;
 import org.apache.camel.component.hbase.model.HBaseCell;
 import org.apache.camel.component.hbase.model.HBaseData;
 import org.apache.camel.component.hbase.model.HBaseRow;
+import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.support.ScheduledBatchPollingConsumer;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
@@ -94,7 +95,7 @@ public class HBaseConsumer extends 
ScheduledBatchPollingConsumer {
             ResultScanner scanner = table.getScanner(scan);
             int exchangeCount = 0;
             // The next three statements are used just to get a reference to 
the BodyCellMappingStrategy instance.
-            Exchange exchange = endpoint.createExchange();
+            Exchange exchange = new DefaultExchange(endpoint);
             exchange.getIn().setHeader(CellMappingStrategyFactory.STRATEGY, 
CellMappingStrategyFactory.BODY);
             CellMappingStrategy mappingStrategy = 
endpoint.getCellMappingStrategyFactory().getStrategy(exchange.getIn());
             for (Result result = scanner.next();
@@ -137,7 +138,7 @@ public class HBaseConsumer extends 
ScheduledBatchPollingConsumer {
                     }
 
                     data.getRows().add(resultRow);
-                    exchange = endpoint.createExchange();
+                    exchange = createExchange(true);
                     // Probably overkill but kept it here for consistency.
                     
exchange.getIn().setHeader(CellMappingStrategyFactory.STRATEGY, 
CellMappingStrategyFactory.BODY);
                     mappingStrategy.applyScanResults(exchange.getIn(), data);
diff --git 
a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
 
b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
index e25ac4e..7e197a4 100644
--- 
a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
+++ 
b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
@@ -179,36 +179,40 @@ public final class HdfsConsumer extends 
ScheduledPollConsumer {
     private void processHdfsInputStream(
             HdfsInputStream hdfsFile, Holder<Object> key, Holder<Object> 
value, AtomicInteger messageCount,
             AtomicInteger totalMessageCount) {
-        Exchange exchange = this.getEndpoint().createExchange();
-        Message message = exchange.getIn();
-        String fileName = 
StringUtils.substringAfterLast(hdfsFile.getActualPath(), "/");
-        message.setHeader(Exchange.FILE_NAME, fileName);
-        message.setHeader(Exchange.FILE_NAME_CONSUMED, fileName);
-        message.setHeader("CamelFileAbsolutePath", hdfsFile.getActualPath());
-        if (key.getValue() != null) {
-            message.setHeader(HdfsHeader.KEY.name(), key.getValue());
-        }
+        Exchange exchange = createExchange(false);
+        try {
+            Message message = exchange.getIn();
+            String fileName = 
StringUtils.substringAfterLast(hdfsFile.getActualPath(), "/");
+            message.setHeader(Exchange.FILE_NAME, fileName);
+            message.setHeader(Exchange.FILE_NAME_CONSUMED, fileName);
+            message.setHeader("CamelFileAbsolutePath", 
hdfsFile.getActualPath());
+            if (key.getValue() != null) {
+                message.setHeader(HdfsHeader.KEY.name(), key.getValue());
+            }
 
-        if (hdfsFile.getNumOfReadBytes() >= 0) {
-            message.setHeader(Exchange.FILE_LENGTH, 
hdfsFile.getNumOfReadBytes());
-        }
+            if (hdfsFile.getNumOfReadBytes() >= 0) {
+                message.setHeader(Exchange.FILE_LENGTH, 
hdfsFile.getNumOfReadBytes());
+            }
 
-        message.setBody(value.getValue());
+            message.setBody(value.getValue());
 
-        updateNewExchange(exchange, messageCount.get(), hdfsFile);
+            updateNewExchange(exchange, messageCount.get(), hdfsFile);
+
+            LOG.debug("Processing file [{}]", fileName);
 
-        LOG.debug("Processing file [{}]", fileName);
-        try {
             processor.process(exchange);
             totalMessageCount.incrementAndGet();
+
         } catch (Exception e) {
             exchange.setException(e);
+        } finally {
+            // in case of unhandled exceptions then let the exception handler 
handle them
+            if (exchange.getException() != null) {
+                getExceptionHandler().handleException(exchange.getException());
+            }
+            releaseExchange(exchange, false);
         }
 
-        // in case of unhandled exceptions then let the exception handler 
handle them
-        if (exchange.getException() != null) {
-            getExceptionHandler().handleException(exchange.getException());
-        }
     }
 
     private boolean normalFileIsDirectoryHasSuccessFile(FileStatus fileStatus, 
HdfsInfo info) {

Reply via email to