This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 2c612eb53bc40fff40fc71faa679236cf4304c07
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Mon Feb 22 08:04:44 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../apache/camel/component/mail/MailConsumer.java  | 11 ++-
 .../apache/camel/component/mail/MailEndpoint.java  |  7 --
 .../component/milo/client/MiloClientConsumer.java  |  5 +-
 .../component/milo/server/MiloServerConsumer.java  |  4 +-
 .../apache/camel/component/mina/MinaConsumer.java  | 99 +++++++++++++---------
 .../apache/camel/component/mina/MinaEndpoint.java  | 11 ---
 .../mina/MinaTransferExchangeOptionTest.java       |  1 -
 .../camel/component/minio/MinioConsumer.java       | 36 +++++++-
 .../camel/component/minio/MinioEndpoint.java       | 40 +--------
 .../apache/camel/component/mllp/MllpEndpoint.java  |  2 +-
 .../component/mllp/MllpTcpServerConsumer.java      |  8 +-
 .../component/mongodb/gridfs/GridFsConsumer.java   |  2 +-
 .../mongodb/MongoDbChangeStreamsConsumer.java      |  4 +
 .../mongodb/MongoDbChangeStreamsThread.java        | 14 ++-
 .../camel/component/mongodb/MongoDbEndpoint.java   | 12 ---
 .../component/mongodb/MongoDbTailingThread.java    | 14 ++-
 .../camel/component/mybatis/MyBatisConsumer.java   | 21 +++--
 .../mybatis/MyBatisConsumerIsolatedTest.java       | 53 ------------
 18 files changed, 157 insertions(+), 187 deletions(-)

diff --git a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
index dd541b2..6fe7825 100644
--- a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
+++ b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
@@ -385,7 +385,7 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
                 }
 
                 if (!message.getFlags().contains(Flags.Flag.DELETED)) {
-                    Exchange exchange = getEndpoint().createExchange(message);
+                    Exchange exchange = createExchange(message);
                     if (getEndpoint().getConfiguration().isMapMailMessage()) {
                         // ensure the mail message is mapped, which can be 
ensured by touching the body/header/attachment
                         LOG.trace("Mapping #{} from javax.mail.Message to 
Camel MailMessage", i);
@@ -399,6 +399,8 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
                                 
exchange.getIn(AttachmentMessage.class).setAttachmentObjects(att);
                             }
                         } catch (MessagingException | IOException e) {
+                            // must release exchange before throwing exception
+                            releaseExchange(exchange, true);
                             throw new RuntimeCamelException("Error accessing 
attachments due to: " + e.getMessage(), e);
                         }
                     }
@@ -511,6 +513,13 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
         }
     }
 
+    private Exchange createExchange(Message message) {
+        Exchange exchange = createExchange(true);
+        exchange.setProperty(Exchange.BINDING, getEndpoint().getBinding());
+        exchange.setIn(new MailMessage(exchange, message, 
getEndpoint().getConfiguration().isMapMailMessage()));
+        return exchange;
+    }
+
     private void copyOrMoveMessageIfRequired(
             MailConfiguration config, Message mail, String destinationFolder, 
boolean moveMessage)
             throws MessagingException {
diff --git a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java
index c7723cd..1ce1be9 100644
--- a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java
+++ b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java
@@ -126,13 +126,6 @@ public class MailEndpoint extends ScheduledPollEndpoint implements HeaderFilterS
         return answer;
     }
 
-    public Exchange createExchange(Message message) {
-        Exchange exchange = super.createExchange();
-        exchange.setProperty(Exchange.BINDING, getBinding());
-        exchange.setIn(new MailMessage(exchange, message, 
getConfiguration().isMapMailMessage()));
-        return exchange;
-    }
-
     // Properties
     // 
