This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/exchange-factory by this push:
     new 8dee3f7  CAMEL-16222: PooledExchangeFactory experiment
8dee3f7 is described below

commit 8dee3f7ee66e11e19ed816a0514b810ebdcf1cd0
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Sun Feb 21 20:07:16 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../apache/camel/component/iec60870/Constants.java |  5 ++++
 .../component/iec60870/client/ClientConsumer.java  | 30 ++++++++--------------
 .../ignite/events/IgniteEventsConsumer.java        |  3 +--
 .../ignite/messaging/IgniteMessagingConsumer.java  | 11 ++++----
 .../component/infinispan/InfinispanConsumer.java   | 28 ++++++++++----------
 .../camel/component/ironmq/IronMQConsumer.java     | 19 ++++++++++++--
 .../camel/component/ironmq/IronMQEndpoint.java     | 21 ---------------
 7 files changed, 53 insertions(+), 64 deletions(-)

diff --git a/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/Constants.java b/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/Constants.java
index 926e01a..65d2b31 100644
--- a/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/Constants.java
+++ b/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/Constants.java
@@ -22,4 +22,9 @@ public interface Constants {
     String PARAM_PROTOCOL_OPTIONS = "protocolOptions";
 
     String PARAM_CONNECTION_OPTIONS = "connectionOptions";
+
+    String IEC60870_VALUE = "CamelIec60870Value";
+    String IEC60870_TIMESTAMP = "CamelIec60870Timestamp";
+    String IEC60870_QUALITY = "CamelIec60870Quality";
+    String IEC60870_OVERFLOW = "CamelIec60870Overflow";
 }
diff --git a/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/client/ClientConsumer.java b/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/client/ClientConsumer.java
index 43a80d2..c1bf5e3 100644
--- a/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/client/ClientConsumer.java
+++ b/components/camel-iec60870/src/main/java/org/apache/camel/component/iec60870/client/ClientConsumer.java
@@ -16,22 +16,16 @@
  */
 package org.apache.camel.component.iec60870.client;
 
-import java.time.Instant;
-
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.component.iec60870.Constants;
 import org.apache.camel.component.iec60870.ObjectAddress;
 import org.apache.camel.support.DefaultConsumer;
