This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/exchange-factory by this push:
     new 0309f6e  CAMEL-16222: PooledExchangeFactory experiment
0309f6e is described below

commit 0309f6e1b18eac8663e3936d52e3bf443ddf6379
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Sun Feb 21 13:33:49 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../component/cassandra/CassandraConsumer.java     |  7 +--
 .../apache/camel/component/cmis/CMISConsumer.java  |  2 +-
 .../org/apache/camel/coap/CamelCoapResource.java   |  3 +-
 .../camel/component/cometd/CometdConsumer.java     | 20 ++++----
 .../consul/endpoint/ConsulEventConsumer.java       | 44 ++++++++---------
 .../consul/endpoint/ConsulKeyValueConsumer.java    | 34 ++++++-------
 .../camel/component/corda/CordaConsumer.java       | 14 +++---
 .../component/couchbase/CouchbaseConsumer.java     | 55 ++++++++++------------
 .../component/couchdb/CouchDbChangesetTracker.java |  7 ++-
 .../camel/component/couchdb/CouchDbConsumer.java   | 19 +++++---
 .../camel/component/couchdb/CouchDbEndpoint.java   | 13 -----
 .../couchdb/CouchDbChangesetTrackerTest.java       | 14 +++---
 .../component/couchdb/CouchDbEndpointTest.java     | 27 -----------
 13 files changed, 116 insertions(+), 143 deletions(-)

diff --git 
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
 
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
index 07518f5..cc516d8 100644
--- 
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
+++ 
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
@@ -55,11 +55,11 @@ public class CassandraConsumer extends 
ScheduledPollConsumer {
         }
 
         // Create message from ResultSet
