This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/exchange-factory by this push:
new 01c91e2 CAMEL-16222: PooledExchangeFactory experiment
01c91e2 is described below
commit 01c91e20e82fc4b7e4f27b53f89874eb10d67bdb
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Feb 22 07:22:53 2021 +0100
CAMEL-16222: PooledExchangeFactory experiment
---
.../apache/camel/component/jbpm/JBPMConsumer.java | 7 ++--
.../camel/component/jcache/JCacheConsumer.java | 7 +---
.../jclouds/JcloudsBlobStoreConsumer.java | 2 +-
.../camel/component/jclouds/JcloudsConsumer.java | 46 ----------------------
.../camel/component/jcr/EndpointEventListener.java | 12 ++++--
.../apache/camel/component/jcr/JcrConsumer.java | 2 +-
.../jgroups/raft/CamelRoleChangeListener.java | 17 +++++---
.../jgroups/raft/JGroupsRaftConsumer.java | 2 +-
.../jgroups/raft/JGroupsRaftEndpoint.java | 7 ----
.../component/jgroups/CamelJGroupsReceiver.java | 19 ++++++---
.../camel/component/jgroups/JGroupsConsumer.java | 2 +-
.../apache/camel/component/jira/JiraConstants.java | 2 +
.../jira/consumer/NewCommentsConsumer.java | 2 +-
.../component/jira/consumer/NewIssuesConsumer.java | 2 +-
.../jira/consumer/WatchUpdatesConsumer.java | 9 +++--
.../apache/camel/component/jmx/JMXConsumer.java | 4 +-
.../apache/camel/component/jooq/JooqConsumer.java | 2 +-
.../apache/camel/component/jpa/JpaConsumer.java | 25 ++++++++----
.../component/jt400/Jt400DataQueueConsumer.java | 2 +-
.../component/jt400/Jt400MsgQueueConsumer.java | 2 +-
20 files changed, 72 insertions(+), 101 deletions(-)
diff --git
a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConsumer.java
b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConsumer.java
index 343bfc7..a8d6c59 100644
---
a/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConsumer.java
+++
b/components/camel-jbpm/src/main/java/org/apache/camel/component/jbpm/JBPMConsumer.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.jbpm;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.component.jbpm.emitters.CamelEventEmitter;
import org.apache.camel.component.jbpm.listeners.CamelCaseEventListener;
@@ -98,7 +97,7 @@ public class JBPMConsumer extends DefaultConsumer implements
DeploymentEventList
}
public void sendMessage(String eventType, Object body) {
- Exchange exchange =
getEndpoint().createExchange(ExchangePattern.InOnly);
+ Exchange exchange = createExchange(false);
exchange.getIn().setHeader("EventType", eventType);
exchange.getIn().setBody(body);
@@ -111,6 +110,7 @@ public class JBPMConsumer extends DefaultConsumer
implements DeploymentEventList
if (exchange.getException() != null) {
getExceptionHandler().handleException("Error
processing exchange", exchange, exchange.getException());
}
+ releaseExchange(exchange, false);
}
});
} else {
@@ -124,6 +124,7 @@ public class JBPMConsumer extends DefaultConsumer
implements DeploymentEventList
if (exchange.getException() != null) {
getExceptionHandler().handleException("Error processing
exchange", exchange, exchange.getException());
}
+ releaseExchange(exchange, false);
}
}
@@ -131,7 +132,6 @@ public class JBPMConsumer extends DefaultConsumer
implements DeploymentEventList
public void onDeploy(DeploymentEvent event) {
InternalRuntimeManager manager = (InternalRuntimeManager)
event.getDeployedUnit().getRuntimeManager();
configure(manager, this);
-
}
@Override
@@ -156,7 +156,6 @@ public class JBPMConsumer extends DefaultConsumer
implements DeploymentEventList
}
configureConsumer(eventListenerType, manager, consumer);
-
}
protected void configureConsumer(String eventListenerType,
InternalRuntimeManager manager, JBPMConsumer consumer) {
diff --git
a/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheConsumer.java
b/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheConsumer.java
index 327e242..544a562 100644
---
a/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheConsumer.java
+++
b/components/camel-jcache/src/main/java/org/apache/camel/component/jcache/JCacheConsumer.java
@@ -28,14 +28,11 @@ import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.support.DefaultConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* The JCache consumer.
*/
public class JCacheConsumer extends DefaultConsumer {
- private static final Logger LOG =
LoggerFactory.getLogger(JCacheConsumer.class);
private CacheEntryListenerConfiguration<Object, Object>
entryListenerConfiguration;
@@ -83,7 +80,7 @@ public class JCacheConsumer extends DefaultConsumer {
@Override
protected void
onEvents(Iterable<CacheEntryEvent<?, ?>> events) {
for (CacheEntryEvent<?, ?> event : events) {
- Exchange exchange =
getEndpoint().createExchange();
+ Exchange exchange = createExchange(true);
Message message = exchange.getIn();
message.setHeader(JCacheConstants.EVENT_TYPE, event.getEventType().name());
message.setHeader(JCacheConstants.KEY,
event.getKey());
@@ -96,7 +93,7 @@ public class JCacheConsumer extends DefaultConsumer {
try {
getProcessor().process(exchange);
} catch (Exception e) {
- LOG.error("Error processing event ",
e);
+
getExceptionHandler().handleException(e);
}
}
}
diff --git
a/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java
b/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java
index 899d323..7812704 100644
---
a/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java
+++
b/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java
@@ -75,7 +75,7 @@ public class JcloudsBlobStoreConsumer extends
ScheduledBatchPollingConsumer {
if (!Strings.isNullOrEmpty(blobName)) {
InputStream body =
JcloudsBlobStoreHelper.readBlob(blobStore, container, blobName);
if (body != null) {
- Exchange exchange = endpoint.createExchange();
+ Exchange exchange = createExchange(true);
CachedOutputStream cos = new
CachedOutputStream(exchange);
IOHelper.copy(body, cos);
exchange.getIn().setBody(cos.newStreamCache());
diff --git
a/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsConsumer.java
b/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsConsumer.java
deleted file mode 100644
index fdcef9c..0000000
---
a/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsConsumer.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.jclouds;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.support.ScheduledPollConsumer;
-
-public class JcloudsConsumer extends ScheduledPollConsumer {
- private final JcloudsEndpoint endpoint;
-
- public JcloudsConsumer(JcloudsEndpoint endpoint, Processor processor) {
- super(endpoint, processor);
- this.endpoint = endpoint;
- }
-
- @Override
- protected int poll() throws Exception {
- Exchange exchange = endpoint.createExchange();
-
- try {
- // send message to next processor in the route
- getProcessor().process(exchange);
- return 1; // number of messages polled
- } finally {
- // log exception if an exception occurred and was not handled
- if (exchange.getException() != null) {
- getExceptionHandler().handleException("Error processing
exchange", exchange, exchange.getException());
- }
- }
- }
-}
diff --git
a/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/EndpointEventListener.java
b/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/EndpointEventListener.java
index 4602722..abc06f6 100644
---
a/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/EndpointEventListener.java
+++
b/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/EndpointEventListener.java
@@ -38,10 +38,12 @@ public class EndpointEventListener implements EventListener
{
private static final Logger LOG =
LoggerFactory.getLogger(EndpointEventListener.class);
+ private final JcrConsumer consumer;
private final JcrEndpoint endpoint;
private final Processor processor;
- public EndpointEventListener(JcrEndpoint endpoint, Processor processor) {
+ public EndpointEventListener(JcrConsumer consumer, JcrEndpoint endpoint,
Processor processor) {
+ this.consumer = consumer;
this.endpoint = endpoint;
this.processor = processor;
}
@@ -50,10 +52,10 @@ public class EndpointEventListener implements EventListener
{
public void onEvent(EventIterator events) {
LOG.trace("onEvent START");
LOG.debug("{} consumer received JCR events: {}", endpoint, events);
- RuntimeCamelException rce = null;
+ RuntimeCamelException rce;
+ final Exchange exchange = createExchange(events);
try {
- final Exchange exchange = createExchange(events);
try {
LOG.debug("Processor, {}, is processing exchange, {}",
processor, exchange);
@@ -65,6 +67,8 @@ public class EndpointEventListener implements EventListener {
rce = exchange.getException(RuntimeCamelException.class);
} catch (Exception e) {
rce = wrapRuntimeCamelException(e);
+ } finally {
+ consumer.releaseExchange(exchange, false);
}
if (rce != null) {
@@ -76,7 +80,7 @@ public class EndpointEventListener implements EventListener {
}
private Exchange createExchange(EventIterator events) {
- Exchange exchange = endpoint.createExchange();
+ Exchange exchange = consumer.createExchange(false);
List<Event> eventList = new LinkedList<>();
if (events != null) {
diff --git
a/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java
b/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java
index ae7fd5d..9a39485 100644
---
a/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java
+++
b/components/camel-jcr/src/main/java/org/apache/camel/component/jcr/JcrConsumer.java
@@ -108,7 +108,7 @@ public class JcrConsumer extends DefaultConsumer {
boolean noLocal = getJcrEndpoint().isNoLocal();
- eventListener = new EndpointEventListener(getJcrEndpoint(),
getProcessor());
+ eventListener = new EndpointEventListener(this, getJcrEndpoint(),
getProcessor());
if (LOG.isDebugEnabled()) {
LOG.debug("Adding JCR Event Listener, {}, on {}. eventTypes={},
isDeep={}, uuid={}, nodeTypeName={}, noLocal={}",
diff --git
a/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/CamelRoleChangeListener.java
b/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/CamelRoleChangeListener.java
index 917e052..baf3325 100644
---
a/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/CamelRoleChangeListener.java
+++
b/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/CamelRoleChangeListener.java
@@ -21,7 +21,6 @@ import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.support.AsyncProcessorConverterHelper;
-import org.apache.camel.util.ObjectHelper;
import org.jgroups.protocols.raft.RAFT;
import org.jgroups.protocols.raft.Role;
import org.slf4j.Logger;
@@ -30,13 +29,12 @@ import org.slf4j.LoggerFactory;
public class CamelRoleChangeListener implements RAFT.RoleChange {
private static final transient Logger LOG =
LoggerFactory.getLogger(CamelRoleChangeListener.class);
+ private final JGroupsRaftConsumer consumer;
private final JGroupsRaftEndpoint endpoint;
private final AsyncProcessor processor;
- public CamelRoleChangeListener(JGroupsRaftEndpoint endpoint, Processor
processor) {
- ObjectHelper.notNull(endpoint, "endpoint");
- ObjectHelper.notNull(processor, "processor");
-
+ public CamelRoleChangeListener(JGroupsRaftConsumer consumer,
JGroupsRaftEndpoint endpoint, Processor processor) {
+ this.consumer = consumer;
this.endpoint = endpoint;
this.processor = AsyncProcessorConverterHelper.convert(processor);
}
@@ -44,7 +42,7 @@ public class CamelRoleChangeListener implements
RAFT.RoleChange {
@Override
public void roleChanged(Role role) {
LOG.trace("New Role {} received.", role);
- Exchange exchange = endpoint.createExchange();
+ Exchange exchange = createExchange();
switch (role) {
case Leader:
exchange.getIn().setHeader(JGroupsRaftConstants.HEADER_JGROUPSRAFT_EVENT_TYPE,
JGroupsRaftEventType.LEADER);
@@ -76,4 +74,11 @@ public class CamelRoleChangeListener implements
RAFT.RoleChange {
throw new JGroupsRaftException("Error in consumer while
dispatching exchange containing role " + role, e);
}
}
+
+ private Exchange createExchange() {
+ Exchange exchange = consumer.createExchange(true);
+ endpoint.populateJGroupsRaftHeaders(exchange);
+ return exchange;
+ }
+
}
diff --git
a/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftConsumer.java
b/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftConsumer.java
index cf275c7..623b4d9 100644
---
a/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftConsumer.java
+++
b/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftConsumer.java
@@ -42,7 +42,7 @@ public class JGroupsRaftConsumer extends DefaultConsumer {
this.clusterName = clusterName;
this.enableRoleChangeEvents = enableRoleChangeEvents;
- this.roleListener = new CamelRoleChangeListener(endpoint, processor);
+ this.roleListener = new CamelRoleChangeListener(this, endpoint,
processor);
}
@Override
diff --git
a/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftEndpoint.java
b/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftEndpoint.java
index 3e0dfac..075f97e 100644
---
a/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftEndpoint.java
+++
b/components/camel-jgroups-raft/src/main/java/org/apache/camel/component/jgroups/raft/JGroupsRaftEndpoint.java
@@ -82,13 +82,6 @@ public class JGroupsRaftEndpoint extends DefaultEndpoint {
return consumer;
}
- @Override
- public Exchange createExchange() {
- Exchange exchange = super.createExchange();
- populateJGroupsRaftHeaders(exchange);
- return exchange;
- }
-
public void populateJGroupsRaftHeaders(Exchange exchange) {
exchange.getIn().setHeader(JGroupsRaftConstants.HEADER_JGROUPSRAFT_COMMIT_INDEX,
resolvedRaftHandle.commitIndex());
exchange.getIn().setHeader(JGroupsRaftConstants.HEADER_JGROUPSRAFT_CURRENT_TERM,
resolvedRaftHandle.currentTerm());
diff --git
a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/CamelJGroupsReceiver.java
b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/CamelJGroupsReceiver.java
index 9fcdef8..4383903 100644
---
a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/CamelJGroupsReceiver.java
+++
b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/CamelJGroupsReceiver.java
@@ -21,13 +21,14 @@ import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.support.AsyncProcessorConverterHelper;
-import org.apache.camel.util.ObjectHelper;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static
org.apache.camel.component.jgroups.JGroupsEndpoint.HEADER_JGROUPS_CHANNEL_ADDRESS;
+
/**
* Implementation of JGroups message receiver ({@code org.jgroups.Receiver})
wrapping incoming messages into Camel
* exchanges. Used by {@link JGroupsConsumer}.
@@ -36,13 +37,12 @@ public class CamelJGroupsReceiver extends ReceiverAdapter {
private static final transient Logger LOG =
LoggerFactory.getLogger(CamelJGroupsReceiver.class);
+ private final JGroupsConsumer consumer;
private final JGroupsEndpoint endpoint;
private final AsyncProcessor processor;
- public CamelJGroupsReceiver(JGroupsEndpoint endpoint, Processor processor)
{
- ObjectHelper.notNull(endpoint, "endpoint");
- ObjectHelper.notNull(processor, "processor");
-
+ public CamelJGroupsReceiver(JGroupsConsumer consumer, JGroupsEndpoint
endpoint, Processor processor) {
+ this.consumer = consumer;
this.endpoint = endpoint;
this.processor = AsyncProcessorConverterHelper.convert(processor);
}
@@ -50,7 +50,7 @@ public class CamelJGroupsReceiver extends ReceiverAdapter {
@Override
public void viewAccepted(View view) {
if (endpoint.isEnableViewMessages()) {
- Exchange exchange = endpoint.createExchange(view);
+ Exchange exchange = createExchange(view);
try {
LOG.debug("Processing view: {}", view);
processor.process(exchange, new AsyncCallback() {
@@ -80,4 +80,11 @@ public class CamelJGroupsReceiver extends ReceiverAdapter {
}
}
+ public Exchange createExchange(View view) {
+ Exchange exchange = consumer.createExchange(true);
+ exchange.getIn().setHeader(HEADER_JGROUPS_CHANNEL_ADDRESS,
endpoint.getResolvedChannel().getAddress());
+ exchange.getIn().setBody(view);
+ return exchange;
+ }
+
}
diff --git
a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsConsumer.java
b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsConsumer.java
index 7cb4e90..bb8ce3e 100644
---
a/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsConsumer.java
+++
b/components/camel-jgroups/src/main/java/org/apache/camel/component/jgroups/JGroupsConsumer.java
@@ -40,7 +40,7 @@ public class JGroupsConsumer extends DefaultConsumer {
this.endpoint = endpoint;
this.clusterName = clusterName;
- this.receiver = new CamelJGroupsReceiver(endpoint, processor);
+ this.receiver = new CamelJGroupsReceiver(this, endpoint, processor);
}
@Override
diff --git
a/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraConstants.java
b/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraConstants.java
index d53dd95..7f9ed14 100644
---
a/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraConstants.java
+++
b/components/camel-jira/src/main/java/org/apache/camel/component/jira/JiraConstants.java
@@ -27,6 +27,7 @@ public interface JiraConstants {
String ISSUE_ASSIGNEE = "IssueAssignee";
String ISSUE_COMPONENTS = "IssueComponents";
String ISSUE_COMMENT = "IssueComment";
+ String ISSUE_CHANGED = "IssueChanged";
String ISSUE_KEY = "IssueKey";
String ISSUE_PRIORITY_ID = "IssuePriorityId";
String ISSUE_PRIORITY_NAME = "IssuePriorityName";
@@ -35,6 +36,7 @@ public interface JiraConstants {
String ISSUE_TRANSITION_ID = "IssueTransitionId";
String ISSUE_TYPE_ID = "IssueTypeId";
String ISSUE_TYPE_NAME = "IssueTypeName";
+ String ISSUE_WATCHED_ISSUES = "IssueWatchedIssues";
String ISSUE_WATCHERS_ADD = "IssueWatchersAdd";
String ISSUE_WATCHERS_REMOVE = "IssueWatchersRemove";
String JIRA_REST_CLIENT_FACTORY = "JiraRestClientFactory";
diff --git
a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewCommentsConsumer.java
b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewCommentsConsumer.java
index 0b080fb..00c0b38 100644
---
a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewCommentsConsumer.java
+++
b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewCommentsConsumer.java
@@ -52,7 +52,7 @@ public class NewCommentsConsumer extends AbstractJiraConsumer
{
// retrieve from last to first item LIFO
for (int i = max; i > -1; i--) {
Comment newComment = newComments.get(i);
- Exchange e = getEndpoint().createExchange();
+ Exchange e = createExchange(true);
e.getIn().setBody(newComment);
getProcessor().process(e);
}
diff --git
a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java
b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java
index b2fd19e..b5d09ef 100644
---
a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java
+++
b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/NewIssuesConsumer.java
@@ -61,7 +61,7 @@ public class NewIssuesConsumer extends AbstractJiraConsumer {
// In the end, we want only *new* issues oldest to newest.
for (int i = newIssues.size() - 1; i > -1; i--) {
Issue newIssue = newIssues.get(i);
- Exchange e = getEndpoint().createExchange();
+ Exchange e = createExchange(true);
e.getIn().setBody(newIssue);
getProcessor().process(e);
}
diff --git
a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java
b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java
index a97d5fb..6a512c5 100644
---
a/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java
+++
b/components/camel-jira/src/main/java/org/apache/camel/component/jira/consumer/WatchUpdatesConsumer.java
@@ -29,6 +29,7 @@ import java.util.stream.Collectors;
import com.atlassian.jira.rest.client.api.domain.Issue;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.component.jira.JiraConstants;
import org.apache.camel.component.jira.JiraEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -107,11 +108,11 @@ public class WatchUpdatesConsumer extends
AbstractJiraConsumer {
}
private void processExchange(Object body, String issueKey, String changed)
throws Exception {
- Exchange e = getEndpoint().createExchange();
+ Exchange e = createExchange(true);
e.getIn().setBody(body);
- e.getIn().setHeader("issueKey", issueKey);
- e.getIn().setHeader("changed", changed);
- e.getIn().setHeader("watchedIssues", watchedIssuesKeys);
+ e.getIn().setHeader(JiraConstants.ISSUE_KEY, issueKey);
+ e.getIn().setHeader(JiraConstants.ISSUE_CHANGED, changed);
+ e.getIn().setHeader(JiraConstants.ISSUE_WATCHED_ISSUES,
watchedIssuesKeys);
LOG.debug(" {}: {} changed to {}", issueKey, changed, body);
getProcessor().process(e);
}
diff --git
a/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumer.java
b/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumer.java
index 18171a3..7fa7853 100644
---
a/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumer.java
+++
b/components/camel-jmx/src/main/java/org/apache/camel/component/jmx/JMXConsumer.java
@@ -319,7 +319,7 @@ public class JMXConsumer extends DefaultConsumer implements
NotificationListener
@Override
public void handleNotification(Notification aNotification, Object
aHandback) {
JMXEndpoint ep = getEndpoint();
- Exchange exchange = getEndpoint().createExchange();
+ Exchange exchange = createExchange(true);
Message message = exchange.getIn();
message.setHeader("jmx.handback", aHandback);
try {
@@ -329,7 +329,7 @@ public class JMXConsumer extends DefaultConsumer implements
NotificationListener
message.setBody(aNotification);
}
- // process the notification from thred pool to not block this
notification callback thread from the JVM
+ // process the notification from thread pool to not block this
notification callback thread from the JVM
executorService.submit(() -> {
try {
getProcessor().process(exchange);
diff --git
a/components/camel-jooq/src/main/java/org/apache/camel/component/jooq/JooqConsumer.java
b/components/camel-jooq/src/main/java/org/apache/camel/component/jooq/JooqConsumer.java
index b2868e3..b7ac8e2 100644
---
a/components/camel-jooq/src/main/java/org/apache/camel/component/jooq/JooqConsumer.java
+++
b/components/camel-jooq/src/main/java/org/apache/camel/component/jooq/JooqConsumer.java
@@ -78,7 +78,7 @@ public class JooqConsumer extends
ScheduledBatchPollingConsumer {
}
protected Exchange createExchange(Object result) {
- Exchange exchange = getEndpoint().createExchange();
+ Exchange exchange = createExchange(true);
exchange.getIn().setBody(result);
return exchange;
}
diff --git
a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
index 6bb0774..cf99d19 100644
---
a/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
+++
b/components/camel-jpa/src/main/java/org/apache/camel/component/jpa/JpaConsumer.java
@@ -144,7 +144,7 @@ public class JpaConsumer extends
ScheduledBatchPollingConsumer {
"Error processing last message due: {}.
Will commit all previous successful processed message, and ignore this last
failure.",
cause.getMessage(), cause);
} else {
- // rollback all by throwning exception
+ // rollback all by throwing exception
throw cause;
}
}
@@ -200,14 +200,23 @@ public class JpaConsumer extends
ScheduledBatchPollingConsumer {
// process the current exchange
LOG.debug("Processing exchange: {}", exchange);
- getProcessor().process(exchange);
- if (exchange.getException() != null) {
- // if we failed then throw exception
- throw exchange.getException();
+ try {
+ getProcessor().process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
}
- // Run the @Consumed callback
- getDeleteHandler().deleteObject(entityManager, result,
exchange);
+ try {
+ if (exchange.getException() != null) {
+ // if we failed then throw exception
+ throw exchange.getException();
+ } else {
+ // Run the @Consumed callback
+ getDeleteHandler().deleteObject(entityManager, result,
exchange);
+ }
+ } finally {
+ releaseExchange(exchange, false);
+ }
}
}
@@ -514,7 +523,7 @@ public class JpaConsumer extends
ScheduledBatchPollingConsumer {
}
protected Exchange createExchange(Object result, EntityManager
entityManager) {
- Exchange exchange = getEndpoint().createExchange();
+ Exchange exchange = createExchange(false);
exchange.getIn().setBody(result);
exchange.getIn().setHeader(JpaConstants.ENTITY_MANAGER, entityManager);
return exchange;
diff --git
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
index d73ae11..e7f113c 100644
---
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
+++
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
@@ -152,7 +152,7 @@ public class Jt400DataQueueConsumer extends
ScheduledPollConsumer {
entry = queue.read(key, -1, searchType);
}
- Exchange exchange = getEndpoint().createExchange();
+ Exchange exchange = createExchange(true);
if (entry != null) {
exchange.getIn().setHeader(Jt400Endpoint.SENDER_INFORMATION,
entry.getSenderInformation());
if (getEndpoint().getFormat() == Jt400Configuration.Format.binary)
{
diff --git
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java
index 5dbbe93..8dac242 100755
---
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java
+++
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java
@@ -121,7 +121,7 @@ public class Jt400MsgQueueConsumer extends
ScheduledPollConsumer {
this.messageKey = entry.getKey();
}
- Exchange exchange = getEndpoint().createExchange();
+ Exchange exchange = createExchange(true);
exchange.getIn().setHeader(Jt400Constants.SENDER_INFORMATION,
entry.getFromJobNumber() + "/" + entry.getUser() + "/" +
entry.getFromJobName());
setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_ID,
entry.getID());