-import org.apache.camel.support.DefaultMessage;
 import org.eclipse.neoscada.protocol.iec60870.asdu.types.Value;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class ClientConsumer extends DefaultConsumer {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(ClientConsumer.class);
-
     private final ClientConnection connection;
     private final ClientEndpoint endpoint;
 
@@ -56,24 +50,20 @@ public class ClientConsumer extends DefaultConsumer {
     private void updateValue(final ObjectAddress address, final Value<?> 
value) {
         // Note: we hold the sync lock for the connection
         try {
-            final Exchange exchange = getEndpoint().createExchange();
-            exchange.setIn(mapMessage(value));
+            Exchange exchange = createExchange(true);
+            configureMessage(exchange.getIn(), value);
             getProcessor().process(exchange);
-        } catch (final Exception e) {
-            LOG.debug("Failed to process message", e);
+        } catch (Exception e) {
+            getExceptionHandler().handleException(e);
         }
     }
 
-    private Message mapMessage(final Value<?> value) {
-        final DefaultMessage message = new 
DefaultMessage(this.endpoint.getCamelContext());
-
+    private void configureMessage(Message message, final Value<?> value) {
         message.setBody(value);
 
-        message.setHeader("value", value.getValue());
-        message.setHeader("timestamp", 
Instant.ofEpochMilli(value.getTimestamp()));
-        message.setHeader("quality", value.getQualityInformation());
-        message.setHeader("overflow", value.isOverflow());
-
-        return message;
+        message.setHeader(Constants.IEC60870_VALUE, value.getValue());
+        message.setHeader(Constants.IEC60870_TIMESTAMP, value.getTimestamp());
+        message.setHeader(Constants.IEC60870_QUALITY, 
value.getQualityInformation());
+        message.setHeader(Constants.IEC60870_OVERFLOW, value.isOverflow());
     }
 }
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java
index 42b3e34..9b682f0 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/events/IgniteEventsConsumer.java
@@ -21,7 +21,6 @@ import java.util.List;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.support.DefaultConsumer;
@@ -47,7 +46,7 @@ public class IgniteEventsConsumer extends DefaultConsumer {
 
         @Override
         public boolean apply(Event event) {
-            Exchange exchange = 
endpoint.createExchange(ExchangePattern.InOnly);
+            Exchange exchange = createExchange(true);
             Message in = exchange.getIn();
             in.setBody(event);
             try {
diff --git a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java
index d9b8fb1..fc21167 100644
--- a/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java
+++ b/components/camel-ignite/src/main/java/org/apache/camel/component/ignite/messaging/IgniteMessagingConsumer.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.ignite.messaging;
 import java.util.UUID;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.component.ignite.IgniteConstants;
@@ -44,12 +43,12 @@ public class IgniteMessagingConsumer extends DefaultConsumer {
 
         @Override
         public boolean apply(UUID uuid, Object payload) {
-            Exchange exchange = 
endpoint.createExchange(ExchangePattern.InOnly);
-            Message in = exchange.getIn();
-            in.setBody(payload);
-            in.setHeader(IgniteConstants.IGNITE_MESSAGING_TOPIC, 
endpoint.getTopic());
-            in.setHeader(IgniteConstants.IGNITE_MESSAGING_UUID, uuid);
+            Exchange exchange = createExchange(true);
             try {
+                Message in = exchange.getIn();
+                in.setBody(payload);
+                in.setHeader(IgniteConstants.IGNITE_MESSAGING_TOPIC, 
endpoint.getTopic());
+                in.setHeader(IgniteConstants.IGNITE_MESSAGING_UUID, uuid);
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("Processing Ignite message for subscription {} 
with payload {}.", uuid, payload);
                 }
diff --git a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
index 9c09afb..12b855e 100644
--- a/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
+++ b/components/camel-infinispan/camel-infinispan-common/src/main/java/org/apache/camel/component/infinispan/InfinispanConsumer.java
@@ -45,24 +45,26 @@ public abstract class InfinispanConsumer<
 
     @Override
     public void processEvent(String eventType, String cacheName, Object key, 
Object eventData, Consumer<Exchange> consumer) {
-        Exchange exchange = getEndpoint().createExchange();
-        exchange.getMessage().setHeader(InfinispanConstants.EVENT_TYPE, 
eventType);
-        exchange.getMessage().setHeader(InfinispanConstants.CACHE_NAME, 
cacheName);
+        Exchange exchange = createExchange(false);
+        try {
+            exchange.getMessage().setHeader(InfinispanConstants.EVENT_TYPE, 
eventType);
+            exchange.getMessage().setHeader(InfinispanConstants.CACHE_NAME, 
cacheName);
 
-        if (key != null) {
-            exchange.getMessage().setHeader(InfinispanConstants.KEY, key);
-        }
-        if (eventData != null) {
-            exchange.getMessage().setHeader(InfinispanConstants.EVENT_DATA, 
eventData);
-        }
-        if (consumer != null) {
-            consumer.accept(exchange);
-        }
+            if (key != null) {
+                exchange.getMessage().setHeader(InfinispanConstants.KEY, key);
+            }
+            if (eventData != null) {
+                
exchange.getMessage().setHeader(InfinispanConstants.EVENT_DATA, eventData);
+            }
+            if (consumer != null) {
+                consumer.accept(exchange);
+            }
 
-        try {
             getProcessor().process(exchange);
         } catch (Exception e) {
             getExceptionHandler().handleException(e);
+        } finally {
+            releaseExchange(exchange, false);
         }
     }
 
diff --git a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java
index 8993614..30d45e2 100644
--- a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java
+++ b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQConsumer.java
@@ -59,7 +59,7 @@ public class IronMQConsumer extends ScheduledBatchPollingConsumer {
         shutdownRunningTask = null;
         pendingExchanges = 0;
         try {
-            Messages messages = null;
+            Messages messages;
             LOG.trace("Receiving messages with request [messagePerPoll {}, 
timeout {}]...", getMaxMessagesPerPoll(),
                     getEndpoint().getConfiguration().getTimeout());
             messages = this.ironQueue.reserve(getMaxMessagesPerPoll(), 
getEndpoint().getConfiguration().getTimeout(),
@@ -84,7 +84,7 @@ public class IronMQConsumer extends ScheduledBatchPollingConsumer {
 
         Queue<Exchange> answer = new LinkedList<>();
         for (Message message : messages) {
-            Exchange exchange = getEndpoint().createExchange(message);
+            Exchange exchange = createExchange(message);
             answer.add(exchange);
         }
         return answer;
@@ -172,4 +172,19 @@ public class IronMQConsumer extends ScheduledBatchPollingConsumer {
         return (IronMQEndpoint) super.getEndpoint();
     }
 
+    private Exchange createExchange(io.iron.ironmq.Message msg) {
+        Exchange exchange = createExchange(true);
+        exchange.setPattern(getEndpoint().getExchangePattern());
+        org.apache.camel.Message message = exchange.getIn();
+        if (getEndpoint().getConfiguration().isPreserveHeaders()) {
+            GsonUtil.copyFrom(msg, message);
+        } else {
+            message.setBody(msg.getBody());
+        }
+        message.setHeader(IronMQConstants.MESSAGE_ID, msg.getId());
+        message.setHeader(IronMQConstants.MESSAGE_RESERVATION_ID, 
msg.getReservationId());
+        message.setHeader(IronMQConstants.MESSAGE_RESERVED_COUNT, 
msg.getReservedCount());
+        return exchange;
+    }
+
 }
diff --git a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQEndpoint.java b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQEndpoint.java
index db0b244..4e8c116 100644
--- a/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQEndpoint.java
+++ b/components/camel-ironmq/src/main/java/org/apache/camel/component/ironmq/IronMQEndpoint.java
@@ -22,9 +22,6 @@ import io.iron.ironmq.Client;
 import io.iron.ironmq.Cloud;
 import org.apache.camel.Category;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.spi.UriEndpoint;
@@ -74,24 +71,6 @@ public class IronMQEndpoint extends ScheduledPollEndpoint {
         return ironMQConsumer;
     }
 
-    public Exchange createExchange(io.iron.ironmq.Message msg) {
-        return createExchange(getExchangePattern(), msg);
-    }
-
-    private Exchange createExchange(ExchangePattern pattern, 
io.iron.ironmq.Message msg) {
-        Exchange exchange = super.createExchange(pattern);
-        Message message = exchange.getIn();
-        if (configuration.isPreserveHeaders()) {
-            GsonUtil.copyFrom(msg, message);
-        } else {
-            message.setBody(msg.getBody());
-        }
-        message.setHeader(IronMQConstants.MESSAGE_ID, msg.getId());
-        message.setHeader(IronMQConstants.MESSAGE_RESERVATION_ID, 
msg.getReservationId());
-        message.setHeader(IronMQConstants.MESSAGE_RESERVED_COUNT, 
msg.getReservedCount());
-        return exchange;
-    }
-
     @Override
     protected void doStart() throws Exception {
         super.doStart();

Reply via email to