-        Exchange exchange = getEndpoint().createExchange();
-        Message message = exchange.getIn();
-        getEndpoint().fillMessage(resultSet, message);
+        Exchange exchange = createExchange(false);
 
         try {
+            Message message = exchange.getIn();
+            getEndpoint().fillMessage(resultSet, message);
             // send message to next processor in the route
             getProcessor().process(exchange);
             return 1; // number of messages polled
@@ -68,6 +68,7 @@ public class CassandraConsumer extends ScheduledPollConsumer {
             if (exchange.getException() != null) {
                 getExceptionHandler().handleException("Error processing 
exchange", exchange, exchange.getException());
             }
+            releaseExchange(exchange, false);
         }
     }
 
diff --git 
a/components/camel-cmis/src/main/java/org/apache/camel/component/cmis/CMISConsumer.java
 
b/components/camel-cmis/src/main/java/org/apache/camel/component/cmis/CMISConsumer.java
index a9e301c..afb7b63 100644
--- 
a/components/camel-cmis/src/main/java/org/apache/camel/component/cmis/CMISConsumer.java
+++ 
b/components/camel-cmis/src/main/java/org/apache/camel/component/cmis/CMISConsumer.java
@@ -58,7 +58,7 @@ public class CMISConsumer extends ScheduledPollConsumer {
 
     int sendExchangeWithPropsAndBody(Map<String, Object> properties, 
InputStream inputStream)
             throws Exception {
-        Exchange exchange = getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         exchange.getIn().setHeaders(properties);
         exchange.getIn().setBody(inputStream);
         LOG.debug("Polling node: {}", properties.get("cmis:name"));
diff --git 
a/components/camel-coap/src/main/java/org/apache/camel/coap/CamelCoapResource.java
 
b/components/camel-coap/src/main/java/org/apache/camel/coap/CamelCoapResource.java
index a1761be..948513b 100644
--- 
a/components/camel-coap/src/main/java/org/apache/camel/coap/CamelCoapResource.java
+++ 
b/components/camel-coap/src/main/java/org/apache/camel/coap/CamelCoapResource.java
@@ -91,7 +91,7 @@ final class CamelCoapResource extends CoapResource {
                 return;
             }
 
-            camelExchange = consumer.getEndpoint().createExchange();
+            camelExchange = consumer.createExchange(false);
             consumer.createUoW(camelExchange);
 
             OptionSet options = exchange.getRequest().getOptions();
@@ -144,6 +144,7 @@ final class CamelCoapResource extends CoapResource {
             if (camelExchange != null) {
                 consumer.doneUoW(camelExchange);
             }
+            consumer.releaseExchange(camelExchange, false);
         }
     }
 }
diff --git 
a/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdConsumer.java
 
b/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdConsumer.java
index 3d2c498..84d09ab 100644
--- 
a/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdConsumer.java
+++ 
b/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdConsumer.java
@@ -98,17 +98,21 @@ public class CometdConsumer extends DefaultConsumer 
implements CometdProducerCon
 
             Message message = 
binding.createCamelMessage(endpoint.getCamelContext(), remote, cometdMessage, 
data);
 
-            Exchange exchange = endpoint.createExchange();
-            exchange.setIn(message);
+            Exchange exchange = consumer.createExchange(false);
+            try {
+                exchange.setIn(message);
 
-            consumer.getProcessor().process(exchange);
+                consumer.getProcessor().process(exchange);
 
-            if (ExchangeHelper.isOutCapable(exchange)) {
-                ServerChannel channel = getBayeux().getChannel(channelName);
-                ServerSession serverSession = getServerSession();
+                if (ExchangeHelper.isOutCapable(exchange)) {
+                    ServerChannel channel = 
getBayeux().getChannel(channelName);
+                    ServerSession serverSession = getServerSession();
 
-                ServerMessage.Mutable outMessage = 
binding.createCometdMessage(channel, serverSession, exchange.getOut());
-                remote.deliver(serverSession, outMessage);
+                    ServerMessage.Mutable outMessage = 
binding.createCometdMessage(channel, serverSession, exchange.getOut());
+                    remote.deliver(serverSession, outMessage);
+                }
+            } finally {
+                consumer.releaseExchange(exchange, false);
             }
         }
 
diff --git 
a/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulEventConsumer.java
 
b/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulEventConsumer.java
index 83b6661..25e6aaf 100644
--- 
a/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulEventConsumer.java
+++ 
b/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulEventConsumer.java
@@ -107,32 +107,34 @@ public final class ConsulEventConsumer extends 
AbstractConsulConsumer<EventClien
         private void onEvent(Event event) {
             
LoggerFactory.getLogger(ConsulEventConsumer.this.getClass()).info("{}", event);
 
-            final Exchange exchange = endpoint.createExchange();
-            final Message message = exchange.getIn();
-
-            message.setHeader(ConsulConstants.CONSUL_KEY, key);
-            message.setHeader(ConsulConstants.CONSUL_RESULT, true);
-            message.setHeader(ConsulConstants.CONSUL_EVENT_ID, event.getId());
-            message.setHeader(ConsulConstants.CONSUL_EVENT_NAME, 
event.getName());
-            message.setHeader(ConsulConstants.CONSUL_EVENT_LTIME, 
event.getLTime());
-            message.setHeader(ConsulConstants.CONSUL_VERSION, 
event.getVersion());
-
-            if (event.getNodeFilter().isPresent()) {
-                message.setHeader(ConsulConstants.CONSUL_NODE_FILTER, 
event.getNodeFilter().get());
-            }
-            if (event.getServiceFilter().isPresent()) {
-                message.setHeader(ConsulConstants.CONSUL_SERVICE_FILTER, 
event.getServiceFilter().get());
-            }
-            if (event.getTagFilter().isPresent()) {
-                message.setHeader(ConsulConstants.CONSUL_TAG_FILTER, 
event.getTagFilter().get());
-            }
+            final Exchange exchange = createExchange(false);
+            try {
+                final Message message = exchange.getIn();
 
-            message.setBody(event.getPayload().orElse(null));
+                message.setHeader(ConsulConstants.CONSUL_KEY, key);
+                message.setHeader(ConsulConstants.CONSUL_RESULT, true);
+                message.setHeader(ConsulConstants.CONSUL_EVENT_ID, 
event.getId());
+                message.setHeader(ConsulConstants.CONSUL_EVENT_NAME, 
event.getName());
+                message.setHeader(ConsulConstants.CONSUL_EVENT_LTIME, 
event.getLTime());
+                message.setHeader(ConsulConstants.CONSUL_VERSION, 
event.getVersion());
+
+                if (event.getNodeFilter().isPresent()) {
+                    message.setHeader(ConsulConstants.CONSUL_NODE_FILTER, 
event.getNodeFilter().get());
+                }
+                if (event.getServiceFilter().isPresent()) {
+                    message.setHeader(ConsulConstants.CONSUL_SERVICE_FILTER, 
event.getServiceFilter().get());
+                }
+                if (event.getTagFilter().isPresent()) {
+                    message.setHeader(ConsulConstants.CONSUL_TAG_FILTER, 
event.getTagFilter().get());
+                }
+
+                message.setBody(event.getPayload().orElse(null));
 
-            try {
                 getProcessor().process(exchange);
             } catch (Exception e) {
                 getExceptionHandler().handleException("Error processing 
exchange", exchange, e);
+            } finally {
+                releaseExchange(exchange, false);
             }
         }
 
diff --git 
a/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulKeyValueConsumer.java
 
b/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulKeyValueConsumer.java
index a344336..b7b8cb3 100644
--- 
a/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulKeyValueConsumer.java
+++ 
b/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulKeyValueConsumer.java
@@ -71,27 +71,29 @@ public final class ConsulKeyValueConsumer extends 
AbstractConsulConsumer<KeyValu
         }
 
         protected void onValue(Value value) {
-            final Exchange exchange = endpoint.createExchange();
-            final Message message = exchange.getIn();
-
-            message.setHeader(ConsulConstants.CONSUL_KEY, value.getKey());
-            message.setHeader(ConsulConstants.CONSUL_RESULT, true);
-            message.setHeader(ConsulConstants.CONSUL_FLAGS, value.getFlags());
-            message.setHeader(ConsulConstants.CONSUL_CREATE_INDEX, 
value.getCreateIndex());
-            message.setHeader(ConsulConstants.CONSUL_LOCK_INDEX, 
value.getLockIndex());
-            message.setHeader(ConsulConstants.CONSUL_MODIFY_INDEX, 
value.getModifyIndex());
-
-            if (value.getSession().isPresent()) {
-                message.setHeader(ConsulConstants.CONSUL_SESSION, 
value.getSession().get());
-            }
+            final Exchange exchange = createExchange(false);
+            try {
+                final Message message = exchange.getIn();
 
-            message.setBody(
-                    configuration.isValueAsString() ? 
value.getValueAsString().orElse(null) : value.getValue().orElse(null));
+                message.setHeader(ConsulConstants.CONSUL_KEY, value.getKey());
+                message.setHeader(ConsulConstants.CONSUL_RESULT, true);
+                message.setHeader(ConsulConstants.CONSUL_FLAGS, 
value.getFlags());
+                message.setHeader(ConsulConstants.CONSUL_CREATE_INDEX, 
value.getCreateIndex());
+                message.setHeader(ConsulConstants.CONSUL_LOCK_INDEX, 
value.getLockIndex());
+                message.setHeader(ConsulConstants.CONSUL_MODIFY_INDEX, 
value.getModifyIndex());
+
+                if (value.getSession().isPresent()) {
+                    message.setHeader(ConsulConstants.CONSUL_SESSION, 
value.getSession().get());
+                }
+
+                message.setBody(
+                        configuration.isValueAsString() ? 
value.getValueAsString().orElse(null) : value.getValue().orElse(null));
 
-            try {
                 getProcessor().process(exchange);
             } catch (Exception e) {
                 getExceptionHandler().handleException("Error processing 
exchange", exchange, e);
+            } finally {
+                releaseExchange(exchange, false);
             }
         }
 
diff --git 
a/components/camel-corda/src/main/java/org/apache/camel/component/corda/CordaConsumer.java
 
b/components/camel-corda/src/main/java/org/apache/camel/component/corda/CordaConsumer.java
index 132723f..d69655f 100644
--- 
a/components/camel-corda/src/main/java/org/apache/camel/component/corda/CordaConsumer.java
+++ 
b/components/camel-corda/src/main/java/org/apache/camel/component/corda/CordaConsumer.java
@@ -199,42 +199,42 @@ public class CordaConsumer extends DefaultConsumer {
 
     private void processFlowProcess(String x) {
         LOG.debug("processFlowProcess {}", x);
-        Exchange exchange = this.getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         exchange.getIn().setBody(x);
         processEvent(exchange);
     }
 
     private void processTransactionMappingFeed(StateMachineTransactionMapping 
x) {
         LOG.debug("processTransactionMappingFeed {}", x);
-        Exchange exchange = this.getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         exchange.getIn().setBody(x);
         processEvent(exchange);
     }
 
     private void proceedNetworkMapFeed(NetworkMapCache.MapChange x) {
         LOG.debug("proceedNetworkMapFeed {}", x);
-        Exchange exchange = this.getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         exchange.getIn().setBody(x);
         processEvent(exchange);
     }
 
     private void processStateMachineUpdate(StateMachineUpdate x) {
         LOG.debug("processStateMachineUpdate {}", x);
-        Exchange exchange = this.getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         exchange.getIn().setBody(x);
         processEvent(exchange);
     }
 
     private void processContractStateUpdate(Vault.Update<ContractState> x) {
         LOG.debug("processContractStateUpdate {}", x);
-        Exchange exchange = this.getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         exchange.getIn().setBody(x);
         processEvent(exchange);
     }
 
     private void processError(Throwable throwable, String operation) {
         LOG.debug("processError for operation: " + operation + " " + 
throwable);
-        Exchange exchange = this.getEndpoint().createExchange();
+        Exchange exchange = createExchange(true);
         exchange.setException(throwable);
         processEvent(exchange);
     }
@@ -244,7 +244,7 @@ public class CordaConsumer extends DefaultConsumer {
         try {
             getProcessor().process(exchange);
         } catch (Exception e) {
-            LOG.error("Error processing event ", e);
+            getExceptionHandler().handleException("Error processing event", e);
         }
     }
 
diff --git 
a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
 
b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
index ab6d5fc..18c2e98 100644
--- 
a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
+++ 
b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
@@ -41,8 +41,8 @@ public class CouchbaseConsumer extends 
DefaultScheduledPollConsumer {
 
     private final CouchbaseEndpoint endpoint;
     private final Bucket bucket;
+    private final Collection collection;
     private ViewOptions viewOptions;
-    private Collection collection;
 
     public CouchbaseConsumer(CouchbaseEndpoint endpoint, Bucket client, 
Processor processor) {
         super(endpoint, processor);
@@ -60,13 +60,10 @@ public class CouchbaseConsumer extends 
DefaultScheduledPollConsumer {
         } else {
             this.collection = client.defaultCollection();
         }
-        init();
     }
 
     @Override
     protected void doInit() {
-
-        //   query.setIncludeDocs(true);
         this.viewOptions = ViewOptions.viewOptions();
         int limit = endpoint.getLimit();
         if (limit > 0) {
@@ -92,13 +89,11 @@ public class CouchbaseConsumer extends 
DefaultScheduledPollConsumer {
 
     @Override
     protected void doStart() throws Exception {
-        LOG.info("Starting Couchbase consumer");
         super.doStart();
     }
 
     @Override
     protected void doStop() throws Exception {
-        LOG.info("Stopping Couchbase consumer");
         super.doStop();
         if (bucket != null) {
             bucket.core().shutdown();
@@ -130,34 +125,36 @@ public class CouchbaseConsumer extends 
DefaultScheduledPollConsumer {
             String designDocumentName = endpoint.getDesignDocumentName();
             String viewName = endpoint.getViewName();
 
-            Exchange exchange = endpoint.createExchange();
-            exchange.getIn().setBody(doc);
-            exchange.getIn().setHeader(HEADER_ID, id);
-            exchange.getIn().setHeader(HEADER_KEY, key);
-            exchange.getIn().setHeader(HEADER_DESIGN_DOCUMENT_NAME, 
designDocumentName);
-            exchange.getIn().setHeader(HEADER_VIEWNAME, viewName);
-
-            if ("delete".equalsIgnoreCase(consumerProcessedStrategy)) {
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Deleting doc with ID {}", id);
-                }
-
-                collection.remove(id);
-            } else if ("filter".equalsIgnoreCase(consumerProcessedStrategy)) {
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Filtering out ID {}", id);
+            Exchange exchange = createExchange(false);
+            try {
+                exchange.getIn().setBody(doc);
+                exchange.getIn().setHeader(HEADER_ID, id);
+                exchange.getIn().setHeader(HEADER_KEY, key);
+                exchange.getIn().setHeader(HEADER_DESIGN_DOCUMENT_NAME, 
designDocumentName);
+                exchange.getIn().setHeader(HEADER_VIEWNAME, viewName);
+
+                if ("delete".equalsIgnoreCase(consumerProcessedStrategy)) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Deleting doc with ID {}", id);
+                    }
+
+                    collection.remove(id);
+                } else if 
("filter".equalsIgnoreCase(consumerProcessedStrategy)) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Filtering out ID {}", id);
+                    }
+                    // add filter for already processed docs
+                } else {
+                    LOG.trace("No strategy set for already processed docs, 
beware of duplicates!");
                 }
-                // add filter for already processed docs
-            } else {
-                LOG.trace("No strategy set for already processed docs, beware 
of duplicates!");
-            }
 
-            logDetails(id, doc, key, designDocumentName, viewName, exchange);
+                logDetails(id, doc, key, designDocumentName, viewName, 
exchange);
 
-            try {
-                this.getProcessor().process(exchange);
+                getProcessor().process(exchange);
             } catch (Exception e) {
                 this.getExceptionHandler().handleException("Error processing 
exchange.", exchange, e);
+            } finally {
+                releaseExchange(exchange, false);
             }
         }
 
diff --git 
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
 
b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
index ac84831..14d145c 100644
--- 
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
+++ 
b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
@@ -73,7 +73,7 @@ public class CouchDbChangesetTracker implements Runnable {
                         lastSequence = feed.getSeq();
                         JsonObject doc = feed.getDoc();
 
-                        Exchange exchange = 
endpoint.createExchange(lastSequence, feed.getId(), doc, feed.isDeleted());
+                        Exchange exchange = 
consumer.createExchange(lastSequence, feed.getId(), doc, feed.isDeleted());
                         if (LOG.isTraceEnabled()) {
                             LOG.trace("Created exchange [exchange={}, _id={}, 
seq={}",
                                     new Object[] { exchange, feed.getId(), 
lastSequence });
@@ -83,6 +83,8 @@ public class CouchDbChangesetTracker implements Runnable {
                             consumer.getProcessor().process(exchange);
                         } catch (Exception e) {
                             
consumer.getExceptionHandler().handleException("Error processing exchange.", 
exchange, e);
+                        } finally {
+                            consumer.releaseExchange(exchange, false);
                         }
                     }
 
@@ -139,7 +141,4 @@ public class CouchDbChangesetTracker implements Runnable {
         changes.stop();
     }
 
-    public boolean isStopped() {
-        return stopped;
-    }
 }
diff --git 
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
 
b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
index 8064873..3a8e478 100644
--- 
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
+++ 
b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
@@ -18,15 +18,13 @@ package org.apache.camel.component.couchdb;
 
 import java.util.concurrent.ExecutorService;
 
+import com.google.gson.JsonObject;
+import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.support.DefaultConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class CouchDbConsumer extends DefaultConsumer {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(CouchDbConsumer.class);
-
     private final CouchDbClientWrapper couchClient;
     private final CouchDbEndpoint endpoint;
     private ExecutorService executor;
@@ -38,10 +36,20 @@ public class CouchDbConsumer extends DefaultConsumer {
         this.endpoint = endpoint;
     }
 
+    public Exchange createExchange(String seq, String id, JsonObject obj, 
boolean deleted) {
+        Exchange exchange = createExchange(false);
+        exchange.getIn().setHeader(CouchDbConstants.HEADER_DATABASE, 
endpoint.getDatabase());
+        exchange.getIn().setHeader(CouchDbConstants.HEADER_SEQ, seq);
+        exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_ID, id);
+        exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_REV, 
obj.get("_rev").getAsString());
+        exchange.getIn().setHeader(CouchDbConstants.HEADER_METHOD, deleted ? 
"DELETE" : "UPDATE");
+        exchange.getIn().setBody(obj);
+        return exchange;
+    }
+
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        LOG.info("Starting CouchDB consumer");
 
         executor = 
endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, 
endpoint.getEndpointUri(),
                 1);
@@ -52,7 +60,6 @@ public class CouchDbConsumer extends DefaultConsumer {
     @Override
     protected void doStop() throws Exception {
         super.doStop();
-        LOG.info("Stopping CouchDB consumer");
         if (task != null) {
             task.stop();
         }
diff --git 
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbEndpoint.java
 
b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbEndpoint.java
index 1137976..e9c6078 100644
--- 
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbEndpoint.java
+++ 
b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbEndpoint.java
@@ -18,10 +18,8 @@ package org.apache.camel.component.couchdb;
 
 import java.net.URI;
 
-import com.google.gson.JsonObject;
 import org.apache.camel.Category;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.spi.Metadata;
@@ -112,17 +110,6 @@ public class CouchDbEndpoint extends DefaultEndpoint {
         return new CouchDbProducer(this, createClient());
     }
 
-    public Exchange createExchange(String seq, String id, JsonObject obj, 
boolean deleted) {
-        Exchange exchange = super.createExchange();
-        exchange.getIn().setHeader(CouchDbConstants.HEADER_DATABASE, database);
-        exchange.getIn().setHeader(CouchDbConstants.HEADER_SEQ, seq);
-        exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_ID, id);
-        exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_REV, 
obj.get("_rev").getAsString());
-        exchange.getIn().setHeader(CouchDbConstants.HEADER_METHOD, deleted ? 
"DELETE" : "UPDATE");
-        exchange.getIn().setBody(obj);
-        return exchange;
-    }
-
     protected CouchDbClientWrapper createClient() {
         return new CouchDbClientWrapper(
                 new CouchDbClient(database, createDatabase, protocol, 
hostname, port, username, password));
diff --git 
a/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbChangesetTrackerTest.java
 
b/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbChangesetTrackerTest.java
index 018f8aa..7ae1c49 100644
--- 
a/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbChangesetTrackerTest.java
+++ 
b/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbChangesetTrackerTest.java
@@ -93,18 +93,18 @@ public class CouchDbChangesetTrackerTest {
         when(row3.getId()).thenReturn("id3");
         when(changes.hasNext()).thenReturn(true, true, true, false);
         when(changes.next()).thenReturn(row1, row2, row3);
-        when(endpoint.createExchange("seq1", "id1", null, 
false)).thenReturn(exchange1);
-        when(endpoint.createExchange("seq2", "id2", null, 
false)).thenReturn(exchange2);
-        when(endpoint.createExchange("seq3", "id3", null, 
false)).thenReturn(exchange3);
+        when(consumer.createExchange("seq1", "id1", null, 
false)).thenReturn(exchange1);
+        when(consumer.createExchange("seq2", "id2", null, 
false)).thenReturn(exchange2);
+        when(consumer.createExchange("seq3", "id3", null, 
false)).thenReturn(exchange3);
         when(consumer.getProcessor()).thenReturn(processor);
 
         tracker.run();
 
-        verify(endpoint).createExchange("seq1", "id1", null, false);
+        verify(consumer).createExchange("seq1", "id1", null, false);
         verify(processor).process(exchange1);
-        verify(endpoint).createExchange("seq2", "id2", null, false);
+        verify(consumer).createExchange("seq2", "id2", null, false);
         verify(processor).process(exchange2);
-        verify(endpoint).createExchange("seq3", "id3", null, false);
+        verify(consumer).createExchange("seq3", "id3", null, false);
         verify(processor).process(exchange3);
     }
 
@@ -116,7 +116,7 @@ public class CouchDbChangesetTrackerTest {
 
         tracker.run();
 
-        verify(endpoint).createExchange("seq1", "id1", null, false);
+        verify(consumer).createExchange("seq1", "id1", null, false);
         verify(processor).process(ArgumentMatchers.isNull());
     }
 }
diff --git 
a/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbEndpointTest.java
 
b/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbEndpointTest.java
index b5d09ea..09a8d78 100644
--- 
a/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbEndpointTest.java
+++ 
b/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbEndpointTest.java
@@ -16,11 +16,6 @@
  */
 package org.apache.camel.component.couchdb;
 
-import java.util.UUID;
-
-import com.google.gson.JsonObject;
-import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultCamelContext;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -30,28 +25,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class CouchDbEndpointTest {
 
     @Test
-    void testCreateCouchExchangeHeadersAreSet() throws Exception {
-        try (CouchDbEndpoint endpoint = new CouchDbEndpoint(
-                "couchdb:http://localhost/db", "http://localhost/db", new 
CouchDbComponent(new DefaultCamelContext()))) {
-
-            String id = UUID.randomUUID().toString();
-            String rev = UUID.randomUUID().toString();
-            String seq = "seq123";
-
-            JsonObject doc = new JsonObject();
-            doc.addProperty("_id", id);
-            doc.addProperty("_rev", rev);
-
-            Exchange exchange = endpoint.createExchange(seq, id, doc, false);
-            assertEquals(id, 
exchange.getIn().getHeader(CouchDbConstants.HEADER_DOC_ID));
-            assertEquals(rev, 
exchange.getIn().getHeader(CouchDbConstants.HEADER_DOC_REV));
-            assertEquals(seq, 
exchange.getIn().getHeader(CouchDbConstants.HEADER_SEQ));
-            assertEquals("UPDATE", 
exchange.getIn().getHeader(CouchDbConstants.HEADER_METHOD));
-            assertEquals("db", 
exchange.getIn().getHeader(CouchDbConstants.HEADER_DATABASE));
-        }
-    }
-
-    @Test
     void assertSingleton() throws Exception {
         try (CouchDbEndpoint endpoint
                 = new CouchDbEndpoint("couchdb:http://localhost/db", 
"http://localhost/db", new CouchDbComponent())) {

Reply via email to