This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/exchange-factory by this push: new 0309f6e CAMEL-16222: PooledExchangeFactory experiment 0309f6e is described below commit 0309f6e1b18eac8663e3936d52e3bf443ddf6379 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sun Feb 21 13:33:49 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../component/cassandra/CassandraConsumer.java | 7 +-- .../apache/camel/component/cmis/CMISConsumer.java | 2 +- .../org/apache/camel/coap/CamelCoapResource.java | 3 +- .../camel/component/cometd/CometdConsumer.java | 20 ++++---- .../consul/endpoint/ConsulEventConsumer.java | 44 ++++++++--------- .../consul/endpoint/ConsulKeyValueConsumer.java | 34 ++++++------- .../camel/component/corda/CordaConsumer.java | 14 +++--- .../component/couchbase/CouchbaseConsumer.java | 55 ++++++++++------------ .../component/couchdb/CouchDbChangesetTracker.java | 7 ++- .../camel/component/couchdb/CouchDbConsumer.java | 19 +++++--- .../camel/component/couchdb/CouchDbEndpoint.java | 13 ----- .../couchdb/CouchDbChangesetTrackerTest.java | 14 +++--- .../component/couchdb/CouchDbEndpointTest.java | 27 ----------- 13 files changed, 116 insertions(+), 143 deletions(-) diff --git a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java index 07518f5..cc516d8 100644 --- a/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java +++ b/components/camel-cassandraql/src/main/java/org/apache/camel/component/cassandra/CassandraConsumer.java @@ -55,11 +55,11 @@ public class CassandraConsumer extends ScheduledPollConsumer { } // Create message from ResultSet - Exchange exchange = getEndpoint().createExchange(); - Message message = exchange.getIn(); - getEndpoint().fillMessage(resultSet, message); + Exchange exchange = createExchange(false); try { + Message message = exchange.getIn(); + getEndpoint().fillMessage(resultSet, message); // send message to next processor in the route getProcessor().process(exchange); return 1; // number of messages polled @@ -68,6 +68,7 @@ public class CassandraConsumer extends ScheduledPollConsumer { if (exchange.getException() != null) { getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException()); } + releaseExchange(exchange, false); } } diff --git a/components/camel-cmis/src/main/java/org/apache/camel/component/cmis/CMISConsumer.java b/components/camel-cmis/src/main/java/org/apache/camel/component/cmis/CMISConsumer.java index a9e301c..afb7b63 100644 --- a/components/camel-cmis/src/main/java/org/apache/camel/component/cmis/CMISConsumer.java +++ b/components/camel-cmis/src/main/java/org/apache/camel/component/cmis/CMISConsumer.java @@ -58,7 +58,7 @@ public class CMISConsumer extends ScheduledPollConsumer { int sendExchangeWithPropsAndBody(Map<String, Object> properties, InputStream inputStream) throws Exception { - Exchange exchange = getEndpoint().createExchange(); + Exchange exchange = createExchange(true); exchange.getIn().setHeaders(properties); exchange.getIn().setBody(inputStream); LOG.debug("Polling node: {}", properties.get("cmis:name")); diff --git a/components/camel-coap/src/main/java/org/apache/camel/coap/CamelCoapResource.java b/components/camel-coap/src/main/java/org/apache/camel/coap/CamelCoapResource.java index a1761be..948513b 100644 --- a/components/camel-coap/src/main/java/org/apache/camel/coap/CamelCoapResource.java +++ b/components/camel-coap/src/main/java/org/apache/camel/coap/CamelCoapResource.java @@ -91,7 +91,7 @@ final class CamelCoapResource extends CoapResource { return; } - camelExchange = consumer.getEndpoint().createExchange(); + camelExchange = consumer.createExchange(false); consumer.createUoW(camelExchange); OptionSet options = exchange.getRequest().getOptions(); @@ -144,6 +144,7 @@ final class CamelCoapResource extends CoapResource { if (camelExchange != null) { consumer.doneUoW(camelExchange); } + consumer.releaseExchange(camelExchange, false); } } } diff --git a/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdConsumer.java b/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdConsumer.java index 3d2c498..84d09ab 100644 --- a/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdConsumer.java +++ b/components/camel-cometd/src/main/java/org/apache/camel/component/cometd/CometdConsumer.java @@ -98,17 +98,21 @@ public class CometdConsumer extends DefaultConsumer implements CometdProducerCon Message message = binding.createCamelMessage(endpoint.getCamelContext(), remote, cometdMessage, data); - Exchange exchange = endpoint.createExchange(); - exchange.setIn(message); + Exchange exchange = consumer.createExchange(false); + try { + exchange.setIn(message); - consumer.getProcessor().process(exchange); + consumer.getProcessor().process(exchange); - if (ExchangeHelper.isOutCapable(exchange)) { - ServerChannel channel = getBayeux().getChannel(channelName); - ServerSession serverSession = getServerSession(); + if (ExchangeHelper.isOutCapable(exchange)) { + ServerChannel channel = getBayeux().getChannel(channelName); + ServerSession serverSession = getServerSession(); - ServerMessage.Mutable outMessage = binding.createCometdMessage(channel, serverSession, exchange.getOut()); - remote.deliver(serverSession, outMessage); + ServerMessage.Mutable outMessage = binding.createCometdMessage(channel, serverSession, exchange.getOut()); + remote.deliver(serverSession, outMessage); + } + } finally { + consumer.releaseExchange(exchange, false); } } diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulEventConsumer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulEventConsumer.java index 83b6661..25e6aaf 100644 --- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulEventConsumer.java +++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulEventConsumer.java @@ -107,32 +107,34 @@ public final class ConsulEventConsumer extends AbstractConsulConsumer<EventClien private void onEvent(Event event) { LoggerFactory.getLogger(ConsulEventConsumer.this.getClass()).info("{}", event); - final Exchange exchange = endpoint.createExchange(); - final Message message = exchange.getIn(); - - message.setHeader(ConsulConstants.CONSUL_KEY, key); - message.setHeader(ConsulConstants.CONSUL_RESULT, true); - message.setHeader(ConsulConstants.CONSUL_EVENT_ID, event.getId()); - message.setHeader(ConsulConstants.CONSUL_EVENT_NAME, event.getName()); - message.setHeader(ConsulConstants.CONSUL_EVENT_LTIME, event.getLTime()); - message.setHeader(ConsulConstants.CONSUL_VERSION, event.getVersion()); - - if (event.getNodeFilter().isPresent()) { - message.setHeader(ConsulConstants.CONSUL_NODE_FILTER, event.getNodeFilter().get()); - } - if (event.getServiceFilter().isPresent()) { - message.setHeader(ConsulConstants.CONSUL_SERVICE_FILTER, event.getServiceFilter().get()); - } - if (event.getTagFilter().isPresent()) { - message.setHeader(ConsulConstants.CONSUL_TAG_FILTER, event.getTagFilter().get()); - } + final Exchange exchange = createExchange(false); + try { + final Message message = exchange.getIn(); - message.setBody(event.getPayload().orElse(null)); + message.setHeader(ConsulConstants.CONSUL_KEY, key); + message.setHeader(ConsulConstants.CONSUL_RESULT, true); + message.setHeader(ConsulConstants.CONSUL_EVENT_ID, event.getId()); + message.setHeader(ConsulConstants.CONSUL_EVENT_NAME, event.getName()); + message.setHeader(ConsulConstants.CONSUL_EVENT_LTIME, event.getLTime()); + message.setHeader(ConsulConstants.CONSUL_VERSION, event.getVersion()); + + if (event.getNodeFilter().isPresent()) { + message.setHeader(ConsulConstants.CONSUL_NODE_FILTER, event.getNodeFilter().get()); + } + if (event.getServiceFilter().isPresent()) { + message.setHeader(ConsulConstants.CONSUL_SERVICE_FILTER, event.getServiceFilter().get()); + } + if (event.getTagFilter().isPresent()) { + message.setHeader(ConsulConstants.CONSUL_TAG_FILTER, event.getTagFilter().get()); + } + + message.setBody(event.getPayload().orElse(null)); - try { getProcessor().process(exchange); } catch (Exception e) { getExceptionHandler().handleException("Error processing exchange", exchange, e); + } finally { + releaseExchange(exchange, false); } } diff --git a/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulKeyValueConsumer.java b/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulKeyValueConsumer.java index a344336..b7b8cb3 100644 --- a/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulKeyValueConsumer.java +++ b/components/camel-consul/src/main/java/org/apache/camel/component/consul/endpoint/ConsulKeyValueConsumer.java @@ -71,27 +71,29 @@ public final class ConsulKeyValueConsumer extends AbstractConsulConsumer<KeyValu } protected void onValue(Value value) { - final Exchange exchange = endpoint.createExchange(); - final Message message = exchange.getIn(); - - message.setHeader(ConsulConstants.CONSUL_KEY, value.getKey()); - message.setHeader(ConsulConstants.CONSUL_RESULT, true); - message.setHeader(ConsulConstants.CONSUL_FLAGS, value.getFlags()); - message.setHeader(ConsulConstants.CONSUL_CREATE_INDEX, value.getCreateIndex()); - message.setHeader(ConsulConstants.CONSUL_LOCK_INDEX, value.getLockIndex()); - message.setHeader(ConsulConstants.CONSUL_MODIFY_INDEX, value.getModifyIndex()); - - if (value.getSession().isPresent()) { - message.setHeader(ConsulConstants.CONSUL_SESSION, value.getSession().get()); - } + final Exchange exchange = createExchange(false); + try { + final Message message = exchange.getIn(); - message.setBody( - configuration.isValueAsString() ? value.getValueAsString().orElse(null) : value.getValue().orElse(null)); + message.setHeader(ConsulConstants.CONSUL_KEY, value.getKey()); + message.setHeader(ConsulConstants.CONSUL_RESULT, true); + message.setHeader(ConsulConstants.CONSUL_FLAGS, value.getFlags()); + message.setHeader(ConsulConstants.CONSUL_CREATE_INDEX, value.getCreateIndex()); + message.setHeader(ConsulConstants.CONSUL_LOCK_INDEX, value.getLockIndex()); + message.setHeader(ConsulConstants.CONSUL_MODIFY_INDEX, value.getModifyIndex()); + + if (value.getSession().isPresent()) { + message.setHeader(ConsulConstants.CONSUL_SESSION, value.getSession().get()); + } + + message.setBody( + configuration.isValueAsString() ? value.getValueAsString().orElse(null) : value.getValue().orElse(null)); - try { getProcessor().process(exchange); } catch (Exception e) { getExceptionHandler().handleException("Error processing exchange", exchange, e); + } finally { + releaseExchange(exchange, false); } } diff --git a/components/camel-corda/src/main/java/org/apache/camel/component/corda/CordaConsumer.java b/components/camel-corda/src/main/java/org/apache/camel/component/corda/CordaConsumer.java index 132723f..d69655f 100644 --- a/components/camel-corda/src/main/java/org/apache/camel/component/corda/CordaConsumer.java +++ b/components/camel-corda/src/main/java/org/apache/camel/component/corda/CordaConsumer.java @@ -199,42 +199,42 @@ public class CordaConsumer extends DefaultConsumer { private void processFlowProcess(String x) { LOG.debug("processFlowProcess {}", x); - Exchange exchange = this.getEndpoint().createExchange(); + Exchange exchange = createExchange(true); exchange.getIn().setBody(x); processEvent(exchange); } private void processTransactionMappingFeed(StateMachineTransactionMapping x) { LOG.debug("processTransactionMappingFeed {}", x); - Exchange exchange = this.getEndpoint().createExchange(); + Exchange exchange = createExchange(true); exchange.getIn().setBody(x); processEvent(exchange); } private void proceedNetworkMapFeed(NetworkMapCache.MapChange x) { LOG.debug("proceedNetworkMapFeed {}", x); - Exchange exchange = this.getEndpoint().createExchange(); + Exchange exchange = createExchange(true); exchange.getIn().setBody(x); processEvent(exchange); } private void processStateMachineUpdate(StateMachineUpdate x) { LOG.debug("processStateMachineUpdate {}", x); - Exchange exchange = this.getEndpoint().createExchange(); + Exchange exchange = createExchange(true); exchange.getIn().setBody(x); processEvent(exchange); } private void processContractStateUpdate(Vault.Update<ContractState> x) { LOG.debug("processContractStateUpdate {}", x); - Exchange exchange = this.getEndpoint().createExchange(); + Exchange exchange = createExchange(true); exchange.getIn().setBody(x); processEvent(exchange); } private void processError(Throwable throwable, String operation) { LOG.debug("processError for operation: " + operation + " " + throwable); - Exchange exchange = this.getEndpoint().createExchange(); + Exchange exchange = createExchange(true); exchange.setException(throwable); processEvent(exchange); } @@ -244,7 +244,7 @@ public class CordaConsumer extends DefaultConsumer { try { getProcessor().process(exchange); } catch (Exception e) { - LOG.error("Error processing event ", e); + getExceptionHandler().handleException("Error processing event", e); } } diff --git a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java index ab6d5fc..18c2e98 100644 --- a/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java +++ b/components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java @@ -41,8 +41,8 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer { private final CouchbaseEndpoint endpoint; private final Bucket bucket; + private final Collection collection; private ViewOptions viewOptions; - private Collection collection; public CouchbaseConsumer(CouchbaseEndpoint endpoint, Bucket client, Processor processor) { super(endpoint, processor); @@ -60,13 +60,10 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer { } else { this.collection = client.defaultCollection(); } - init(); } @Override protected void doInit() { - - // query.setIncludeDocs(true); this.viewOptions = ViewOptions.viewOptions(); int limit = endpoint.getLimit(); if (limit > 0) { @@ -92,13 +89,11 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer { @Override protected void doStart() throws Exception { - LOG.info("Starting Couchbase consumer"); super.doStart(); } @Override protected void doStop() throws Exception { - LOG.info("Stopping Couchbase consumer"); super.doStop(); if (bucket != null) { bucket.core().shutdown(); @@ -130,34 +125,36 @@ public class CouchbaseConsumer extends DefaultScheduledPollConsumer { String designDocumentName = endpoint.getDesignDocumentName(); String viewName = endpoint.getViewName(); - Exchange exchange = endpoint.createExchange(); - exchange.getIn().setBody(doc); - exchange.getIn().setHeader(HEADER_ID, id); - exchange.getIn().setHeader(HEADER_KEY, key); - exchange.getIn().setHeader(HEADER_DESIGN_DOCUMENT_NAME, designDocumentName); - exchange.getIn().setHeader(HEADER_VIEWNAME, viewName); - - if ("delete".equalsIgnoreCase(consumerProcessedStrategy)) { - if (LOG.isTraceEnabled()) { - LOG.trace("Deleting doc with ID {}", id); - } - - collection.remove(id); - } else if ("filter".equalsIgnoreCase(consumerProcessedStrategy)) { - if (LOG.isTraceEnabled()) { - LOG.trace("Filtering out ID {}", id); + Exchange exchange = createExchange(false); + try { + exchange.getIn().setBody(doc); + exchange.getIn().setHeader(HEADER_ID, id); + exchange.getIn().setHeader(HEADER_KEY, key); + exchange.getIn().setHeader(HEADER_DESIGN_DOCUMENT_NAME, designDocumentName); + exchange.getIn().setHeader(HEADER_VIEWNAME, viewName); + + if ("delete".equalsIgnoreCase(consumerProcessedStrategy)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Deleting doc with ID {}", id); + } + + collection.remove(id); + } else if ("filter".equalsIgnoreCase(consumerProcessedStrategy)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Filtering out ID {}", id); + } + // add filter for already processed docs + } else { + LOG.trace("No strategy set for already processed docs, beware of duplicates!"); } - // add filter for already processed docs - } else { - LOG.trace("No strategy set for already processed docs, beware of duplicates!"); - } - logDetails(id, doc, key, designDocumentName, viewName, exchange); + logDetails(id, doc, key, designDocumentName, viewName, exchange); - try { - this.getProcessor().process(exchange); + getProcessor().process(exchange); } catch (Exception e) { this.getExceptionHandler().handleException("Error processing exchange.", exchange, e); + } finally { + releaseExchange(exchange, false); } } diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java index ac84831..14d145c 100644 --- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java +++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbChangesetTracker.java @@ -73,7 +73,7 @@ public class CouchDbChangesetTracker implements Runnable { lastSequence = feed.getSeq(); JsonObject doc = feed.getDoc(); - Exchange exchange = endpoint.createExchange(lastSequence, feed.getId(), doc, feed.isDeleted()); + Exchange exchange = consumer.createExchange(lastSequence, feed.getId(), doc, feed.isDeleted()); if (LOG.isTraceEnabled()) { LOG.trace("Created exchange [exchange={}, _id={}, seq={}", new Object[] { exchange, feed.getId(), lastSequence }); @@ -83,6 +83,8 @@ public class CouchDbChangesetTracker implements Runnable { consumer.getProcessor().process(exchange); } catch (Exception e) { consumer.getExceptionHandler().handleException("Error processing exchange.", exchange, e); + } finally { + consumer.releaseExchange(exchange, false); } } @@ -139,7 +141,4 @@ public class CouchDbChangesetTracker implements Runnable { changes.stop(); } - public boolean isStopped() { - return stopped; - } } diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java index 8064873..3a8e478 100644 --- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java +++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbConsumer.java @@ -18,15 +18,13 @@ package org.apache.camel.component.couchdb; import java.util.concurrent.ExecutorService; +import com.google.gson.JsonObject; +import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.support.DefaultConsumer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class CouchDbConsumer extends DefaultConsumer { - private static final Logger LOG = LoggerFactory.getLogger(CouchDbConsumer.class); - private final CouchDbClientWrapper couchClient; private final CouchDbEndpoint endpoint; private ExecutorService executor; @@ -38,10 +36,20 @@ public class CouchDbConsumer extends DefaultConsumer { this.endpoint = endpoint; } + public Exchange createExchange(String seq, String id, JsonObject obj, boolean deleted) { + Exchange exchange = createExchange(false); + exchange.getIn().setHeader(CouchDbConstants.HEADER_DATABASE, endpoint.getDatabase()); + exchange.getIn().setHeader(CouchDbConstants.HEADER_SEQ, seq); + exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_ID, id); + exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_REV, obj.get("_rev").getAsString()); + exchange.getIn().setHeader(CouchDbConstants.HEADER_METHOD, deleted ? "DELETE" : "UPDATE"); + exchange.getIn().setBody(obj); + return exchange; + } + @Override protected void doStart() throws Exception { super.doStart(); - LOG.info("Starting CouchDB consumer"); executor = endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, endpoint.getEndpointUri(), 1); @@ -52,7 +60,6 @@ public class CouchDbConsumer extends DefaultConsumer { @Override protected void doStop() throws Exception { super.doStop(); - LOG.info("Stopping CouchDB consumer"); if (task != null) { task.stop(); } diff --git a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbEndpoint.java b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbEndpoint.java index 1137976..e9c6078 100644 --- a/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbEndpoint.java +++ b/components/camel-couchdb/src/main/java/org/apache/camel/component/couchdb/CouchDbEndpoint.java @@ -18,10 +18,8 @@ package org.apache.camel.component.couchdb; import java.net.URI; -import com.google.gson.JsonObject; import org.apache.camel.Category; import org.apache.camel.Consumer; -import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.spi.Metadata; @@ -112,17 +110,6 @@ public class CouchDbEndpoint extends DefaultEndpoint { return new CouchDbProducer(this, createClient()); } - public Exchange createExchange(String seq, String id, JsonObject obj, boolean deleted) { - Exchange exchange = super.createExchange(); - exchange.getIn().setHeader(CouchDbConstants.HEADER_DATABASE, database); - exchange.getIn().setHeader(CouchDbConstants.HEADER_SEQ, seq); - exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_ID, id); - exchange.getIn().setHeader(CouchDbConstants.HEADER_DOC_REV, obj.get("_rev").getAsString()); - exchange.getIn().setHeader(CouchDbConstants.HEADER_METHOD, deleted ? "DELETE" : "UPDATE"); - exchange.getIn().setBody(obj); - return exchange; - } - protected CouchDbClientWrapper createClient() { return new CouchDbClientWrapper( new CouchDbClient(database, createDatabase, protocol, hostname, port, username, password)); diff --git a/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbChangesetTrackerTest.java b/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbChangesetTrackerTest.java index 018f8aa..7ae1c49 100644 --- a/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbChangesetTrackerTest.java +++ b/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbChangesetTrackerTest.java @@ -93,18 +93,18 @@ public class CouchDbChangesetTrackerTest { when(row3.getId()).thenReturn("id3"); when(changes.hasNext()).thenReturn(true, true, true, false); when(changes.next()).thenReturn(row1, row2, row3); - when(endpoint.createExchange("seq1", "id1", null, false)).thenReturn(exchange1); - when(endpoint.createExchange("seq2", "id2", null, false)).thenReturn(exchange2); - when(endpoint.createExchange("seq3", "id3", null, false)).thenReturn(exchange3); + when(consumer.createExchange("seq1", "id1", null, false)).thenReturn(exchange1); + when(consumer.createExchange("seq2", "id2", null, false)).thenReturn(exchange2); + when(consumer.createExchange("seq3", "id3", null, false)).thenReturn(exchange3); when(consumer.getProcessor()).thenReturn(processor); tracker.run(); - verify(endpoint).createExchange("seq1", "id1", null, false); + verify(consumer).createExchange("seq1", "id1", null, false); verify(processor).process(exchange1); - verify(endpoint).createExchange("seq2", "id2", null, false); + verify(consumer).createExchange("seq2", "id2", null, false); verify(processor).process(exchange2); - verify(endpoint).createExchange("seq3", "id3", null, false); + verify(consumer).createExchange("seq3", "id3", null, false); verify(processor).process(exchange3); } @@ -116,7 +116,7 @@ public class CouchDbChangesetTrackerTest { tracker.run(); - verify(endpoint).createExchange("seq1", "id1", null, false); + verify(consumer).createExchange("seq1", "id1", null, false); verify(processor).process(ArgumentMatchers.isNull()); } } diff --git a/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbEndpointTest.java b/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbEndpointTest.java index b5d09ea..09a8d78 100644 --- a/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbEndpointTest.java +++ b/components/camel-couchdb/src/test/java/org/apache/camel/component/couchdb/CouchDbEndpointTest.java @@ -16,11 +16,6 @@ */ package org.apache.camel.component.couchdb; -import java.util.UUID; - -import com.google.gson.JsonObject; -import org.apache.camel.Exchange; -import org.apache.camel.impl.DefaultCamelContext; import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -30,28 +25,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class CouchDbEndpointTest { @Test - void testCreateCouchExchangeHeadersAreSet() throws Exception { - try (CouchDbEndpoint endpoint = new CouchDbEndpoint( - "couchdb:http://localhost/db", "http://localhost/db", new CouchDbComponent(new DefaultCamelContext()))) { - - String id = UUID.randomUUID().toString(); - String rev = UUID.randomUUID().toString(); - String seq = "seq123"; - - JsonObject doc = new JsonObject(); - doc.addProperty("_id", id); - doc.addProperty("_rev", rev); - - Exchange exchange = endpoint.createExchange(seq, id, doc, false); - assertEquals(id, exchange.getIn().getHeader(CouchDbConstants.HEADER_DOC_ID)); - assertEquals(rev, exchange.getIn().getHeader(CouchDbConstants.HEADER_DOC_REV)); - assertEquals(seq, exchange.getIn().getHeader(CouchDbConstants.HEADER_SEQ)); - assertEquals("UPDATE", exchange.getIn().getHeader(CouchDbConstants.HEADER_METHOD)); - assertEquals("db", exchange.getIn().getHeader(CouchDbConstants.HEADER_DATABASE)); - } - } - - @Test void assertSingleton() throws Exception { try (CouchDbEndpoint endpoint = new CouchDbEndpoint("couchdb:http://localhost/db", "http://localhost/db", new CouchDbComponent())) {