This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/exchange-factory by this push:
new 0309f6e CAMEL-16222: PooledExchangeFactory experiment
0309f6e is described below
commit 0309f6e1b18eac8663e3936d52e3bf443ddf6379
Author: Claus Ibsen <[email protected]>
AuthorDate: Sun Feb 21 13:33:49 2021 +0100
CAMEL-16222: PooledExchangeFactory experiment
---
.../component/cassandra/CassandraConsumer.java | 7 +--
.../apache/camel/component/cmis/CMISConsumer.java | 2 +-
.../org/apache/camel/coap/CamelCoapResource.java | 3 +-
.../camel/component/cometd/CometdConsumer.java | 20 ++++----
.../consul/endpoint/ConsulEventConsumer.java | 44 ++++++++---------
.../consul/endpoint/ConsulKeyValueConsumer.java | 34 ++++++-------
.../camel/component/corda/CordaConsumer.java | 14 +++---
.../component/couchbase/CouchbaseConsumer.java | 55 ++++++++++------------
.../component/couchdb/CouchDbChangesetTracker.java | 7 ++-
.../camel/component/couchdb/CouchDbConsumer.java | 19 +++++---
.../camel/component/couchdb/CouchDbEndpoint.java | 13 -----
.../couchdb/CouchDbChangesetTrackerTest.java | 14 +++---
.../component/couchdb/CouchDbEndpointTest.java | 27 -----------
13 files changed, 116 insertions(+), 143 deletions(-)
diff --git
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
index 07518f5..cc516d8 100644
---
a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
+++
b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java
@@ -55,11 +55,11 @@ public class CassandraConsumer extends
ScheduledPollConsumer {
}
// Create message from ResultSet
- Exchange exchange = getEndpoint().createExchange();
- Message message = exchange.getIn();
- getEndpoint().fillMessage(resultSet, message);
+ Exchange exchange = createExchange(false);
try {
+ Message message = exchange.getIn();
+ getEndpoint().fillMessage(resultSet, message);
// send message to next processor in the route
getProcessor().process(exchange);
return 1; // number of messages polled
@@ -68,6 +68,7 @@ public class CassandraConsumer extends ScheduledPollConsumer {
if (exchange.getException() != null) {
getExceptionHandler().handleException("Error processing
exchange", exchange, exchange.getException());
}
+ releaseExchange(exchange, false);
}
}
diff --git
a/components/camel-cmis/src/main/java/org/apache/camel/component/cmis/CMISConsumer.java
b/components/camel-cmis/src/main/java/org/apache/camel/component/cmis/CMISConsumer.java
index a9e301c..afb7b63 100644
---
a/components/camel-cmis/src/main/java/org/apache/camel/component/cmis/CMISConsumer.java
+++
b/components/camel-cmis/src/main/java/org/apache/camel/component/cmis/CMISConsumer.java
@@ -58,7 +58,7 @@ public class CMISConsumer extends ScheduledPollConsumer {
int sendExchangeWithPropsAndBody(Map<String, Object> properties,
InputStream inputStream)
throws Exception {
- Exchange exchange = getEndpoint().createExchange();
+ Exchange exchange = createExchange(true);
exchange.getIn().setHeaders(properties);
exchange.getIn().setBody(inputStream);
LOG.debug("Polling node: {}", properties.get("cmis:name"));
diff --git
a/components/camel-coap/src/main/java/org/apache/camel/coap/CamelCoapResource.java
b/components/camel-coap/src/main/java/org/apache/camel/coap/CamelCoapResource.java
index a1761be..948513b 100644
---
a/components/camel-coap/src/main/java/org/apache/camel/coap/CamelCoapResource.java
+++
b/components/camel-coap/src/main/java/org/apache/camel/coap/CamelCoapResource.java
@@ -91,7 +91,7 @@ final class CamelCoapResource extends CoapResource {
return;
}
- camelExchange = consumer.getEndpoint().createExchange();
+ camelExchange = consumer.createExchange(false);
consumer.createUoW(camelExchange);
OptionSet options = exchange.getRequest().getOptions();
@@ -144,6 +144,7 @@ final class CamelCoapResource extends CoapResource {
if (camelExchange != null) {
consumer.doneUoW(camelExchange);
}
+ consumer.releaseExchange(camelExchange, false);
}
}
}
diff --git
a/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdConsumer.java
b/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdConsumer.java
index 3d2c498..84d09ab 100644
---
a/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdConsumer.java
+++
b/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdConsumer.java
@@ -98,17 +98,21 @@ public class CometdConsumer extends DefaultConsumer
implements CometdProducerCon
Message message =
binding.createCamelMessage(endpoint.getCamelContext(), remote, cometdMessage,
data);
- Exchange exchange = endpoint.createExchange();
- exchange.setIn(message);
+ Exchange exchange = consumer.createExchange(false);
+ try {
+ exchange.setIn(message);
- consumer.getProcessor().process(exchange);
+ consumer.getProcessor().process(exchange);
- if (ExchangeHelper.isOutCapable(exchange)) {
- ServerChannel channel = getBayeux().getChannel(channelName);
- ServerSession serverSession = getServerSession();
+ if (ExchangeHelper.isOutCapable(exchange)) {
+ ServerChannel channel =
getBayeux().getChannel(channelName);
+ ServerSession serverSession = getServerSession();
- ServerMessage.Mutable outMessage =
binding.createCometdMessage(channel, serverSession, exchange.getOut());
- remote.deliver(serverSession, outMessage);
+ ServerMessage.Mutable outMessage =
binding.createCometdMessage(channel, serverSession, exchange.getOut());
+ remote.deliver(serverSession, outMessage);
+ }
+ } finally {
+ consumer.releaseExchange(exchange, false);
}
}
diff --git
a/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulEventConsumer.java
b/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulEventConsumer.java
index 83b6661..25e6aaf 100644
---
a/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulEventConsumer.java
+++
b/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulEventConsumer.java
@@ -107,32 +107,34 @@ public final class ConsulEventConsumer extends
AbstractConsulConsumer<EventClien
private void onEvent(Event event) {
LoggerFactory.getLogger(ConsulEventConsumer.this.getClass()).info("{}", event);
- final Exchange exchange = endpoint.createExchange();
- final Message message = exchange.getIn();
-
- message.setHeader(ConsulConstants.CONSUL_KEY, key);
- message.setHeader(ConsulConstants.CONSUL_RESULT, true);
- message.setHeader(ConsulConstants.CONSUL_EVENT_ID, event.getId());
- message.setHeader(ConsulConstants.CONSUL_EVENT_NAME,
event.getName());
- message.setHeader(ConsulConstants.CONSUL_EVENT_LTIME,
event.getLTime());
- message.setHeader(ConsulConstants.CONSUL_VERSION,
event.getVersion());
-
- if (event.getNodeFilter().isPresent()) {
- message.setHeader(ConsulConstants.CONSUL_NODE_FILTER,
event.getNodeFilter().get());
- }
- if (event.getServiceFilter().isPresent()) {
- message.setHeader(ConsulConstants.CONSUL_SERVICE_FILTER,
event.getServiceFilter().get());
- }
- if (event.getTagFilter().isPresent()) {
- message.setHeader(ConsulConstants.CONSUL_TAG_FILTER,
event.getTagFilter().get());
- }
+ final Exchange exchange = createExchange(false);
+ try {
+ final Message message = exchange.getIn();
- message.setBody(event.getPayload().orElse(null));
+ message.setHeader(ConsulConstants.CONSUL_KEY, key);
+ message.setHeader(ConsulConstants.CONSUL_RESULT, true);
+ message.setHeader(ConsulConstants.CONSUL_EVENT_ID,
event.getId());
+ message.setHeader(ConsulConstants.CONSUL_EVENT_NAME,
event.getName());
+ message.setHeader(ConsulConstants.CONSUL_EVENT_LTIME,
event.getLTime());
+ message.setHeader(ConsulConstants.CONSUL_VERSION,
event.getVersion());
+
+ if (event.getNodeFilter().isPresent()) {
+ message.setHeader(ConsulConstants.CONSUL_NODE_FILTER,
event.getNodeFilter().get());
+ }
+ if (event.getServiceFilter().isPresent()) {
+ message.setHeader(ConsulConstants.CONSUL_SERVICE_FILTER,
event.getServiceFilter().get());
+ }
+ if (event.getTagFilter().isPresent()) {
+ message.setHeader(ConsulConstants.CONSUL_TAG_FILTER,
event.getTagFilter().get());
+ }
+
+ message.setBody(event.getPayload().orElse(null));
- try {
getProcessor().process(exchange);
} catch (Exception e) {
getExceptionHandler().handleException("Error processing
exchange", exchange, e);
+ } finally {
+ releaseExchange(exchange, false);
}
}
diff --git
a/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulKeyValueConsumer.java
b/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulKeyValueConsumer.java
index a344336..b7b8cb3 100644
---
a/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulKeyValueConsumer.java
+++
b/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulKeyValueConsumer.java
@@ -71,27 +71,29 @@ public final class ConsulKeyValueConsumer extends
AbstractConsulConsumer<KeyValu
}
protected void onValue(Value value) {
- final Exchange exchange = endpoint.createExchange();
- final Message message = exchange.getIn();
-
- message.setHeader(ConsulConstants.CONSUL_KEY, value.getKey());
- message.setHeader(ConsulConstants.CONSUL_RESULT, true);
- message.setHeader(ConsulConstants.CONSUL_FLAGS, value.getFlags());
- message.setHeader(ConsulConstants.CONSUL_CREATE_INDEX,
value.getCreateIndex());
- message.setHeader(ConsulConstants.CONSUL_LOCK_INDEX,
value.getLockIndex());
- message.setHeader(ConsulConstants.CONSUL_MODIFY_INDEX,
value.getModifyIndex());
-
- if (value.getSession().isPresent()) {
- message.setHeader(ConsulConstants.CONSUL_SESSION,
value.getSession().get());
- }
+ final Exchange exchange = createExchange(false);
+ try {
+ final Message message = exchange.getIn();
- message.setBody(
- configuration.isValueAsString() ?
value.getValueAsString().orElse(null) : value.getValue().orElse(null));
+ message.setHeader(ConsulConstants.CONSUL_KEY, value.getKey());
+ message.setHeader(ConsulConstants.CONSUL_RESULT, true);
+ message.setHeader(ConsulConstants.CONSUL_FLAGS,
value.getFlags());
+ message.setHeader(ConsulConstants.CONSUL_CREATE_INDEX,
value.getCreateIndex());
+ message.setHeader(ConsulConstants.CONSUL_LOCK_INDEX,
value.getLockIndex());
+ message.setHeader(ConsulConstants.CONSUL_MODIFY_INDEX,
value.getModifyIndex());
+
+ if (value.getSession().isPresent()) {
+ message.setHeader(ConsulConstants.CONSUL_SESSION,
value.getSession().get());
+ }
+
+ message.setBody(
+ configuration.isValueAsString() ?
value.getValueAsString().orElse(null) : value.getValue().orElse(null));
- try {
getProcessor().process(exchange);
} catch (Exception e) {
getExceptionHandler().handleException("Error processing
exchange", exchange, e);
+ } finally {
+ releaseExchange(exchange, false);
}
}
diff --git
a/components/camel-corda/src/main/java/org/apache/camel/component/corda/CordaConsumer.java
b/components/camel-corda/src/main/java/org/apache/camel/component/corda/CordaConsumer.java
index 132723f..d69655f 100644
---
a/components/camel-corda/src/main/java/org/apache/camel/component/corda/CordaConsumer.java
+++
b/components/camel-corda/src/main/java/org/apache/camel/component/corda/CordaConsumer.java
@@ -199,42 +199,42 @@ public class CordaConsumer extends DefaultConsumer {
private void processFlowProcess(String x) {
LOG.debug("processFlowProcess {}", x);
- Exchange exchange = this.getEndpoint().createExchange();
+ Exchange exchange = createExchange(true);
exchange.getIn().setBody(x);
processEvent(exchange);
}
private void processTransactionMappingFeed(StateMachineTransactionMapping
x) {
LOG.debug("processTransactionMappingFeed {}", x);
- Exchange exchange = this.getEndpoint().createExchange();
+ Exchange exchange = createExchange(true);
exchange.getIn().setBody(x);
processEvent(exchange);
}
private void proceedNetworkMapFeed(NetworkMapCache.MapChange x) {
LOG.debug("proceedNetworkMapFeed {}", x);
- Exchange exchange = this.getEndpoint().createExchange();
+ Exchange exchange = createExchange(true);
exchange.getIn().setBody(x);
processEvent(exchange);
}
private void processStateMachineUpdate(StateMachineUpdate x) {
LOG.debug("processStateMachineUpdate {}", x);
- Exchange exchange = this.getEndpoint().createExchange();
+ Exchange exchange = createExchange(true);
exchange.getIn().setBody(x);
processEvent(exchange);
}
private void processContractStateUpdate(Vault.Update<ContractState> x) {
LOG.debug("processContractStateUpdate {}", x);
- Exchange exchange = this.getEndpoint().createExchange();
+ Exchange exchange = createExchange(true);
exchange.getIn().setBody(x);
processEvent(exchange);
}
private void processError(Throwable throwable, String operation) {
LOG.debug("processError for operation: " + operation + " " +
throwable);
- Exchange exchange = this.getEndpoint().createExchange();
+ Exchange exchange = createExchange(true);
exchange.setException(throwable);
processEvent(exchange);
}
@@ -244,7 +244,7 @@ public class CordaConsumer extends DefaultConsumer {
try {
getProcessor().process(exchange);
} catch (Exception e) {
- LOG.error("Error processing event ", e);
+ getExceptionHandler().handleException("Error processing event", e);
}
}
diff --git
a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
index ab6d5fc..18c2e98 100644
---
a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
+++
b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java
@@ -41,8 +41,8 @@ public class CouchbaseConsumer extends
DefaultScheduledPollConsumer {
private final CouchbaseEndpoint endpoint;
private final Bucket bucket;
+ private final Collection collection;
private ViewOptions viewOptions;
- private Collection collection;
public CouchbaseConsumer(CouchbaseEndpoint endpoint, Bucket client,
Processor processor) {
super(endpoint, processor);
@@ -60,13 +60,10 @@ public class CouchbaseConsumer extends
DefaultScheduledPollConsumer {
} else {
this.collection = client.defaultCollection();
}
- init();
}
@Override
protected void doInit() {
-
- // query.setIncludeDocs(true);
this.viewOptions = ViewOptions.viewOptions();
int limit = endpoint.getLimit();
if (limit > 0) {
@@ -92,13 +89,11 @@ public class CouchbaseConsumer extends
DefaultScheduledPollConsumer {
@Override
protected void doStart() throws Exception {
- LOG.info("Starting Couchbase consumer");
super.doStart();
}
@Override
protected void doStop() throws Exception {
- LOG.info("Stopping Couchbase consumer");
super.doStop();
if (bucket != null) {
bucket.core().shutdown();
@@ -130,34 +125,36 @@ public class CouchbaseConsumer extends
DefaultScheduledPollConsumer {
String designDocumentName = endpoint.getDesignDocumentName();
String viewName = endpoint.getViewName();
- Exchange exchange = endpoint.createExchange();
- exchange.getIn().setBody(doc);
- exchange.getIn().setHeader(HEADER_ID, id);
- exchange.getIn().setHeader(HEADER_KEY, key);
- exchange.getIn().setHeader(HEADER_DESIGN_DOCUMENT_NAME,
designDocumentName);
- exchange.getIn().setHeader(HEADER_VIEWNAME, viewName);
-
- if ("delete".equalsIgnoreCase(consumerProcessedStrategy)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Deleting doc with ID {}", id);
- }
-
- collection.remove(id);
- } else if ("filter".equalsIgnoreCase(consumerProcessedStrategy)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Filtering out ID {}", id);
+ Exchange exchange = createExchange(false);
+ try {
+ exchange.getIn().setBody(doc);
+ exchange.getIn().setHeader(HEADER_ID, id);
+ exchange.getIn().setHeader(HEADER_KEY, key);
+ exchange.getIn().setHeader(HEADER_DESIGN_DOCUMENT_NAME,
designDocumentName);
+ exchange.getIn().setHeader(HEADER_VIEWNAME, viewName);
+
+ if ("delete".equalsIgnoreCase(consumerProcessedStrategy)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Deleting doc with ID {}", id);
+ }
+
+ collection.remove(id);
+ } else if
("filter".equalsIgnoreCase(consumerProcessedStrategy)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Filtering out ID {}", id);
+ }
+ // add filter for already processed docs
+ } else {
+ LOG.trace("No strategy set for already processed docs,
beware of duplicates!");
}
- // add filter for already processed docs
- } else {
- LOG.trace("No strategy set for already processed docs, beware
of duplicates!");
- }
- logDetails(id, doc, key, designDocumentName, viewName, exchange);
+ logDetails(id, doc, key, designDocumentName, viewName,
exchange);
- try {
- this.getProcessor().process(exchange);
+ getProcessor().process(exchange);
} catch (Exception e) {
this.getExceptionHandler().handleException("Error processing
exchange.", exchange, e);
+ } finally {
+ releaseExchange(exchange, false);
}
}
diff --git
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
index ac84831..14d145c 100644
---
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
+++
b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java
@@ -73,7 +73,7 @@ public class CouchDbChangesetTracker implements Runnable {
lastSequence = feed.getSeq();
JsonObject doc = feed.getDoc();
- Exchange exchange =
endpoint.createExchange(lastSequence, feed.getId(), doc, feed.isDeleted());
+ Exchange exchange =
consumer.createExchange(lastSequence, feed.getId(), doc, feed.isDeleted());
if (LOG.isTraceEnabled()) {
LOG.trace("Created exchange [exchange={}, _id={},
seq={}",
new Object[] { exchange, feed.getId(),
lastSequence });
@@ -83,6 +83,8 @@ public class CouchDbChangesetTracker implements Runnable {
consumer.getProcessor().process(exchange);
} catch (Exception e) {
consumer.getExceptionHandler().handleException("Error processing exchange.",
exchange, e);
+ } finally {
+ consumer.releaseExchange(exchange, false);
}
}
@@ -139,7 +141,4 @@ public class CouchDbChangesetTracker implements Runnable {
changes.stop();
}
- public boolean isStopped() {
- return stopped;
- }
}
diff --git
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
index 8064873..3a8e478 100644
---
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
+++
b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java
@@ -18,15 +18,13 @@ package org.apache.camel.component.couchdb;
import java.util.concurrent.ExecutorService;
+import com.google.gson.JsonObject;
+import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.support.DefaultConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class CouchDbConsumer extends DefaultConsumer {
- private static final Logger LOG =
LoggerFactory.getLogger(CouchDbConsumer.class);
-
private final CouchDbClientWrapper couchClient;
private final CouchDbEndpoint endpoint;
private ExecutorService executor;
@@ -38,10 +36,20 @@ public class CouchDbConsumer extends DefaultConsumer {
this.endpoint = endpoint;
}
+ public Exchange createExchange(String seq, String id, JsonObject obj,
boolean deleted) {
+ Exchange exchange = createExchange(false);
+ exchange.getIn().setHeader(CouchDbConstants.HEADER_DATABASE,
endpoint.getDatabase());
+ exchange.getIn().setHeader(CouchDbConstants.HEADER_SEQ, seq);
+ exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_ID, id);
+ exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_REV,
obj.get("_rev").getAsString());
+ exchange.getIn().setHeader(CouchDbConstants.HEADER_METHOD, deleted ?
"DELETE" : "UPDATE");
+ exchange.getIn().setBody(obj);
+ return exchange;
+ }
+
@Override
protected void doStart() throws Exception {
super.doStart();
- LOG.info("Starting CouchDB consumer");
executor =
endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this,
endpoint.getEndpointUri(),
1);
@@ -52,7 +60,6 @@ public class CouchDbConsumer extends DefaultConsumer {
@Override
protected void doStop() throws Exception {
super.doStop();
- LOG.info("Stopping CouchDB consumer");
if (task != null) {
task.stop();
}
diff --git
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbEndpoint.java
b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbEndpoint.java
index 1137976..e9c6078 100644
---
a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbEndpoint.java
+++
b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbEndpoint.java
@@ -18,10 +18,8 @@ package org.apache.camel.component.couchdb;
import java.net.URI;
-import com.google.gson.JsonObject;
import org.apache.camel.Category;
import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.spi.Metadata;
@@ -112,17 +110,6 @@ public class CouchDbEndpoint extends DefaultEndpoint {
return new CouchDbProducer(this, createClient());
}
- public Exchange createExchange(String seq, String id, JsonObject obj,
boolean deleted) {
- Exchange exchange = super.createExchange();
- exchange.getIn().setHeader(CouchDbConstants.HEADER_DATABASE, database);
- exchange.getIn().setHeader(CouchDbConstants.HEADER_SEQ, seq);
- exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_ID, id);
- exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_REV,
obj.get("_rev").getAsString());
- exchange.getIn().setHeader(CouchDbConstants.HEADER_METHOD, deleted ?
"DELETE" : "UPDATE");
- exchange.getIn().setBody(obj);
- return exchange;
- }
-
protected CouchDbClientWrapper createClient() {
return new CouchDbClientWrapper(
new CouchDbClient(database, createDatabase, protocol,
hostname, port, username, password));
diff --git
a/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbChangesetTrackerTest.java
b/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbChangesetTrackerTest.java
index 018f8aa..7ae1c49 100644
---
a/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbChangesetTrackerTest.java
+++
b/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbChangesetTrackerTest.java
@@ -93,18 +93,18 @@ public class CouchDbChangesetTrackerTest {
when(row3.getId()).thenReturn("id3");
when(changes.hasNext()).thenReturn(true, true, true, false);
when(changes.next()).thenReturn(row1, row2, row3);
- when(endpoint.createExchange("seq1", "id1", null,
false)).thenReturn(exchange1);
- when(endpoint.createExchange("seq2", "id2", null,
false)).thenReturn(exchange2);
- when(endpoint.createExchange("seq3", "id3", null,
false)).thenReturn(exchange3);
+ when(consumer.createExchange("seq1", "id1", null,
false)).thenReturn(exchange1);
+ when(consumer.createExchange("seq2", "id2", null,
false)).thenReturn(exchange2);
+ when(consumer.createExchange("seq3", "id3", null,
false)).thenReturn(exchange3);
when(consumer.getProcessor()).thenReturn(processor);
tracker.run();
- verify(endpoint).createExchange("seq1", "id1", null, false);
+ verify(consumer).createExchange("seq1", "id1", null, false);
verify(processor).process(exchange1);
- verify(endpoint).createExchange("seq2", "id2", null, false);
+ verify(consumer).createExchange("seq2", "id2", null, false);
verify(processor).process(exchange2);
- verify(endpoint).createExchange("seq3", "id3", null, false);
+ verify(consumer).createExchange("seq3", "id3", null, false);
verify(processor).process(exchange3);
}
@@ -116,7 +116,7 @@ public class CouchDbChangesetTrackerTest {
tracker.run();
- verify(endpoint).createExchange("seq1", "id1", null, false);
+ verify(consumer).createExchange("seq1", "id1", null, false);
verify(processor).process(ArgumentMatchers.isNull());
}
}
diff --git
a/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbEndpointTest.java
b/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbEndpointTest.java
index b5d09ea..09a8d78 100644
---
a/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbEndpointTest.java
+++
b/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbEndpointTest.java
@@ -16,11 +16,6 @@
*/
package org.apache.camel.component.couchdb;
-import java.util.UUID;
-
-import com.google.gson.JsonObject;
-import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultCamelContext;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -30,28 +25,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class CouchDbEndpointTest {
@Test
- void testCreateCouchExchangeHeadersAreSet() throws Exception {
- try (CouchDbEndpoint endpoint = new CouchDbEndpoint(
- "couchdb:http://localhost/db", "http://localhost/db", new
CouchDbComponent(new DefaultCamelContext()))) {
-
- String id = UUID.randomUUID().toString();
- String rev = UUID.randomUUID().toString();
- String seq = "seq123";
-
- JsonObject doc = new JsonObject();
- doc.addProperty("_id", id);
- doc.addProperty("_rev", rev);
-
- Exchange exchange = endpoint.createExchange(seq, id, doc, false);
- assertEquals(id,
exchange.getIn().getHeader(CouchDbConstants.HEADER_DOC_ID));
- assertEquals(rev,
exchange.getIn().getHeader(CouchDbConstants.HEADER_DOC_REV));
- assertEquals(seq,
exchange.getIn().getHeader(CouchDbConstants.HEADER_SEQ));
- assertEquals("UPDATE",
exchange.getIn().getHeader(CouchDbConstants.HEADER_METHOD));
- assertEquals("db",
exchange.getIn().getHeader(CouchDbConstants.HEADER_DATABASE));
- }
- }
-
- @Test
void assertSingleton() throws Exception {
try (CouchDbEndpoint endpoint
= new CouchDbEndpoint("couchdb:http://localhost/db",
"http://localhost/db", new CouchDbComponent())) {