This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/exchange-factory by this push: new 8dee3f7 CAMEL-16222: PooledExchangeFactory experiment 8dee3f7 is described below commit 8dee3f7ee66e11e19ed816a0514b810ebdcf1cd0 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Sun Feb 21 20:07:16 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../apache/camel/component/iec60870/Constants.java | 5 ++++ .../component/iec60870/client/ClientConsumer.java | 30 ++++++++-------------- .../ignite/events/IgniteEventsConsumer.java | 3 +-- .../ignite/messaging/IgniteMessagingConsumer.java | 11 ++++---- .../component/infinispan/InfinispanConsumer.java | 28 ++++++++++---------- .../camel/component/ironmq/IronMQConsumer.java | 19 ++++++++++++-- .../camel/component/ironmq/IronMQEndpoint.java | 21 --------------- 7 files changed, 53 insertions(+), 64 deletions(-) diff --git a/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/Constants.java b/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/Constants.java index 926e01a..65d2b31 100644 --- a/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/Constants.java +++ b/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/Constants.java @@ -22,4 +22,9 @@ public interface Constants { String PARAM_PROTOCOL_OPTIONS = "protocolOptions"; String PARAM_CONNECTION_OPTIONS = "connectionOptions"; + + String IEC60870_VALUE = "CamelIec60870Value"; + String IEC60870_TIMESTAMP = "CamelIec60870Timestamp"; + String IEC60870_QUALITY = "CamelIec60870Quality"; + String IEC60870_OVERFLOW = "CamelIec60870Overflow"; } diff --git a/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/client/ClientConsumer.java b/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/client/ClientConsumer.java index 43a80d2..c1bf5e3 100644 --- a/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/client/ClientConsumer.java +++ b/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/client/ClientConsumer.java @@ -16,22 +16,16 @@ */ package org.apache.camel.component.iec60870.client; -import java.time.Instant; - import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; +import org.apache.camel.component.iec60870.Constants; import org.apache.camel.component.iec60870.ObjectAddress; import org.apache.camel.support.DefaultConsumer; -import org.apache.camel.support.DefaultMessage; import org.eclipse.neoscada.protocol.iec60870.asdu.types.Value; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class ClientConsumer extends DefaultConsumer { - private static final Logger LOG = LoggerFactory.getLogger(ClientConsumer.class); - private final ClientConnection connection; private final ClientEndpoint endpoint; @@ -56,24 +50,20 @@ public class ClientConsumer extends DefaultConsumer { private void updateValue(final ObjectAddress address, final Value<?> value) { // Note: we hold the sync lock for the connection try { - final Exchange exchange = getEndpoint().createExchange(); - exchange.setIn(mapMessage(value)); + Exchange exchange = createExchange(true); + configureMessage(exchange.getIn(), value); getProcessor().process(exchange); - } catch (final Exception e) { - LOG.debug("Failed to process message", e); + } catch (Exception e) { + getExceptionHandler().handleException(e); } } - private Message mapMessage(final Value<?> value) { - final DefaultMessage message = new DefaultMessage(this.endpoint.getCamelContext()); - + private void configureMessage(Message message, final Value<?> value) { message.setBody(value); - message.setHeader("value", value.getValue()); - message.setHeader("timestamp", Instant.ofEpochMilli(value.getTimestamp())); - message.setHeader("quality", value.getQualityInformation()); - message.setHeader("overflow", value.isOverflow()); - - return message; + message.setHeader(Constants.IEC60870_VALUE, value.getValue()); + message.setHeader(Constants.IEC60870_TIMESTAMP, value.getTimestamp()); + message.setHeader(Constants.IEC60870_QUALITY, value.getQualityInformation()); + message.setHeader(Constants.IEC60870_OVERFLOW, value.isOverflow()); } } diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java index 42b3e34..9b682f0 100644 --- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java @@ -21,7 +21,6 @@ import java.util.List; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; -import org.apache.camel.ExchangePattern; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.support.DefaultConsumer; @@ -47,7 +46,7 @@ public class IgniteEventsConsumer extends DefaultConsumer { @Override public boolean apply(Event event) { - Exchange exchange = endpoint.createExchange(ExchangePattern.InOnly); + Exchange exchange = createExchange(true); Message in = exchange.getIn(); in.setBody(event); try { diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java index d9b8fb1..fc21167 100644 --- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java +++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java @@ -19,7 +19,6 @@ package org.apache.camel.component.ignite.messaging; import java.util.UUID; import org.apache.camel.Exchange; -import org.apache.camel.ExchangePattern; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.component.ignite.IgniteConstants; @@ -44,12 +43,12 @@ public class IgniteMessagingConsumer extends DefaultConsumer { @Override public boolean apply(UUID uuid, Object payload) { - Exchange exchange = endpoint.createExchange(ExchangePattern.InOnly); - Message in = exchange.getIn(); - in.setBody(payload); - in.setHeader(IgniteConstants.IGNITE_MESSAGING_TOPIC, endpoint.getTopic()); - in.setHeader(IgniteConstants.IGNITE_MESSAGING_UUID, uuid); + Exchange exchange = createExchange(true); try { + Message in = exchange.getIn(); + in.setBody(payload); + in.setHeader(IgniteConstants.IGNITE_MESSAGING_TOPIC, endpoint.getTopic()); + in.setHeader(IgniteConstants.IGNITE_MESSAGING_UUID, uuid); if (LOG.isTraceEnabled()) { LOG.trace("Processing Ignite message for subscription {} with payload {}.", uuid, payload); } diff --git a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java index 9c09afb..12b855e 100644 --- a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java +++ b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java @@ -45,24 +45,26 @@ public abstract class InfinispanConsumer< @Override public void processEvent(String eventType, String cacheName, Object key, Object eventData, Consumer<Exchange> consumer) { - Exchange exchange = getEndpoint().createExchange(); - exchange.getMessage().setHeader(InfinispanConstants.EVENT_TYPE, eventType); - exchange.getMessage().setHeader(InfinispanConstants.CACHE_NAME, cacheName); + Exchange exchange = createExchange(false); + try { + exchange.getMessage().setHeader(InfinispanConstants.EVENT_TYPE, eventType); + exchange.getMessage().setHeader(InfinispanConstants.CACHE_NAME, cacheName); - if (key != null) { - exchange.getMessage().setHeader(InfinispanConstants.KEY, key); - } - if (eventData != null) { - exchange.getMessage().setHeader(InfinispanConstants.EVENT_DATA, eventData); - } - if (consumer != null) { - consumer.accept(exchange); - } + if (key != null) { + exchange.getMessage().setHeader(InfinispanConstants.KEY, key); + } + if (eventData != null) { + exchange.getMessage().setHeader(InfinispanConstants.EVENT_DATA, eventData); + } + if (consumer != null) { + consumer.accept(exchange); + } - try { getProcessor().process(exchange); } catch (Exception e) { getExceptionHandler().handleException(e); + } finally { + releaseExchange(exchange, false); } } diff --git a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java index 8993614..30d45e2 100644 --- a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java +++ b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java @@ -59,7 +59,7 @@ public class IronMQConsumer extends ScheduledBatchPollingConsumer { shutdownRunningTask = null; pendingExchanges = 0; try { - Messages messages = null; + Messages messages; LOG.trace("Receiving messages with request [messagePerPoll {}, timeout {}]...", getMaxMessagesPerPoll(), getEndpoint().getConfiguration().getTimeout()); messages = this.ironQueue.reserve(getMaxMessagesPerPoll(), getEndpoint().getConfiguration().getTimeout(), @@ -84,7 +84,7 @@ public class IronMQConsumer extends ScheduledBatchPollingConsumer { Queue<Exchange> answer = new LinkedList<>(); for (Message message : messages) { - Exchange exchange = getEndpoint().createExchange(message); + Exchange exchange = createExchange(message); answer.add(exchange); } return answer; @@ -172,4 +172,19 @@ public class IronMQConsumer extends ScheduledBatchPollingConsumer { return (IronMQEndpoint) super.getEndpoint(); } + private Exchange createExchange(io.iron.ironmq.Message msg) { + Exchange exchange = createExchange(true); + exchange.setPattern(getEndpoint().getExchangePattern()); + org.apache.camel.Message message = exchange.getIn(); + if (getEndpoint().getConfiguration().isPreserveHeaders()) { + GsonUtil.copyFrom(msg, message); + } else { + message.setBody(msg.getBody()); + } + message.setHeader(IronMQConstants.MESSAGE_ID, msg.getId()); + message.setHeader(IronMQConstants.MESSAGE_RESERVATION_ID, msg.getReservationId()); + message.setHeader(IronMQConstants.MESSAGE_RESERVED_COUNT, msg.getReservedCount()); + return exchange; + } + } diff --git a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQEndpoint.java b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQEndpoint.java index db0b244..4e8c116 100644 --- a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQEndpoint.java +++ b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQEndpoint.java @@ -22,9 +22,6 @@ import io.iron.ironmq.Client; import io.iron.ironmq.Cloud; import org.apache.camel.Category; import org.apache.camel.Consumer; -import org.apache.camel.Exchange; -import org.apache.camel.ExchangePattern; -import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.spi.UriEndpoint; @@ -74,24 +71,6 @@ public class IronMQEndpoint extends ScheduledPollEndpoint { return ironMQConsumer; } - public Exchange createExchange(io.iron.ironmq.Message msg) { - return createExchange(getExchangePattern(), msg); - } - - private Exchange createExchange(ExchangePattern pattern, io.iron.ironmq.Message msg) { - Exchange exchange = super.createExchange(pattern); - Message message = exchange.getIn(); - if (configuration.isPreserveHeaders()) { - GsonUtil.copyFrom(msg, message); - } else { - message.setBody(msg.getBody()); - } - message.setHeader(IronMQConstants.MESSAGE_ID, msg.getId()); - message.setHeader(IronMQConstants.MESSAGE_RESERVATION_ID, msg.getReservationId()); - message.setHeader(IronMQConstants.MESSAGE_RESERVED_COUNT, msg.getReservedCount()); - return exchange; - } - @Override protected void doStart() throws Exception { super.doStart();