This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/exchange-factory by this push:
new 401890f CAMEL-16222: PooledExchangeFactory experiment
401890f is described below
commit 401890f0696e15b0646206f5dc19e387651152aa
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Feb 22 09:38:50 2021 +0100
CAMEL-16222: PooledExchangeFactory experiment
---
.../apache/camel/component/nats/NatsConsumer.java | 40 ++++++++++++----------
.../camel/component/nitrite/NitriteConsumer.java | 3 +-
.../apache/camel/component/nsq/NsqConsumer.java | 13 ++++---
.../camel/oaipmh/handler/AbstractHandler.java | 8 +++--
.../org/apache/camel/oaipmh/handler/Harvester.java | 8 ++---
.../component/optaplanner/OptaPlannerConsumer.java | 8 ++---
6 files changed, 44 insertions(+), 36 deletions(-)
diff --git a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
index 11b15d7..0c78677 100644
--- a/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
+++ b/components/camel-nats/src/main/java/org/apache/camel/component/nats/NatsConsumer.java
@@ -153,30 +153,32 @@ public class NatsConsumer extends DefaultConsumer {
@Override
public void onMessage(Message msg) throws InterruptedException {
LOG.debug("Received Message: {}", msg);
- Exchange exchange = getEndpoint().createExchange();
- exchange.getIn().setBody(msg.getData());
- exchange.getIn().setHeader(NatsConstants.NATS_REPLY_TO,
msg.getReplyTo());
- exchange.getIn().setHeader(NatsConstants.NATS_SID,
msg.getSID());
- exchange.getIn().setHeader(NatsConstants.NATS_SUBJECT,
msg.getSubject());
- exchange.getIn().setHeader(NatsConstants.NATS_QUEUE_NAME,
msg.getSubscription().getQueueName());
- exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis());
+ Exchange exchange = createExchange(false);
try {
+ exchange.getIn().setBody(msg.getData());
+ exchange.getIn().setHeader(NatsConstants.NATS_REPLY_TO,
msg.getReplyTo());
+ exchange.getIn().setHeader(NatsConstants.NATS_SID,
msg.getSID());
+ exchange.getIn().setHeader(NatsConstants.NATS_SUBJECT,
msg.getSubject());
+ exchange.getIn().setHeader(NatsConstants.NATS_QUEUE_NAME,
msg.getSubscription().getQueueName());
+ exchange.getIn().setHeader(NatsConstants.NATS_MESSAGE_TIMESTAMP, System.currentTimeMillis());
+
processor.process(exchange);
- } catch (Exception e) {
- getExceptionHandler().handleException("Error during processing", exchange, e);
- }
- // is there a reply?
- if (!configuration.isReplyToDisabled()
- && msg.getReplyTo() != null && msg.getConnection() !=
null) {
- Connection con = msg.getConnection();
- byte[] data = exchange.getMessage().getBody(byte[].class);
- if (data != null) {
- LOG.debug("Publishing replyTo: {} message",
msg.getReplyTo());
- con.publish(msg.getReplyTo(), data);
+ // is there a reply?
+ if (!configuration.isReplyToDisabled()
+ && msg.getReplyTo() != null && msg.getConnection()
!= null) {
+ Connection con = msg.getConnection();
+ byte[] data =
exchange.getMessage().getBody(byte[].class);
+ if (data != null) {
+ LOG.debug("Publishing replyTo: {} message",
msg.getReplyTo());
+ con.publish(msg.getReplyTo(), data);
+ }
}
+ } catch (Exception e){
+ getExceptionHandler().handleException("Error during processing", exchange, e);
+ } finally {
+ releaseExchange(exchange, false);
}
-
}
}
}
diff --git a/components/camel-nitrite/src/main/java/org/apache/camel/component/nitrite/NitriteConsumer.java b/components/camel-nitrite/src/main/java/org/apache/camel/component/nitrite/NitriteConsumer.java
index a9fd003..8742460 100644
--- a/components/camel-nitrite/src/main/java/org/apache/camel/component/nitrite/NitriteConsumer.java
+++ b/components/camel-nitrite/src/main/java/org/apache/camel/component/nitrite/NitriteConsumer.java
@@ -61,7 +61,7 @@ public class NitriteConsumer extends DefaultConsumer {
@Override
public void onChange(ChangeInfo changeInfo) {
for (ChangedItem changedItem : changeInfo.getChangedItems()) {
- Exchange exchange = endpoint.createExchange();
+ Exchange exchange = createExchange(false);
Message message = exchange.getMessage();
message.setHeader(NitriteConstants.CHANGE_TIMESTAMP,
changedItem.getChangeTimestamp());
message.setHeader(NitriteConstants.CHANGE_TYPE,
changedItem.getChangeType());
@@ -75,6 +75,7 @@ public class NitriteConsumer extends DefaultConsumer {
if (exchange.getException() != null) {
getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
}
+ releaseExchange(exchange, false);
}
}
}
diff --git a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java
index 10c44fb..04b9514 100644
--- a/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java
+++ b/components/camel-nsq/src/main/java/org/apache/camel/component/nsq/NsqConsumer.java
@@ -111,12 +111,13 @@ public class NsqConsumer extends DefaultConsumer {
@Override
public void message(NSQMessage msg) {
LOG.debug("Received Message: {}", msg);
- Exchange exchange =
getEndpoint().createExchange(ExchangePattern.InOnly);
- exchange.getIn().setBody(msg.getMessage());
- exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_ID,
msg.getId());
- exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_ATTEMPTS,
msg.getAttempts());
- exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_TIMESTAMP,
msg.getTimestamp());
+ Exchange exchange = createExchange(false);
try {
+ exchange.setPattern(ExchangePattern.InOnly);
+ exchange.getIn().setBody(msg.getMessage());
+ exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_ID,
msg.getId());
+ exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_ATTEMPTS,
msg.getAttempts());
+ exchange.getIn().setHeader(NsqConstants.NSQ_MESSAGE_TIMESTAMP,
msg.getTimestamp());
if (configuration.getAutoFinish()) {
msg.finished();
} else {
@@ -129,6 +130,8 @@ public class NsqConsumer extends DefaultConsumer {
msg.requeue((int) configuration.getRequeueInterval());
}
getExceptionHandler().handleException("Error during processing", exchange, e);
+ } finally {
+ releaseExchange(exchange, false);
}
}
}
diff --git a/components/camel-oaipmh/src/main/java/org/apache/camel/oaipmh/handler/AbstractHandler.java b/components/camel-oaipmh/src/main/java/org/apache/camel/oaipmh/handler/AbstractHandler.java
index 88cfd7d..c204028 100644
--- a/components/camel-oaipmh/src/main/java/org/apache/camel/oaipmh/handler/AbstractHandler.java
+++ b/components/camel-oaipmh/src/main/java/org/apache/camel/oaipmh/handler/AbstractHandler.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.oaipmh.handler;
+import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -30,18 +31,20 @@ public abstract class AbstractHandler {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractHandler.class);
+ protected final Consumer consumer;
protected final Endpoint endpoint;
protected final Processor processor;
protected final ExceptionHandler exceptionHandler;
public AbstractHandler(OAIPMHConsumer consumer) {
+ this.consumer = consumer;
this.endpoint = consumer.getEndpoint();
this.processor = consumer.getAsyncProcessor();
this.exceptionHandler = consumer.getExceptionHandler();
}
protected void send(OAIPMHResponse message) {
- Exchange exchange = endpoint.createExchange();
+ Exchange exchange = consumer.createExchange(false);
String xml = message.getRawResponse();
exchange.getIn().setBody(xml);
try {
@@ -49,12 +52,13 @@ public abstract class AbstractHandler {
LOG.trace("sending exchange: {}", exchange);
processor.process(exchange);
} catch (Exception e) {
- throw new RuntimeCamelException("Error sending exchange: " +
exchange, e);
+ exchange.setException(e);
} finally {
// log exception if an exception occurred and was not handled
if (exchange.getException() != null) {
exceptionHandler.handleException("Error processing exchange",
exchange, exchange.getException());
}
+ consumer.releaseExchange(exchange, false);
}
}
diff --git a/components/camel-oaipmh/src/main/java/org/apache/camel/oaipmh/handler/Harvester.java b/components/camel-oaipmh/src/main/java/org/apache/camel/oaipmh/handler/Harvester.java
index 494b426..84a795b 100644
--- a/components/camel-oaipmh/src/main/java/org/apache/camel/oaipmh/handler/Harvester.java
+++ b/components/camel-oaipmh/src/main/java/org/apache/camel/oaipmh/handler/Harvester.java
@@ -69,7 +69,7 @@ public class Harvester {
}
- private boolean harvest() throws IOException, URISyntaxException,
ParserConfigurationException, SAXException, Exception {
+ private boolean harvest() throws Exception {
boolean hasNext = false;
if (!this.empty) {
String responseXML = httpClient.doRequest(this.baseURI, this.verb,
this.set, this.from, this.until, this.metadata,
@@ -88,13 +88,11 @@ public class Harvester {
return hasNext;
}
- public void asynHarvest() throws IOException, URISyntaxException,
ParserConfigurationException, SAXException, Exception {
+ public void asynHarvest() throws Exception {
this.harvest();
-
}
- public List<String> synHarvest(boolean onlyFirst)
- throws IOException, URISyntaxException,
ParserConfigurationException, SAXException, Exception {
+ public List<String> synHarvest(boolean onlyFirst) throws Exception {
while (this.harvest()) {
if (onlyFirst) {
break;
diff --git a/components/camel-optaplanner/src/main/java/org/apache/camel/component/optaplanner/OptaPlannerConsumer.java b/components/camel-optaplanner/src/main/java/org/apache/camel/component/optaplanner/OptaPlannerConsumer.java
index 3ea6ccf..a4dc33f 100644
--- a/components/camel-optaplanner/src/main/java/org/apache/camel/component/optaplanner/OptaPlannerConsumer.java
+++ b/components/camel-optaplanner/src/main/java/org/apache/camel/component/optaplanner/OptaPlannerConsumer.java
@@ -59,22 +59,22 @@ public class OptaPlannerConsumer extends DefaultConsumer {
}
public void processEvent(BestSolutionChangedEvent<Object> event) {
- Exchange exchange = getEndpoint().createExchange();
+ Exchange exchange = createExchange(true);
exchange.getMessage().setHeader(OptaPlannerConstants.BEST_SOLUTION,
event.getNewBestSolution());
try {
getProcessor().process(exchange);
} catch (Exception e) {
- LOG.error("Error processing event ", e);
+ getExceptionHandler().handleException(e);
}
}
public void processSolverJobEvent(OptaplannerSolutionEvent event) {
- Exchange exchange = getEndpoint().createExchange();
+ Exchange exchange = createExchange(true);
exchange.getMessage().setHeader(OptaPlannerConstants.BEST_SOLUTION,
event.getBestSolution());
try {
getProcessor().process(exchange);
} catch (Exception e) {
- LOG.error("Error processing event ", e);
+ getExceptionHandler().handleException(e);
}
}