This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/exchange-factory by this push:
     new 542f2ba  CAMEL-16222: PooledExchangeFactory experiment
542f2ba is described below

commit 542f2ba12d4947eda7fc37216afd169bf4503d8f
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Tue Feb 23 19:05:31 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../aws2/ddbstream/Ddb2StreamConsumer.java         |  8 ++-
 .../aws2/ddbstream/Ddb2StreamEndpoint.java         |  9 ---
 .../component/aws2/kinesis/Kinesis2Consumer.java   | 13 +++-
 .../component/aws2/kinesis/Kinesis2Endpoint.java   | 11 ----
 .../camel/component/aws2/s3/AWS2S3Consumer.java    | 75 ++++++++++++++++++++--
 .../camel/component/aws2/s3/AWS2S3Endpoint.java    | 72 ---------------------
 .../camel/component/aws2/sqs/Sqs2Consumer.java     | 52 ++++++++++++++-
 .../camel/component/aws2/sqs/Sqs2Endpoint.java     | 44 -------------
 8 files changed, 137 insertions(+), 147 deletions(-)

diff --git 
a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
 
b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
index 276195f..32bcb64 100644
--- 
a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
+++ 
b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java
@@ -97,6 +97,12 @@ public class Ddb2StreamConsumer extends 
ScheduledBatchPollingConsumer {
         return processedExchanges;
     }
 
+    protected Exchange createExchange(Record record) {
+        Exchange ex = createExchange(true);
+        ex.getIn().setBody(record, Record.class);
+        return ex;
+    }
+
     private DynamoDbStreamsClient getClient() {
         return getEndpoint().getClient();
     }
