This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0405d1977236f5571f62723efc3c2802a6b0e2ac
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Sun Feb 21 09:24:58 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../java/org/apache/camel/component/ahc/ws/WsConsumer.java |  2 +-
 .../java/org/apache/camel/component/apns/ApnsConsumer.java |  2 +-
 .../java/org/apache/camel/component/as2/AS2Consumer.java   |  9 +++++----
 .../apache/camel/component/asterisk/AsteriskConsumer.java  |  9 +++++----
 .../integration/consumer/AtmosScheduledPollConsumer.java   |  4 ----
 .../consumer/AtmosScheduledPollGetConsumer.java            | 14 +++++++-------
 .../component/atmosphere/websocket/WebsocketConsumer.java  |  6 +++---
 7 files changed, 22 insertions(+), 24 deletions(-)

diff --git a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java
index 1341bb4..8bf9923 100644
--- a/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java
+++ b/components/camel-ahc-ws/src/main/java/org/apache/camel/component/ahc/ws/WsConsumer.java
@@ -68,7 +68,7 @@ public class WsConsumer extends DefaultConsumer {
     }
 
     private void sendMessageInternal(Object message) {
-        final Exchange exchange = getEndpoint().createExchange();
+        final Exchange exchange = createExchange(true);
 
         //TODO may set some headers with some meta info (e.g., socket info, unique-id for correlation purpose, etc0 
         // set the body
diff --git a/components/camel-apns/src/main/java/org/apache/camel/component/apns/ApnsConsumer.java b/components/camel-apns/src/main/java/org/apache/camel/component/apns/ApnsConsumer.java
index d0782a6..5d45e70 100644
--- a/components/camel-apns/src/main/java/org/apache/camel/component/apns/ApnsConsumer.java
+++ b/components/camel-apns/src/main/java/org/apache/camel/component/apns/ApnsConsumer.java
@@ -51,7 +51,7 @@ public class ApnsConsumer extends ScheduledPollConsumer {
         while (it.hasNext()) {
             InactiveDevice inactiveDevice = it.next();
 
-            Exchange e = getEndpoint().createExchange();
+            Exchange e = createExchange(true);
             e.getIn().setBody(inactiveDevice);
             getProcessor().process(e);
         }
diff --git a/components/camel-as2/camel-as2-component/src/main/java/org/apache/camel/component/as2/AS2Consumer.java b/components/camel-as2/camel-as2-component/src/main/java/org/apache/camel/component/as2/AS2Consumer.java
index 7e767eb..08b68d2 100644
--- a/components/camel-as2/camel-as2-component/src/main/java/org/apache/camel/component/as2/AS2Consumer.java
+++ b/components/camel-as2/camel-as2-component/src/main/java/org/apache/camel/component/as2/AS2Consumer.java
@@ -124,17 +124,18 @@ public class AS2Consumer extends AbstractApiConsumer<AS2ApiName, AS2Configuratio
                     = HttpMessageUtils.extractEdiPayload(request, as2ServerConnection.getDecryptingPrivateKey());
 
             // Set AS2 Interchange property and EDI message into body of input message.
-            Exchange exchange = getEndpoint().createExchange();
-            HttpCoreContext coreContext = HttpCoreContext.adapt(context);
-            exchange.setProperty(AS2Constants.AS2_INTERCHANGE, coreContext);
-            exchange.getIn().setBody(ediEntity.getEdiMessage());
+            Exchange exchange = createExchange(false);
 
             try {
+                HttpCoreContext coreContext = HttpCoreContext.adapt(context);
+                exchange.setProperty(AS2Constants.AS2_INTERCHANGE, coreContext);
+                exchange.getIn().setBody(ediEntity.getEdiMessage());
                 // send message to next processor in the route
                 getProcessor().process(exchange);
             } finally {
                 // check if an exception occurred and was not handled
                 exception = exchange.getException();
+                releaseExchange(exchange, false);
             }
         } catch (Exception e) {
             LOG.warn("Failed to process AS2 message", e);
diff --git a/components/camel-asterisk/src/main/java/org/apache/camel/component/asterisk/AsteriskConsumer.java b/components/camel-asterisk/src/main/java/org/apache/camel/component/asterisk/AsteriskConsumer.java
index 24c8368..e650178 100644
--- a/components/camel-asterisk/src/main/java/org/apache/camel/component/asterisk/AsteriskConsumer.java
+++ b/components/camel-asterisk/src/main/java/org/apache/camel/component/asterisk/AsteriskConsumer.java
@@ -62,14 +62,15 @@ public class AsteriskConsumer extends DefaultConsumer {
     private final class EventListener implements ManagerEventListener {
         @Override
         public void onManagerEvent(ManagerEvent event) {
-            Exchange exchange = endpoint.createExchange();
-            exchange.getIn().setHeader(AsteriskConstants.EVENT_NAME, event.getClass().getSimpleName());
-            exchange.getIn().setBody(event);
-
+            Exchange exchange = createExchange(false);
             try {
+                exchange.getIn().setHeader(AsteriskConstants.EVENT_NAME, event.getClass().getSimpleName());
+                exchange.getIn().setBody(event);
                 getProcessor().process(exchange);
             } catch (Exception e) {
                 getExceptionHandler().handleException("Error processing exchange.", exchange, e);
+            } finally {
+                releaseExchange(exchange, false);
             }
         }
     }
diff --git a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollConsumer.java b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollConsumer.java
index 0c1bb56..950415f 100644
--- a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollConsumer.java
+++ b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollConsumer.java
@@ -38,8 +38,6 @@ public abstract class AtmosScheduledPollConsumer extends ScheduledPollConsumer {
     /**
      * Lifecycle method invoked when the consumer has created. Internally create or reuse a connection to the low level
      * atmos client
-     * 
-     * @throws Exception
      */
     @Override
     protected void doStart() throws Exception {
@@ -53,8 +51,6 @@ public abstract class AtmosScheduledPollConsumer extends ScheduledPollConsumer {
 
     /**
      * Lifecycle method invoked when the consumer has destroyed. Erase the reference to the atmos low level client
-     * 
-     * @throws Exception
      */
     @Override
     protected void doStop() throws Exception {
diff --git a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollGetConsumer.java b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollGetConsumer.java
index e2d7f43..1f2fc02 100644
--- a/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollGetConsumer.java
+++ b/components/camel-atmos/src/main/java/org/apache/camel/component/atmos/integration/consumer/AtmosScheduledPollGetConsumer.java
@@ -32,17 +32,16 @@ public class AtmosScheduledPollGetConsumer extends AtmosScheduledPollConsumer {
     /**
      * Poll from an atmos remote path and put the result in the message exchange
      * 
-     * @return           number of messages polled
-     * @throws Exception
+     * @return number of messages polled
      */
     @Override
     protected int poll() throws Exception {
-        Exchange exchange = endpoint.createExchange();
-        AtmosResult result = AtmosAPIFacade.getInstance(configuration.getClient())
-                .get(configuration.getRemotePath());
-        result.populateExchange(exchange);
-
+        Exchange exchange = createExchange(false);
         try {
+            AtmosResult result = AtmosAPIFacade.getInstance(configuration.getClient())
+                    .get(configuration.getRemotePath());
+            result.populateExchange(exchange);
+
             // send message to next processor in the route
             getProcessor().process(exchange);
             return 1; // number of messages polled
@@ -51,6 +50,7 @@ public class AtmosScheduledPollGetConsumer extends AtmosScheduledPollConsumer {
             if (exchange.getException() != null) {
                 getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
             }
+            releaseExchange(exchange, false);
         }
     }
 }
diff --git a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java
index 8af4199..8957b6a 100644
--- a/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java
+++ b/components/camel-atmosphere-websocket/src/main/java/org/apache/camel/component/atmosphere/websocket/WebsocketConsumer.java
@@ -84,7 +84,7 @@ public class WebsocketConsumer extends ServletConsumer {
     }
 
     public void sendMessage(final String connectionKey, Object message) {
-        final Exchange exchange = getEndpoint().createExchange();
+        final Exchange exchange = createExchange(true);
 
         // set header and body
         exchange.getIn().setHeader(WebsocketConstants.CONNECTION_KEY, connectionKey);
@@ -101,7 +101,7 @@ public class WebsocketConsumer extends ServletConsumer {
     }
 
     public void sendEventNotification(String connectionKey, int eventType) {
-        final Exchange exchange = getEndpoint().createExchange();
+        final Exchange exchange = createExchange(true);
 
         // set header
         exchange.getIn().setHeader(WebsocketConstants.CONNECTION_KEY, connectionKey);
@@ -122,7 +122,7 @@ public class WebsocketConsumer extends ServletConsumer {
     }
 
     public void sendNotDeliveredMessage(List<String> failedConnectionKeys, Object message) {
-        final Exchange exchange = getEndpoint().createExchange();
+        final Exchange exchange = createExchange(true);
 
         // set header and body
         exchange.getIn().setHeader(WebsocketConstants.CONNECTION_KEY_LIST, failedConnectionKeys);

Reply via email to