-------------------------------------------------------------------------
 
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConsumer.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConsumer.java
index 408af8c..6068910 100644
--- a/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConsumer.java
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/client/MiloClientConsumer.java
@@ -69,10 +69,9 @@ public class MiloClientConsumer extends DefaultConsumer {
     private void handleValueUpdate(final DataValue value) {
         LOG.debug("Handle item update - {} = {}", node, value);
 
-        final Exchange exchange = getEndpoint().createExchange();
-        mapToMessage(value, exchange.getMessage());
-
+        final Exchange exchange = createExchange(true);
         try {
+            mapToMessage(value, exchange.getMessage());
             getProcessor().process(exchange);
         } catch (final Exception e) {
             getExceptionHandler().handleException("Error processing exchange", 
e);
diff --git a/components/camel-milo/src/main/java/org/apache/camel/component/milo/server/MiloServerConsumer.java b/components/camel-milo/src/main/java/org/apache/camel/component/milo/server/MiloServerConsumer.java
index f958974..7677e17 100644
--- a/components/camel-milo/src/main/java/org/apache/camel/component/milo/server/MiloServerConsumer.java
+++ b/components/camel-milo/src/main/java/org/apache/camel/component/milo/server/MiloServerConsumer.java
@@ -54,10 +54,10 @@ public class MiloServerConsumer extends DefaultConsumer {
     }
 
     protected void performWrite(final DataValue value) {
-        Exchange exchange = getEndpoint().createExchange();
-        mapToMessage(value, exchange.getMessage());
+        Exchange exchange = createExchange(true);
 
         try {
+            mapToMessage(value, exchange.getMessage());
             getProcessor().process(exchange);
         } catch (Exception e) {
             getExceptionHandler().handleException("Error processing exchange", 
e);
diff --git a/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java b/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
index 1152f8f..76917e3 100644
--- a/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
+++ b/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaConsumer.java
@@ -373,6 +373,21 @@ public class MinaConsumer extends DefaultConsumer {
         this.acceptor = acceptor;
     }
 
+    private Exchange createExchange(IoSession session, Object payload) {
+        Exchange exchange;
+        if (configuration.isTransferExchange()) {
+            // do not release
+            exchange = getEndpoint().createExchange();
+        } else {
+            exchange = createExchange(false);
+        }
+        exchange.getIn().setHeader(MinaConstants.MINA_IOSESSION, session);
+        exchange.getIn().setHeader(MinaConstants.MINA_LOCAL_ADDRESS, 
session.getLocalAddress());
+        exchange.getIn().setHeader(MinaConstants.MINA_REMOTE_ADDRESS, 
session.getRemoteAddress());
+        MinaPayloadHelper.setIn(exchange, payload);
+        return exchange;
+    }
+
     /**
      * Handles consuming messages and replying if the exchange is out capable.
      */
@@ -406,7 +421,7 @@ public class MinaConsumer extends DefaultConsumer {
                 LOG.debug("Received body: {}", in);
             }
 
-            Exchange exchange = getEndpoint().createExchange(session, object);
+            Exchange exchange = createExchange(session, object);
             //Set the exchange charset property for converting
             if (getEndpoint().getConfiguration().getCharsetName() != null) {
                 exchange.setProperty(Exchange.CHARSET_NAME,
@@ -419,50 +434,54 @@ public class MinaConsumer extends DefaultConsumer {
                 getExceptionHandler().handleException(e);
             }
 
-            //
-            // If there's a response to send, send it.
-            //
-            boolean disconnect = 
getEndpoint().getConfiguration().isDisconnect();
-            Object response = null;
-            if (exchange.hasOut()) {
-                response = MinaPayloadHelper.getOut(getEndpoint(), exchange);
-            } else {
-                response = MinaPayloadHelper.getIn(getEndpoint(), exchange);
-            }
-
-            boolean failed = exchange.isFailed();
-            if (failed && 
!getEndpoint().getConfiguration().isTransferExchange()) {
-                if (exchange.getException() != null) {
-                    response = exchange.getException();
+            try {
+                //
+                // If there's a response to send, send it.
+                //
+                boolean disconnect = 
getEndpoint().getConfiguration().isDisconnect();
+                Object response;
+                if (exchange.hasOut()) {
+                    response = MinaPayloadHelper.getOut(getEndpoint(), 
exchange);
                 } else {
-                    // failed and no exception, must be a fault
-                    response = exchange.getOut().getBody();
+                    response = MinaPayloadHelper.getIn(getEndpoint(), 
exchange);
                 }
-            }
 
-            if (response != null) {
-                LOG.debug("Writing body: {}", response);
-                MinaHelper.writeBody(session, response, exchange, 
configuration.getWriteTimeout());
-            } else {
-                LOG.debug("Writing no response");
-                disconnect = Boolean.TRUE;
-            }
+                boolean failed = exchange.isFailed();
+                if (failed && 
!getEndpoint().getConfiguration().isTransferExchange()) {
+                    if (exchange.getException() != null) {
+                        response = exchange.getException();
+                    } else {
+                        // failed and no exception, must be a fault
+                        response = exchange.getOut().getBody();
+                    }
+                }
 
-            // should session be closed after complete?
-            Boolean close;
-            if (ExchangeHelper.isOutCapable(exchange)) {
-                close = 
exchange.getOut().getHeader(MinaConstants.MINA_CLOSE_SESSION_WHEN_COMPLETE, 
Boolean.class);
-            } else {
-                close = 
exchange.getIn().getHeader(MinaConstants.MINA_CLOSE_SESSION_WHEN_COMPLETE, 
Boolean.class);
-            }
+                if (response != null) {
+                    LOG.debug("Writing body: {}", response);
+                    MinaHelper.writeBody(session, response, exchange, 
configuration.getWriteTimeout());
+                } else {
+                    LOG.debug("Writing no response");
+                    disconnect = Boolean.TRUE;
+                }
 
-            // should we disconnect, the header can override the configuration
-            if (close != null) {
-                disconnect = close;
-            }
-            if (disconnect) {
-                LOG.debug("Closing session when complete at address: {}", 
address);
-                session.closeNow();
+                // should session be closed after complete?
+                Boolean close;
+                if (ExchangeHelper.isOutCapable(exchange)) {
+                    close = 
exchange.getOut().getHeader(MinaConstants.MINA_CLOSE_SESSION_WHEN_COMPLETE, 
Boolean.class);
+                } else {
+                    close = 
exchange.getIn().getHeader(MinaConstants.MINA_CLOSE_SESSION_WHEN_COMPLETE, 
Boolean.class);
+                }
+
+                // should we disconnect, the header can override the 
configuration
+                if (close != null) {
+                    disconnect = close;
+                }
+                if (disconnect) {
+                    LOG.debug("Closing session when complete at address: {}", 
address);
+                    session.closeNow();
+                }
+            } finally {
+                releaseExchange(exchange, false);
             }
         }
     }
diff --git a/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java b/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
index 2d3784a..416579a 100644
--- a/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
+++ b/components/camel-mina/src/main/java/org/apache/camel/component/mina/MinaEndpoint.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.mina;
 import org.apache.camel.Category;
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
 import org.apache.camel.MultipleConsumersSupport;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -27,7 +26,6 @@ import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.support.DefaultEndpoint;
 import org.apache.camel.util.ObjectHelper;
-import org.apache.mina.core.session.IoSession;
 
 /**
  * Socket level networking using TCP or UDP with Apache Mina 2.x.
@@ -68,15 +66,6 @@ public class MinaEndpoint extends DefaultEndpoint implements MultipleConsumersSu
         return answer;
     }
 
-    public Exchange createExchange(IoSession session, Object payload) {
-        Exchange exchange = createExchange();
-        exchange.getIn().setHeader(MinaConstants.MINA_IOSESSION, session);
-        exchange.getIn().setHeader(MinaConstants.MINA_LOCAL_ADDRESS, 
session.getLocalAddress());
-        exchange.getIn().setHeader(MinaConstants.MINA_REMOTE_ADDRESS, 
session.getRemoteAddress());
-        MinaPayloadHelper.setIn(exchange, payload);
-        return exchange;
-    }
-
     @Override
     public boolean isMultipleConsumersSupported() {
         // only datagram should allow multiple consumers
diff --git a/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java b/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java
index c0354ae..140617b 100644
--- a/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java
+++ b/components/camel-mina/src/test/java/org/apache/camel/component/mina/MinaTransferExchangeOptionTest.java
@@ -58,7 +58,6 @@ public class MinaTransferExchangeOptionTest extends BaseMinaTest {
                 
String.format("mina:tcp://localhost:%1$s?sync=true&encoding=UTF-8&transferExchange=true",
 getPort()));
         Producer producer = endpoint.createProducer();
         Exchange exchange = endpoint.createExchange();
-        //Exchange exchange = endpoint.createExchange();
 
         Message message = exchange.getIn();
         message.setBody("Hello!");
diff --git a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
index df0a7d9..e98b295 100644
--- a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
+++ b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
@@ -39,9 +39,11 @@ import io.minio.errors.MinioException;
 import io.minio.messages.Item;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.ScheduledBatchPollingConsumer;
+import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.URISupport;
@@ -182,7 +184,7 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer {
 
     protected Queue<Exchange> createExchanges(InputStream objectStream, String 
objectName) throws Exception {
         Queue<Exchange> answer = new LinkedList<>();
-        Exchange exchange = getEndpoint().createExchange(objectStream, 
objectName);
+        Exchange exchange = createExchange(objectStream, objectName);
         answer.add(exchange);
         IOHelper.close(objectStream);
         return answer;
@@ -200,7 +202,7 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer {
                     Item minioObjectSummary = 
minioObjectSummaries.next().get();
                     InputStream minioObject = getObject(bucketName, 
getMinioClient(), minioObjectSummary.objectName());
                     minioObjects.add(minioObject);
-                    Exchange exchange = 
getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                    Exchange exchange = createExchange(minioObject, 
minioObjectSummary.objectName());
                     answer.add(exchange);
                     continuationToken = minioObjectSummary.objectName();
                 } while (minioObjectSummaries.hasNext());
@@ -212,7 +214,7 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer {
                     if (!minioObjectSummary.isDir()) {
                         InputStream minioObject = getObject(bucketName, 
getMinioClient(), minioObjectSummary.objectName());
                         minioObjects.add(minioObject);
-                        Exchange exchange = 
getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                        Exchange exchange = createExchange(minioObject, 
minioObjectSummary.objectName());
                         answer.add(exchange);
                         continuationToken = minioObjectSummary.objectName();
                     }
@@ -401,6 +403,34 @@ public class MinioConsumer extends ScheduledBatchPollingConsumer {
         return (MinioEndpoint) super.getEndpoint();
     }
 
+    private Exchange createExchange(InputStream minioObject, String 
objectName) throws Exception {
+        LOG.trace("Getting object with objectName {} from bucket {}...", 
objectName, getConfiguration().getBucketName());
+
+        Exchange exchange = createExchange(true);
+        exchange.setPattern(getEndpoint().getExchangePattern());
+        Message message = exchange.getIn();
+        LOG.trace("Got object!");
+
+        getEndpoint().getObjectStat(objectName, message);
+
+        if (getConfiguration().isIncludeBody()) {
+            message.setBody(getEndpoint().readInputStream(minioObject));
+            if (getConfiguration().isAutoCloseBody()) {
+                exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
SynchronizationAdapter() {
+                    @Override
+                    public void onDone(Exchange exchange) {
+                        IOHelper.close(minioObject);
+                    }
+                });
+            }
+        } else {
+            message.setBody(null);
+            IOHelper.close(minioObject);
+        }
+
+        return exchange;
+    }
+
     @Override
     public String toString() {
         if (isEmpty(minioConsumerToString)) {
diff --git a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioEndpoint.java b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioEndpoint.java
index e85cf8f..bb67186 100644
--- a/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioEndpoint.java
+++ b/components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioEndpoint.java
@@ -32,17 +32,12 @@ import io.minio.StatObjectResponse;
 import org.apache.camel.Category;
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.support.ScheduledPollEndpoint;
-import org.apache.camel.support.SynchronizationAdapter;
-import org.apache.camel.util.IOHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -123,37 +118,6 @@ public class MinioEndpoint extends ScheduledPollEndpoint {
         super.doStop();
     }
 
-    public Exchange createExchange(InputStream minioObject, String objectName) 
throws Exception {
-        return createExchange(getExchangePattern(), minioObject, objectName);
-    }
-
-    public Exchange createExchange(ExchangePattern pattern, InputStream 
minioObject, String objectName) throws Exception {
-        LOG.trace("Getting object with objectName {} from bucket {}...", 
objectName, getConfiguration().getBucketName());
-
-        Exchange exchange = super.createExchange(pattern);
-        Message message = exchange.getIn();
-        LOG.trace("Got object!");
-
-        getObjectStat(objectName, message);
-
-        if (getConfiguration().isIncludeBody()) {
-            message.setBody(readInputStream(minioObject));
-            if (getConfiguration().isAutoCloseBody()) {
-                exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
SynchronizationAdapter() {
-                    @Override
-                    public void onDone(Exchange exchange) {
-                        IOHelper.close(minioObject);
-                    }
-                });
-            }
-        } else {
-            message.setBody(null);
-            IOHelper.close(minioObject);
-        }
-
-        return exchange;
-    }
-
     public MinioConfiguration getConfiguration() {
         return configuration;
     }
@@ -196,7 +160,7 @@ public class MinioEndpoint extends ScheduledPollEndpoint {
         }
     }
 
-    private String readInputStream(InputStream minioObject) throws IOException 
{
+    String readInputStream(InputStream minioObject) throws IOException {
         StringBuilder textBuilder = new StringBuilder();
         try (Reader reader = new BufferedReader(new 
InputStreamReader(minioObject, StandardCharsets.UTF_8))) {
             int c;
@@ -227,7 +191,7 @@ public class MinioEndpoint extends ScheduledPollEndpoint {
         LOG.trace("Bucket policy updated");
     }
 
-    private void getObjectStat(String objectName, Message message) throws 
Exception {
+    void getObjectStat(String objectName, Message message) throws Exception {
 
         String bucketName = getConfiguration().getBucketName();
         StatObjectArgs.Builder statObjectRequest = 
StatObjectArgs.builder().bucket(bucketName).object(objectName);
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
index d1e3a4a..35719f0 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpEndpoint.java
@@ -109,7 +109,7 @@ public class MllpEndpoint extends DefaultEndpoint {
         super.setBridgeErrorHandler(configuration.isBridgeErrorHandler());
     }
 
-    private void setExchangeProperties(Exchange mllpExchange) {
+    void setExchangeProperties(Exchange mllpExchange) {
         if (configuration.hasCharsetName()) {
             mllpExchange.setProperty(Exchange.CHARSET_NAME, 
configuration.getCharsetName());
         }
diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
index d43c356..2ca75bb 100644
--- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
+++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/MllpTcpServerConsumer.java
@@ -228,7 +228,8 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
         // Send the message on to Camel for processing and wait for the 
response
         log.debug("processMessage(hl7MessageBytes[{}], {}) - populating the 
exchange with received payload",
                 hl7MessageBytes == null ? -1 : hl7MessageBytes.length, 
consumerRunnable.getSocket());
-        Exchange exchange = 
getEndpoint().createExchange(ExchangePattern.InOut);
+        Exchange exchange = createExchange(false);
+        exchange.setPattern(ExchangePattern.InOut);
         if (getConfiguration().hasCharsetName()) {
             exchange.setProperty(Exchange.CHARSET_NAME, 
getConfiguration().getCharsetName());
         }
@@ -283,9 +284,8 @@ public class MllpTcpServerConsumer extends DefaultConsumer {
                     "processMessage(byte[], TcpSocketConsumerRunnable) - 
Unexpected exception creating Unit of Work", exchange,
                     uowEx);
         } finally {
-            if (exchange != null) {
-                doneUoW(exchange);
-            }
+            doneUoW(exchange);
+            releaseExchange(exchange, false);
         }
     }
 
diff --git a/components/camel-mongodb-gridfs/src/main/java/org/apache/camel/component/mongodb/gridfs/GridFsConsumer.java b/components/camel-mongodb-gridfs/src/main/java/org/apache/camel/component/mongodb/gridfs/GridFsConsumer.java
index 60eff1d..74fe66a 100644
--- a/components/camel-mongodb-gridfs/src/main/java/org/apache/camel/component/mongodb/gridfs/GridFsConsumer.java
+++ b/components/camel-mongodb-gridfs/src/main/java/org/apache/camel/component/mongodb/gridfs/GridFsConsumer.java
@@ -142,7 +142,7 @@ public class GridFsConsumer extends DefaultConsumer implements Runnable {
                         forig = 
endpoint.getFilesCollection().findOneAndUpdate(filter, update, options);
                     }
                     if (forig != null) {
-                        Exchange exchange = endpoint.createExchange();
+                        Exchange exchange = createExchange(true);
                         GridFSDownloadStream downloadStream = 
endpoint.getGridFsBucket().openDownloadStream(file.getFilename());
                         file = downloadStream.getGridFSFile();
 
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumer.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumer.java
index f625205..4fd3b23 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumer.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsConsumer.java
@@ -19,10 +19,13 @@ package org.apache.camel.component.mongodb;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.util.ObjectHelper;
 import org.bson.BsonDocument;
+import org.bson.Document;
 
 import static java.util.Collections.singletonList;
 
@@ -69,4 +72,5 @@ public class MongoDbChangeStreamsConsumer extends DefaultConsumer {
         changeStreamsThread.init();
         executor.execute(changeStreamsThread);
     }
+
 }
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
index 116b072..8757793 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbChangeStreamsThread.java
@@ -24,6 +24,7 @@ import com.mongodb.client.MongoCursor;
 import com.mongodb.client.model.changestream.ChangeStreamDocument;
 import com.mongodb.client.model.changestream.OperationType;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.bson.BsonDocument;
 import org.bson.Document;
 import org.bson.types.ObjectId;
@@ -70,7 +71,7 @@ class MongoDbChangeStreamsThread extends MongoAbstractConsumerThread {
         try {
             while (cursor.hasNext() && keepRunning) {
                 ChangeStreamDocument<Document> dbObj = 
(ChangeStreamDocument<Document>) cursor.next();
-                Exchange exchange = 
endpoint.createMongoDbExchange(dbObj.getFullDocument());
+                Exchange exchange = 
createMongoDbExchange(dbObj.getFullDocument());
 
                 ObjectId documentId = 
dbObj.getDocumentKey().getObjectId(MONGO_ID).getValue();
                 OperationType operationType = dbObj.getOperationType();
@@ -101,4 +102,15 @@ class MongoDbChangeStreamsThread extends MongoAbstractConsumerThread {
             }
         }
     }
+
+    private Exchange createMongoDbExchange(Document dbObj) {
+        Exchange exchange = consumer.createExchange(true);
+        Message message = exchange.getIn();
+        message.setHeader(MongoDbConstants.DATABASE, endpoint.getDatabase());
+        message.setHeader(MongoDbConstants.COLLECTION, 
endpoint.getCollection());
+        message.setHeader(MongoDbConstants.FROM_TAILABLE, true);
+        message.setBody(dbObj);
+        return exchange;
+    }
+
 }
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
index 76ddc9a..a118fea 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbEndpoint.java
@@ -31,8 +31,6 @@ import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
 import org.apache.camel.Category;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.spi.Metadata;
@@ -318,16 +316,6 @@ public class MongoDbEndpoint extends DefaultEndpoint {
         }
     }
 
-    public Exchange createMongoDbExchange(Document dbObj) {
-        Exchange exchange = super.createExchange();
-        Message message = exchange.getIn();
-        message.setHeader(MongoDbConstants.DATABASE, database);
-        message.setHeader(MongoDbConstants.COLLECTION, collection);
-        message.setHeader(MongoDbConstants.FROM_TAILABLE, true);
-        message.setBody(dbObj);
-        return exchange;
-    }
-
     @Override
     protected void doStart() throws Exception {
         if (mongoConnection == null) {
diff --git a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingThread.java b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingThread.java
index 5f870d1..b2c7747 100644
--- a/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingThread.java
+++ b/components/camel-mongodb/src/main/java/org/apache/camel/component/mongodb/MongoDbTailingThread.java
@@ -20,6 +20,7 @@ import com.mongodb.CursorType;
 import com.mongodb.MongoCursorNotFoundException;
 import com.mongodb.client.MongoCursor;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.bson.Document;
 
 import static com.mongodb.client.model.Filters.gt;
@@ -110,7 +111,7 @@ class MongoDbTailingThread extends MongoAbstractConsumerThread {
         try {
             while (cursor.hasNext() && keepRunning) {
                 Document dbObj = (Document) cursor.next();
-                Exchange exchange = endpoint.createMongoDbExchange(dbObj);
+                Exchange exchange = createMongoDbExchange(dbObj);
                 try {
                     if (log.isTraceEnabled()) {
                         log.trace("Sending exchange: {}, ObjectId: {}", exchange, dbObj.get(MONGO_ID));
@@ -145,4 +146,15 @@ class MongoDbTailingThread extends MongoAbstractConsumerThread {
             tailTracking.persistToStore();
         }
     }
+
+    Exchange createMongoDbExchange(Document dbObj) {
+        Exchange exchange = consumer.createExchange(true);
+        Message message = exchange.getIn();
+        message.setHeader(MongoDbConstants.DATABASE, endpoint.getDatabase());
+        message.setHeader(MongoDbConstants.COLLECTION, endpoint.getCollection());
+        message.setHeader(MongoDbConstants.FROM_TAILABLE, true);
+        message.setBody(dbObj);
+        return exchange;
+    }
+
 }
diff --git a/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java b/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java
index 45bfca1..bfc721d 100644
--- a/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java
+++ b/components/camel-mybatis/src/main/java/org/apache/camel/component/mybatis/MyBatisConsumer.java
@@ -130,9 +130,10 @@ public class MyBatisConsumer extends ScheduledBatchPollingConsumer {
 
             // process the current exchange
             LOG.debug("Processing exchange: {} with properties: {}", exchange, exchange.getProperties());
-            getProcessor().process(exchange);
-
+            Exception cause = null;
             try {
+                getProcessor().process(exchange);
+
                 if (onConsume != null) {
                     endpoint.getProcessingStrategy().commit(endpoint, exchange, data, onConsume);
                 }
@@ -142,13 +143,16 @@ public class MyBatisConsumer extends ScheduledBatchPollingConsumer {
 
             if (getEndpoint().isTransacted() && exchange.isFailed()) {
                 // break out as we are transacted and should rollback
-                Exception cause = exchange.getException();
-                if (cause != null) {
-                    throw cause;
-                } else {
-                    throw new RollbackExchangeException("Rollback transaction due error processing exchange", exchange);
+                cause = exchange.getException();
+                if (cause == null) {
+                    cause = new RollbackExchangeException("Rollback transaction due error processing exchange", null);
                 }
             }
+            releaseExchange(exchange, false);
+
+            if (cause != null) {
+                throw cause;
+            }
         }
 
         return total;
@@ -156,7 +160,8 @@ public class MyBatisConsumer extends ScheduledBatchPollingConsumer {
 
     private Exchange createExchange(Object data) {
         final MyBatisEndpoint endpoint = getEndpoint();
-        final Exchange exchange = endpoint.createExchange(ExchangePattern.InOnly);
+        final Exchange exchange = createExchange(false);
+        exchange.setPattern(ExchangePattern.InOnly);
         final String outputHeader = getEndpoint().getOutputHeader();
 
         Message msg = exchange.getIn();
diff --git 
a/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisConsumerIsolatedTest.java
 
b/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisConsumerIsolatedTest.java
deleted file mode 100644
index 27ad1db..0000000
--- 
a/components/camel-mybatis/src/test/java/org/apache/camel/component/mybatis/MyBatisConsumerIsolatedTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.mybatis;
-
-import java.util.ArrayDeque;
-import java.util.Queue;
-
-import org.apache.camel.CamelContext;
-import org.apache.camel.Processor;
-import org.apache.camel.support.DefaultExchange;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Mockito.mock;
-
-public class MyBatisConsumerIsolatedTest {
-
-    @Test
-    public void shouldRespectBatchSize() throws Exception {
-        // Given
-        int batchSize = 5;
-        MyBatisConsumer consumer = new MyBatisConsumer(mock(MyBatisEndpoint.class), mock(Processor.class));
-        consumer.setMaxMessagesPerPoll(batchSize);
-
-        Queue<Object> emptyMessageQueue = new ArrayDeque<>();
-        for (int i = 0; i < 10; i++) {
-            MyBatisConsumer.DataHolder dataHolder = new MyBatisConsumer.DataHolder();
-            dataHolder.exchange = new DefaultExchange(mock(CamelContext.class));
-            emptyMessageQueue.add(dataHolder);
-        }
-
-        // When
-        int processedMessages = consumer.processBatch(emptyMessageQueue);
-
-        // Then
-        assertEquals(batchSize, processedMessages);
-    }
-
-}

Reply via email to