@@ -130,7 +136,7 @@ public class Ddb2StreamConsumer extends 
ScheduledBatchPollingConsumer {
         for (Record record : records) {
             BigInteger recordSeqNum = new 
BigInteger(record.dynamodb().sequenceNumber());
             if (condition == null || condition.matches(providedSeqNum, 
recordSeqNum)) {
-                exchanges.add(getEndpoint().createExchange(record));
+                exchanges.add(createExchange(record));
             }
         }
         return exchanges;
diff --git 
a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java
 
b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java
index 5c9b0a2..6f314ff 100644
--- 
a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java
+++ 
b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java
@@ -20,7 +20,6 @@ import java.net.URI;
 
 import org.apache.camel.Category;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.spi.UriEndpoint;
@@ -34,7 +33,6 @@ import software.amazon.awssdk.http.SdkHttpConfigurationOption;
 import software.amazon.awssdk.http.apache.ApacheHttpClient;
 import software.amazon.awssdk.http.apache.ProxyConfiguration;
 import software.amazon.awssdk.regions.Region;
-import software.amazon.awssdk.services.dynamodb.model.Record;
 import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
 import 
software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClientBuilder;
 import software.amazon.awssdk.utils.AttributeMap;
@@ -69,13 +67,6 @@ public class Ddb2StreamEndpoint extends 
ScheduledPollEndpoint {
         return consumer;
     }
 
-    Exchange createExchange(Record record) {
-        Exchange ex = super.createExchange();
-        ex.getIn().setBody(record, Record.class);
-
-        return ex;
-    }
-
     @Override
     public void doStart() throws Exception {
         super.doStart();
diff --git 
a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
 
b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index e620181..fcda12b 100644
--- 
a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ 
b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -75,7 +75,7 @@ public class Kinesis2Consumer extends 
ScheduledBatchPollingConsumer {
         // May cache the last successful sequence number, and pass it to the
         // getRecords request. That way, on the next poll, we start from where
         // we left off, however, I don't know what happens to subsequent
-        // exchanges when an earlier echangee fails.
+        // exchanges when an earlier exchange fails.
 
         currentShardIterator = result.nextShardIterator();
         if (isShardClosed) {
@@ -178,11 +178,20 @@ public class Kinesis2Consumer extends 
ScheduledBatchPollingConsumer {
     private Queue<Exchange> createExchanges(List<Record> records) {
         Queue<Exchange> exchanges = new ArrayDeque<>();
         for (Record record : records) {
-            exchanges.add(getEndpoint().createExchange(record));
+            exchanges.add(createExchange(record));
         }
         return exchanges;
     }
 
+    protected Exchange createExchange(Record record) {
+        Exchange exchange = createExchange(true);
+        exchange.getIn().setBody(record);
+        exchange.getIn().setHeader(Kinesis2Constants.APPROX_ARRIVAL_TIME, 
record.approximateArrivalTimestamp());
+        exchange.getIn().setHeader(Kinesis2Constants.PARTITION_KEY, 
record.partitionKey());
+        exchange.getIn().setHeader(Kinesis2Constants.SEQUENCE_NUMBER, 
record.sequenceNumber());
+        return exchange;
+    }
+
     private boolean hasSequenceNumber() {
         return !getEndpoint().getConfiguration().getSequenceNumber().isEmpty()
                 && 
(getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
diff --git 
a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
 
b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
index ac0b0c4..ebe2e31 100644
--- 
a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
+++ 
b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
@@ -20,7 +20,6 @@ import java.net.URI;
 
 import org.apache.camel.Category;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.spi.UriEndpoint;
@@ -36,7 +35,6 @@ import software.amazon.awssdk.http.apache.ProxyConfiguration;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
 import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
-import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 import software.amazon.awssdk.utils.AttributeMap;
 
@@ -102,15 +100,6 @@ public class Kinesis2Endpoint extends 
ScheduledPollEndpoint {
         return consumer;
     }
 
-    public Exchange createExchange(Record record) {
-        Exchange exchange = super.createExchange();
-        exchange.getIn().setBody(record);
-        exchange.getIn().setHeader(Kinesis2Constants.APPROX_ARRIVAL_TIME, 
record.approximateArrivalTimestamp());
-        exchange.getIn().setHeader(Kinesis2Constants.PARTITION_KEY, 
record.partitionKey());
-        exchange.getIn().setHeader(Kinesis2Constants.SEQUENCE_NUMBER, 
record.sequenceNumber());
-        return exchange;
-    }
-
     public KinesisClient getClient() {
         return kinesisClient;
     }
diff --git 
a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
 
b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
index 2776fd0..a4e4fef 100644
--- 
a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
+++ 
b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.aws2.s3;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedList;
@@ -25,11 +26,14 @@ import java.util.Queue;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.ExtendedExchange;
-import org.apache.camel.NoFactoryAvailableException;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.ScheduledBatchPollingConsumer;
+import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -52,6 +56,7 @@ import 
software.amazon.awssdk.services.s3.model.ListObjectsRequest;
 import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
 import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
 import software.amazon.awssdk.services.s3.model.S3Object;
+import software.amazon.awssdk.utils.IoUtils;
 
 /**
  * A Consumer of messages from the Amazon Web Service Simple Storage Service 
<a href="http://aws.amazon.com/s3/";>AWS
@@ -64,7 +69,7 @@ public class AWS2S3Consumer extends 
ScheduledBatchPollingConsumer {
     private String marker;
     private transient String s3ConsumerToString;
 
-    public AWS2S3Consumer(AWS2S3Endpoint endpoint, Processor processor) throws 
NoFactoryAvailableException {
+    public AWS2S3Consumer(AWS2S3Endpoint endpoint, Processor processor) {
         super(endpoint, processor);
     }
 
@@ -180,7 +185,7 @@ public class AWS2S3Consumer extends 
ScheduledBatchPollingConsumer {
 
     protected Queue<Exchange> 
createExchanges(ResponseInputStream<GetObjectResponse> s3Object, String key) {
         Queue<Exchange> answer = new LinkedList<>();
-        Exchange exchange = getEndpoint().createExchange(s3Object, key);
+        Exchange exchange = createExchange(s3Object, key);
         answer.add(exchange);
         return answer;
     }
@@ -212,7 +217,7 @@ public class AWS2S3Consumer extends 
ScheduledBatchPollingConsumer {
 
                 if (includeS3Object(s3Object)) {
                     s3Objects.add(s3Object);
-                    Exchange exchange = getEndpoint().createExchange(s3Object, 
s3ObjectSummary.key());
+                    Exchange exchange = createExchange(s3Object, 
s3ObjectSummary.key());
                     answer.add(exchange);
                 } else {
                     // If includeFolders != true and the object is not 
included, it is safe to close the object here.
@@ -238,7 +243,6 @@ public class AWS2S3Consumer extends 
ScheduledBatchPollingConsumer {
      * @return          true to include, false to exclude
      */
     protected boolean includeS3Object(ResponseInputStream<GetObjectResponse> 
s3Object) {
-
         if (getConfiguration().isIncludeFolders()) {
             return true;
         } else {
@@ -365,6 +369,67 @@ public class AWS2S3Consumer extends 
ScheduledBatchPollingConsumer {
         return (AWS2S3Endpoint) super.getEndpoint();
     }
 
+    public Exchange createExchange(ResponseInputStream<GetObjectResponse> 
s3Object, String key) {
+        return createExchange(getEndpoint().getExchangePattern(), s3Object, 
key);
+    }
+
+    public Exchange createExchange(ExchangePattern pattern, 
ResponseInputStream<GetObjectResponse> s3Object, String key) {
+        LOG.trace("Getting object with key [{}] from bucket [{}]...", key, 
getConfiguration().getBucketName());
+
+        LOG.trace("Got object [{}]", s3Object);
+
+        Exchange exchange = createExchange(true);
+        exchange.setPattern(pattern);
+        Message message = exchange.getIn();
+
+        if (getConfiguration().isIncludeBody()) {
+            try {
+                message.setBody(IoUtils.toByteArray(s3Object));
+            } catch (IOException e) {
+                throw new RuntimeCamelException(e);
+            }
+        } else {
+            message.setBody(s3Object);
+        }
+
+        message.setHeader(AWS2S3Constants.KEY, key);
+        message.setHeader(AWS2S3Constants.BUCKET_NAME, 
getConfiguration().getBucketName());
+        message.setHeader(AWS2S3Constants.E_TAG, s3Object.response().eTag());
+        message.setHeader(AWS2S3Constants.LAST_MODIFIED, 
s3Object.response().lastModified());
+        message.setHeader(AWS2S3Constants.VERSION_ID, 
s3Object.response().versionId());
+        message.setHeader(AWS2S3Constants.CONTENT_TYPE, 
s3Object.response().contentType());
+        message.setHeader(AWS2S3Constants.CONTENT_LENGTH, 
s3Object.response().contentLength());
+        message.setHeader(AWS2S3Constants.CONTENT_ENCODING, 
s3Object.response().contentEncoding());
+        message.setHeader(AWS2S3Constants.CONTENT_DISPOSITION, 
s3Object.response().contentDisposition());
+        message.setHeader(AWS2S3Constants.CACHE_CONTROL, 
s3Object.response().cacheControl());
+        message.setHeader(AWS2S3Constants.SERVER_SIDE_ENCRYPTION, 
s3Object.response().serverSideEncryption());
+        message.setHeader(AWS2S3Constants.EXPIRATION_TIME, 
s3Object.response().expiration());
+        message.setHeader(AWS2S3Constants.REPLICATION_STATUS, 
s3Object.response().replicationStatus());
+        message.setHeader(AWS2S3Constants.STORAGE_CLASS, 
s3Object.response().storageClass());
+        message.setHeader(AWS2S3Constants.METADATA, 
s3Object.response().metadata());
+
+        /*
+         * If includeBody == true, it is safe to close the object here because 
the S3Object
+         * was consumed already. If includeBody != true, the caller is 
responsible for
+         * closing the stream once the body has been fully consumed or use the 
autoCloseBody
+         * configuration to automatically schedule the body closing at the end 
of exchange.
+         */
+        if (getConfiguration().isIncludeBody()) {
+            IOHelper.close(s3Object);
+        } else {
+            if (getConfiguration().isAutocloseBody()) {
+                exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
SynchronizationAdapter() {
+                    @Override
+                    public void onDone(Exchange exchange) {
+                        IOHelper.close(s3Object);
+                    }
+                });
+            }
+        }
+
+        return exchange;
+    }
+
     @Override
     public String toString() {
         if (s3ConsumerToString == null) {
diff --git 
a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Endpoint.java
 
b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Endpoint.java
index 8be1966..9cad4ee 100644
--- 
a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Endpoint.java
+++ 
b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Endpoint.java
@@ -16,37 +16,25 @@
  */
 package org.apache.camel.component.aws2.s3;
 
-import java.io.IOException;
-
 import org.apache.camel.Category;
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.ExtendedExchange;
-import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.aws2.s3.client.AWS2S3ClientFactory;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 import org.apache.camel.support.ScheduledPollEndpoint;
-import org.apache.camel.support.SynchronizationAdapter;
-import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.awscore.exception.AwsServiceException;
-import software.amazon.awssdk.core.ResponseInputStream;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
-import software.amazon.awssdk.services.s3.model.GetObjectResponse;
 import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
 import software.amazon.awssdk.services.s3.model.PutBucketPolicyRequest;
-import software.amazon.awssdk.utils.IoUtils;
 
 /**
  * Store and retrieve objects from AWS S3 Storage Service using AWS SDK 
version 2.x.
@@ -152,66 +140,6 @@ public class AWS2S3Endpoint extends ScheduledPollEndpoint {
         super.doStop();
     }
 
-    public Exchange createExchange(ResponseInputStream<GetObjectResponse> 
s3Object, String key) {
-        return createExchange(getExchangePattern(), s3Object, key);
-    }
-
-    public Exchange createExchange(ExchangePattern pattern, 
ResponseInputStream<GetObjectResponse> s3Object, String key) {
-        LOG.trace("Getting object with key [{}] from bucket [{}]...", key, 
getConfiguration().getBucketName());
-
-        LOG.trace("Got object [{}]", s3Object);
-
-        Exchange exchange = super.createExchange(pattern);
-        Message message = exchange.getIn();
-
-        if (configuration.isIncludeBody()) {
-            try {
-                message.setBody(IoUtils.toByteArray(s3Object));
-            } catch (IOException e) {
-                throw new RuntimeCamelException(e);
-            }
-        } else {
-            message.setBody(s3Object);
-        }
-
-        message.setHeader(AWS2S3Constants.KEY, key);
-        message.setHeader(AWS2S3Constants.BUCKET_NAME, 
getConfiguration().getBucketName());
-        message.setHeader(AWS2S3Constants.E_TAG, s3Object.response().eTag());
-        message.setHeader(AWS2S3Constants.LAST_MODIFIED, 
s3Object.response().lastModified());
-        message.setHeader(AWS2S3Constants.VERSION_ID, 
s3Object.response().versionId());
-        message.setHeader(AWS2S3Constants.CONTENT_TYPE, 
s3Object.response().contentType());
-        message.setHeader(AWS2S3Constants.CONTENT_LENGTH, 
s3Object.response().contentLength());
-        message.setHeader(AWS2S3Constants.CONTENT_ENCODING, 
s3Object.response().contentEncoding());
-        message.setHeader(AWS2S3Constants.CONTENT_DISPOSITION, 
s3Object.response().contentDisposition());
-        message.setHeader(AWS2S3Constants.CACHE_CONTROL, 
s3Object.response().cacheControl());
-        message.setHeader(AWS2S3Constants.SERVER_SIDE_ENCRYPTION, 
s3Object.response().serverSideEncryption());
-        message.setHeader(AWS2S3Constants.EXPIRATION_TIME, 
s3Object.response().expiration());
-        message.setHeader(AWS2S3Constants.REPLICATION_STATUS, 
s3Object.response().replicationStatus());
-        message.setHeader(AWS2S3Constants.STORAGE_CLASS, 
s3Object.response().storageClass());
-        message.setHeader(AWS2S3Constants.METADATA, 
s3Object.response().metadata());
-
-        /*
-         * If includeBody == true, it is safe to close the object here because 
the S3Object
-         * was consumed already. If includeBody != true, the caller is 
responsible for
-         * closing the stream once the body has been fully consumed or use the 
autoCloseBody
-         * configuration to automatically schedule the body closing at the end 
of exchange.
-         */
-        if (configuration.isIncludeBody()) {
-            IOHelper.close(s3Object);
-        } else {
-            if (configuration.isAutocloseBody()) {
-                exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
SynchronizationAdapter() {
-                    @Override
-                    public void onDone(Exchange exchange) {
-                        IOHelper.close(s3Object);
-                    }
-                });
-            }
-        }
-
-        return exchange;
-    }
-
     public AWS2S3Configuration getConfiguration() {
         return configuration;
     }
diff --git 
a/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
 
b/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
index 660df45..3cc5cb7 100644
--- 
a/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
+++ 
b/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
@@ -18,17 +18,21 @@ package org.apache.camel.component.aws2.sqs;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.ExtendedExchange;
-import org.apache.camel.NoFactoryAvailableException;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.ScheduledBatchPollingConsumer;
 import org.apache.camel.util.CastUtils;
@@ -40,6 +44,7 @@ import 
software.amazon.awssdk.awscore.exception.AwsServiceException;
 import software.amazon.awssdk.services.sqs.SqsClient;
 import 
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
 import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
+import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
 import software.amazon.awssdk.services.sqs.model.MessageNotInflightException;
 import software.amazon.awssdk.services.sqs.model.QueueDeletedRecentlyException;
 import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
@@ -60,7 +65,7 @@ public class Sqs2Consumer extends 
ScheduledBatchPollingConsumer {
     private Collection<String> attributeNames;
     private Collection<String> messageAttributeNames;
 
-    public Sqs2Consumer(Sqs2Endpoint endpoint, Processor processor) throws 
NoFactoryAvailableException {
+    public Sqs2Consumer(Sqs2Endpoint endpoint, Processor processor) {
         super(endpoint, processor);
 
         if (getConfiguration().getAttributeNames() != null) {
@@ -136,7 +141,7 @@ public class Sqs2Consumer extends 
ScheduledBatchPollingConsumer {
 
         Queue<Exchange> answer = new LinkedList<>();
         for (software.amazon.awssdk.services.sqs.model.Message message : 
messages) {
-            Exchange exchange = getEndpoint().createExchange(message);
+            Exchange exchange = createExchange(message);
             answer.add(exchange);
         }
 
@@ -282,6 +287,47 @@ public class Sqs2Consumer extends 
ScheduledBatchPollingConsumer {
         return (Sqs2Endpoint) super.getEndpoint();
     }
 
+    public Exchange 
createExchange(software.amazon.awssdk.services.sqs.model.Message msg) {
+        return createExchange(getEndpoint().getExchangePattern(), msg);
+    }
+
+    private Exchange createExchange(ExchangePattern pattern, 
software.amazon.awssdk.services.sqs.model.Message msg) {
+        Exchange exchange = createExchange(true);
+        exchange.setPattern(pattern);
+        Message message = exchange.getIn();
+        message.setBody(msg.body());
+        message.setHeaders(new HashMap<>(msg.attributesAsStrings()));
+        message.setHeader(Sqs2Constants.MESSAGE_ID, msg.messageId());
+        message.setHeader(Sqs2Constants.MD5_OF_BODY, msg.md5OfBody());
+        message.setHeader(Sqs2Constants.RECEIPT_HANDLE, msg.receiptHandle());
+        message.setHeader(Sqs2Constants.ATTRIBUTES, msg.attributes());
+        message.setHeader(Sqs2Constants.MESSAGE_ATTRIBUTES, 
msg.messageAttributes());
+
+        // Need to apply the SqsHeaderFilterStrategy this time
+        HeaderFilterStrategy headerFilterStrategy = 
getEndpoint().getHeaderFilterStrategy();
+        // add all sqs message attributes as camel message headers so that
+        // knowledge of
+        // the Sqs class MessageAttributeValue will not leak to the client
+        for (Map.Entry<String, MessageAttributeValue> entry : 
msg.messageAttributes().entrySet()) {
+            String header = entry.getKey();
+            Object value = translateValue(entry.getValue());
+            if (!headerFilterStrategy.applyFilterToExternalHeaders(header, 
value, exchange)) {
+                message.setHeader(header, value);
+            }
+        }
+        return exchange;
+    }
+
+    private static Object translateValue(MessageAttributeValue mav) {
+        Object result = null;
+        if (mav.stringValue() != null) {
+            result = mav.stringValue();
+        } else if (mav.binaryValue() != null) {
+            result = mav.binaryValue();
+        }
+        return result;
+    }
+
     @Override
     public String toString() {
         if (sqsConsumerToString == null) {
diff --git 
a/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java
 
b/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java
index 2dc209c..1eb4014 100644
--- 
a/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java
+++ 
b/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java
@@ -21,13 +21,9 @@ import java.io.InputStream;
 import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Map.Entry;
 
 import org.apache.camel.Category;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.component.aws2.sqs.client.Sqs2ClientFactory;
@@ -52,7 +48,6 @@ import 
software.amazon.awssdk.services.sqs.model.CreateQueueResponse;
 import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
 import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
 import software.amazon.awssdk.services.sqs.model.ListQueuesResponse;
-import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
 import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
 import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
 import software.amazon.awssdk.services.sqs.model.SetQueueAttributesRequest;
@@ -343,36 +338,6 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint 
implements HeaderFilterS
         super.doStop();
     }
 
-    public Exchange 
createExchange(software.amazon.awssdk.services.sqs.model.Message msg) {
-        return createExchange(getExchangePattern(), msg);
-    }
-
-    private Exchange createExchange(ExchangePattern pattern, 
software.amazon.awssdk.services.sqs.model.Message msg) {
-        Exchange exchange = super.createExchange(pattern);
-        Message message = exchange.getIn();
-        message.setBody(msg.body());
-        message.setHeaders(new HashMap<>(msg.attributesAsStrings()));
-        message.setHeader(Sqs2Constants.MESSAGE_ID, msg.messageId());
-        message.setHeader(Sqs2Constants.MD5_OF_BODY, msg.md5OfBody());
-        message.setHeader(Sqs2Constants.RECEIPT_HANDLE, msg.receiptHandle());
-        message.setHeader(Sqs2Constants.ATTRIBUTES, msg.attributes());
-        message.setHeader(Sqs2Constants.MESSAGE_ATTRIBUTES, 
msg.messageAttributes());
-
-        // Need to apply the SqsHeaderFilterStrategy this time
-        HeaderFilterStrategy headerFilterStrategy = getHeaderFilterStrategy();
-        // add all sqs message attributes as camel message headers so that
-        // knowledge of
-        // the Sqs class MessageAttributeValue will not leak to the client
-        for (Entry<String, MessageAttributeValue> entry : 
msg.messageAttributes().entrySet()) {
-            String header = entry.getKey();
-            Object value = translateValue(entry.getValue());
-            if (!headerFilterStrategy.applyFilterToExternalHeaders(header, 
value, exchange)) {
-                message.setHeader(header, value);
-            }
-        }
-        return exchange;
-    }
-
     public Sqs2Configuration getConfiguration() {
         return configuration;
     }
@@ -406,13 +371,4 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint 
implements HeaderFilterS
         this.maxMessagesPerPoll = maxMessagesPerPoll;
     }
 
-    private Object translateValue(MessageAttributeValue mav) {
-        Object result = null;
-        if (mav.stringValue() != null) {
-            result = mav.stringValue();
-        } else if (mav.binaryValue() != null) {
-            result = mav.binaryValue();
-        }
-        return result;
-    }
 }

Reply via email to