This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/exchange-factory by this push:
new 8dee3f7 CAMEL-16222: PooledExchangeFactory experiment
8dee3f7 is described below
commit 8dee3f7ee66e11e19ed816a0514b810ebdcf1cd0
Author: Claus Ibsen <[email protected]>
AuthorDate: Sun Feb 21 20:07:16 2021 +0100
CAMEL-16222: PooledExchangeFactory experiment
---
.../apache/camel/component/iec60870/Constants.java | 5 ++++
.../component/iec60870/client/ClientConsumer.java | 30 ++++++++--------------
.../ignite/events/IgniteEventsConsumer.java | 3 +--
.../ignite/messaging/IgniteMessagingConsumer.java | 11 ++++----
.../component/infinispan/InfinispanConsumer.java | 28 ++++++++++----------
.../camel/component/ironmq/IronMQConsumer.java | 19 ++++++++++++--
.../camel/component/ironmq/IronMQEndpoint.java | 21 ---------------
7 files changed, 53 insertions(+), 64 deletions(-)
diff --git
a/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/Constants.java
b/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/Constants.java
index 926e01a..65d2b31 100644
---
a/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/Constants.java
+++
b/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/Constants.java
@@ -22,4 +22,9 @@ public interface Constants {
String PARAM_PROTOCOL_OPTIONS = "protocolOptions";
String PARAM_CONNECTION_OPTIONS = "connectionOptions";
+
+ String IEC60870_VALUE = "CamelIec60870Value";
+ String IEC60870_TIMESTAMP = "CamelIec60870Timestamp";
+ String IEC60870_QUALITY = "CamelIec60870Quality";
+ String IEC60870_OVERFLOW = "CamelIec60870Overflow";
}
diff --git
a/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/client/ClientConsumer.java
b/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/client/ClientConsumer.java
index 43a80d2..c1bf5e3 100644
---
a/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/client/ClientConsumer.java
+++
b/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/client/ClientConsumer.java
@@ -16,22 +16,16 @@
*/
package org.apache.camel.component.iec60870.client;
-import java.time.Instant;
-
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
+import org.apache.camel.component.iec60870.Constants;
import org.apache.camel.component.iec60870.ObjectAddress;
import org.apache.camel.support.DefaultConsumer;
-import org.apache.camel.support.DefaultMessage;
import org.eclipse.neoscada.protocol.iec60870.asdu.types.Value;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class ClientConsumer extends DefaultConsumer {
- private static final Logger LOG =
LoggerFactory.getLogger(ClientConsumer.class);
-
private final ClientConnection connection;
private final ClientEndpoint endpoint;
@@ -56,24 +50,20 @@ public class ClientConsumer extends DefaultConsumer {
private void updateValue(final ObjectAddress address, final Value<?>
value) {
// Note: we hold the sync lock for the connection
try {
- final Exchange exchange = getEndpoint().createExchange();
- exchange.setIn(mapMessage(value));
+ Exchange exchange = createExchange(true);
+ configureMessage(exchange.getIn(), value);
getProcessor().process(exchange);
- } catch (final Exception e) {
- LOG.debug("Failed to process message", e);
+ } catch (Exception e) {
+ getExceptionHandler().handleException(e);
}
}
- private Message mapMessage(final Value<?> value) {
- final DefaultMessage message = new
DefaultMessage(this.endpoint.getCamelContext());
-
+ private void configureMessage(Message message, final Value<?> value) {
message.setBody(value);
- message.setHeader("value", value.getValue());
- message.setHeader("timestamp",
Instant.ofEpochMilli(value.getTimestamp()));
- message.setHeader("quality", value.getQualityInformation());
- message.setHeader("overflow", value.isOverflow());
-
- return message;
+ message.setHeader(Constants.IEC60870_VALUE, value.getValue());
+ message.setHeader(Constants.IEC60870_TIMESTAMP, value.getTimestamp());
+ message.setHeader(Constants.IEC60870_QUALITY,
value.getQualityInformation());
+ message.setHeader(Constants.IEC60870_OVERFLOW, value.isOverflow());
}
}
diff --git
a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java
index 42b3e34..9b682f0 100644
---
a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java
+++
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java
@@ -21,7 +21,6 @@ import java.util.List;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.support.DefaultConsumer;
@@ -47,7 +46,7 @@ public class IgniteEventsConsumer extends DefaultConsumer {
@Override
public boolean apply(Event event) {
- Exchange exchange =
endpoint.createExchange(ExchangePattern.InOnly);
+ Exchange exchange = createExchange(true);
Message in = exchange.getIn();
in.setBody(event);
try {
diff --git
a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java
index d9b8fb1..fc21167 100644
---
a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java
+++
b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.ignite.messaging;
import java.util.UUID;
import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.ignite.IgniteConstants;
@@ -44,12 +43,12 @@ public class IgniteMessagingConsumer extends
DefaultConsumer {
@Override
public boolean apply(UUID uuid, Object payload) {
- Exchange exchange =
endpoint.createExchange(ExchangePattern.InOnly);
- Message in = exchange.getIn();
- in.setBody(payload);
- in.setHeader(IgniteConstants.IGNITE_MESSAGING_TOPIC,
endpoint.getTopic());
- in.setHeader(IgniteConstants.IGNITE_MESSAGING_UUID, uuid);
+ Exchange exchange = createExchange(true);
try {
+ Message in = exchange.getIn();
+ in.setBody(payload);
+ in.setHeader(IgniteConstants.IGNITE_MESSAGING_TOPIC,
endpoint.getTopic());
+ in.setHeader(IgniteConstants.IGNITE_MESSAGING_UUID, uuid);
if (LOG.isTraceEnabled()) {
LOG.trace("Processing Ignite message for subscription {}
with payload {}.", uuid, payload);
}
diff --git
a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
index 9c09afb..12b855e 100644
---
a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
+++
b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
@@ -45,24 +45,26 @@ public abstract class InfinispanConsumer<
@Override
public void processEvent(String eventType, String cacheName, Object key,
Object eventData, Consumer<Exchange> consumer) {
- Exchange exchange = getEndpoint().createExchange();
- exchange.getMessage().setHeader(InfinispanConstants.EVENT_TYPE,
eventType);
- exchange.getMessage().setHeader(InfinispanConstants.CACHE_NAME,
cacheName);
+ Exchange exchange = createExchange(false);
+ try {
+ exchange.getMessage().setHeader(InfinispanConstants.EVENT_TYPE,
eventType);
+ exchange.getMessage().setHeader(InfinispanConstants.CACHE_NAME,
cacheName);
- if (key != null) {
- exchange.getMessage().setHeader(InfinispanConstants.KEY, key);
- }
- if (eventData != null) {
- exchange.getMessage().setHeader(InfinispanConstants.EVENT_DATA,
eventData);
- }
- if (consumer != null) {
- consumer.accept(exchange);
- }
+ if (key != null) {
+ exchange.getMessage().setHeader(InfinispanConstants.KEY, key);
+ }
+ if (eventData != null) {
+
exchange.getMessage().setHeader(InfinispanConstants.EVENT_DATA, eventData);
+ }
+ if (consumer != null) {
+ consumer.accept(exchange);
+ }
- try {
getProcessor().process(exchange);
} catch (Exception e) {
getExceptionHandler().handleException(e);
+ } finally {
+ releaseExchange(exchange, false);
}
}
diff --git
a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java
b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java
index 8993614..30d45e2 100644
---
a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java
+++
b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java
@@ -59,7 +59,7 @@ public class IronMQConsumer extends
ScheduledBatchPollingConsumer {
shutdownRunningTask = null;
pendingExchanges = 0;
try {
- Messages messages = null;
+ Messages messages;
LOG.trace("Receiving messages with request [messagePerPoll {},
timeout {}]...", getMaxMessagesPerPoll(),
getEndpoint().getConfiguration().getTimeout());
messages = this.ironQueue.reserve(getMaxMessagesPerPoll(),
getEndpoint().getConfiguration().getTimeout(),
@@ -84,7 +84,7 @@ public class IronMQConsumer extends
ScheduledBatchPollingConsumer {
Queue<Exchange> answer = new LinkedList<>();
for (Message message : messages) {
- Exchange exchange = getEndpoint().createExchange(message);
+ Exchange exchange = createExchange(message);
answer.add(exchange);
}
return answer;
@@ -172,4 +172,19 @@ public class IronMQConsumer extends
ScheduledBatchPollingConsumer {
return (IronMQEndpoint) super.getEndpoint();
}
+ private Exchange createExchange(io.iron.ironmq.Message msg) {
+ Exchange exchange = createExchange(true);
+ exchange.setPattern(getEndpoint().getExchangePattern());
+ org.apache.camel.Message message = exchange.getIn();
+ if (getEndpoint().getConfiguration().isPreserveHeaders()) {
+ GsonUtil.copyFrom(msg, message);
+ } else {
+ message.setBody(msg.getBody());
+ }
+ message.setHeader(IronMQConstants.MESSAGE_ID, msg.getId());
+ message.setHeader(IronMQConstants.MESSAGE_RESERVATION_ID,
msg.getReservationId());
+ message.setHeader(IronMQConstants.MESSAGE_RESERVED_COUNT,
msg.getReservedCount());
+ return exchange;
+ }
+
}
diff --git
a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQEndpoint.java
b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQEndpoint.java
index db0b244..4e8c116 100644
---
a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQEndpoint.java
+++
b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQEndpoint.java
@@ -22,9 +22,6 @@ import io.iron.ironmq.Client;
import io.iron.ironmq.Cloud;
import org.apache.camel.Category;
import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.spi.UriEndpoint;
@@ -74,24 +71,6 @@ public class IronMQEndpoint extends ScheduledPollEndpoint {
return ironMQConsumer;
}
- public Exchange createExchange(io.iron.ironmq.Message msg) {
- return createExchange(getExchangePattern(), msg);
- }
-
- private Exchange createExchange(ExchangePattern pattern,
io.iron.ironmq.Message msg) {
- Exchange exchange = super.createExchange(pattern);
- Message message = exchange.getIn();
- if (configuration.isPreserveHeaders()) {
- GsonUtil.copyFrom(msg, message);
- } else {
- message.setBody(msg.getBody());
- }
- message.setHeader(IronMQConstants.MESSAGE_ID, msg.getId());
- message.setHeader(IronMQConstants.MESSAGE_RESERVATION_ID,
msg.getReservationId());
- message.setHeader(IronMQConstants.MESSAGE_RESERVED_COUNT,
msg.getReservedCount());
- return exchange;
- }
-
@Override
protected void doStart() throws Exception {
super.doStart();