This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/exchange-factory by this push: new 542f2ba CAMEL-16222: PooledExchangeFactory experiment 542f2ba is described below commit 542f2ba12d4947eda7fc37216afd169bf4503d8f Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Feb 23 19:05:31 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../aws2/ddbstream/Ddb2StreamConsumer.java | 8 ++- .../aws2/ddbstream/Ddb2StreamEndpoint.java | 9 --- .../component/aws2/kinesis/Kinesis2Consumer.java | 13 +++- .../component/aws2/kinesis/Kinesis2Endpoint.java | 11 ---- .../camel/component/aws2/s3/AWS2S3Consumer.java | 75 ++++++++++++++++++++-- .../camel/component/aws2/s3/AWS2S3Endpoint.java | 72 --------------------- .../camel/component/aws2/sqs/Sqs2Consumer.java | 52 ++++++++++++++- .../camel/component/aws2/sqs/Sqs2Endpoint.java | 44 ------------- 8 files changed, 137 insertions(+), 147 deletions(-) diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java index 276195f..32bcb64 100644 --- a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java +++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamConsumer.java @@ -97,6 +97,12 @@ public class Ddb2StreamConsumer extends ScheduledBatchPollingConsumer { return processedExchanges; } + protected Exchange createExchange(Record record) { + Exchange ex = createExchange(true); + ex.getIn().setBody(record, Record.class); + return ex; + } + private DynamoDbStreamsClient getClient() { return getEndpoint().getClient(); } @@ -130,7 +136,7 @@ public class Ddb2StreamConsumer extends ScheduledBatchPollingConsumer { for (Record record : records) { BigInteger recordSeqNum = new BigInteger(record.dynamodb().sequenceNumber()); if (condition == null || condition.matches(providedSeqNum, recordSeqNum)) { - exchanges.add(getEndpoint().createExchange(record)); + exchanges.add(createExchange(record)); } } return exchanges; diff --git a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java index 5c9b0a2..6f314ff 100644 --- a/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java +++ b/components/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/Ddb2StreamEndpoint.java @@ -20,7 +20,6 @@ import java.net.URI; import org.apache.camel.Category; import org.apache.camel.Consumer; -import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.spi.UriEndpoint; @@ -34,7 +33,6 @@ import software.amazon.awssdk.http.SdkHttpConfigurationOption; import software.amazon.awssdk.http.apache.ApacheHttpClient; import software.amazon.awssdk.http.apache.ProxyConfiguration; import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.dynamodb.model.Record; import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClientBuilder; import software.amazon.awssdk.utils.AttributeMap; @@ -69,13 +67,6 @@ public class Ddb2StreamEndpoint extends ScheduledPollEndpoint { return consumer; } - Exchange createExchange(Record record) { - Exchange ex = super.createExchange(); - ex.getIn().setBody(record, Record.class); - - return ex; - } - @Override public void doStart() throws Exception { super.doStart(); diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java index e620181..fcda12b 100644 --- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java +++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java @@ -75,7 +75,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer { // May cache the last successful sequence number, and pass it to the // getRecords request. That way, on the next poll, we start from where // we left off, however, I don't know what happens to subsequent - // exchanges when an earlier echangee fails. + // exchanges when an earlier exchange fails. currentShardIterator = result.nextShardIterator(); if (isShardClosed) { @@ -178,11 +178,20 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer { private Queue<Exchange> createExchanges(List<Record> records) { Queue<Exchange> exchanges = new ArrayDeque<>(); for (Record record : records) { - exchanges.add(getEndpoint().createExchange(record)); + exchanges.add(createExchange(record)); } return exchanges; } + protected Exchange createExchange(Record record) { + Exchange exchange = createExchange(true); + exchange.getIn().setBody(record); + exchange.getIn().setHeader(Kinesis2Constants.APPROX_ARRIVAL_TIME, record.approximateArrivalTimestamp()); + exchange.getIn().setHeader(Kinesis2Constants.PARTITION_KEY, record.partitionKey()); + exchange.getIn().setHeader(Kinesis2Constants.SEQUENCE_NUMBER, record.sequenceNumber()); + return exchange; + } + private boolean hasSequenceNumber() { return !getEndpoint().getConfiguration().getSequenceNumber().isEmpty() && (getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) diff --git a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java index ac0b0c4..ebe2e31 100644 --- a/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java +++ b/components/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java @@ -20,7 +20,6 @@ import java.net.URI; import org.apache.camel.Category; import org.apache.camel.Consumer; -import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.spi.UriEndpoint; @@ -36,7 +35,6 @@ import software.amazon.awssdk.http.apache.ProxyConfiguration; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.KinesisClientBuilder; -import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.utils.AttributeMap; @@ -102,15 +100,6 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint { return consumer; } - public Exchange createExchange(Record record) { - Exchange exchange = super.createExchange(); - exchange.getIn().setBody(record); - exchange.getIn().setHeader(Kinesis2Constants.APPROX_ARRIVAL_TIME, record.approximateArrivalTimestamp()); - exchange.getIn().setHeader(Kinesis2Constants.PARTITION_KEY, record.partitionKey()); - exchange.getIn().setHeader(Kinesis2Constants.SEQUENCE_NUMBER, record.sequenceNumber()); - return exchange; - } - public KinesisClient getClient() { return kinesisClient; } diff --git a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java index 2776fd0..a4e4fef 100644 --- a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java +++ b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.aws2.s3; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; @@ -25,11 +26,14 @@ import java.util.Queue; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; import org.apache.camel.ExtendedExchange; -import org.apache.camel.NoFactoryAvailableException; +import org.apache.camel.Message; import org.apache.camel.Processor; +import org.apache.camel.RuntimeCamelException; import org.apache.camel.spi.Synchronization; import org.apache.camel.support.ScheduledBatchPollingConsumer; +import org.apache.camel.support.SynchronizationAdapter; import org.apache.camel.util.CastUtils; import org.apache.camel.util.IOHelper; import org.apache.camel.util.ObjectHelper; @@ -52,6 +56,7 @@ import software.amazon.awssdk.services.s3.model.ListObjectsRequest; import software.amazon.awssdk.services.s3.model.ListObjectsResponse; import software.amazon.awssdk.services.s3.model.NoSuchKeyException; import software.amazon.awssdk.services.s3.model.S3Object; +import software.amazon.awssdk.utils.IoUtils; /** * A Consumer of messages from the Amazon Web Service Simple Storage Service <a href="http://aws.amazon.com/s3/">AWS @@ -64,7 +69,7 @@ public class AWS2S3Consumer extends ScheduledBatchPollingConsumer { private String marker; private transient String s3ConsumerToString; - public AWS2S3Consumer(AWS2S3Endpoint endpoint, Processor processor) throws NoFactoryAvailableException { + public AWS2S3Consumer(AWS2S3Endpoint endpoint, Processor processor) { super(endpoint, processor); } @@ -180,7 +185,7 @@ public class AWS2S3Consumer extends ScheduledBatchPollingConsumer { protected Queue<Exchange> createExchanges(ResponseInputStream<GetObjectResponse> s3Object, String key) { Queue<Exchange> answer = new LinkedList<>(); - Exchange exchange = getEndpoint().createExchange(s3Object, key); + Exchange exchange = createExchange(s3Object, key); answer.add(exchange); return answer; } @@ -212,7 +217,7 @@ public class AWS2S3Consumer extends ScheduledBatchPollingConsumer { if (includeS3Object(s3Object)) { s3Objects.add(s3Object); - Exchange exchange = getEndpoint().createExchange(s3Object, s3ObjectSummary.key()); + Exchange exchange = createExchange(s3Object, s3ObjectSummary.key()); answer.add(exchange); } else { // If includeFolders != true and the object is not included, it is safe to close the object here. @@ -238,7 +243,6 @@ public class AWS2S3Consumer extends ScheduledBatchPollingConsumer { * @return true to include, false to exclude */ protected boolean includeS3Object(ResponseInputStream<GetObjectResponse> s3Object) { - if (getConfiguration().isIncludeFolders()) { return true; } else { @@ -365,6 +369,67 @@ public class AWS2S3Consumer extends ScheduledBatchPollingConsumer { return (AWS2S3Endpoint) super.getEndpoint(); } + public Exchange createExchange(ResponseInputStream<GetObjectResponse> s3Object, String key) { + return createExchange(getEndpoint().getExchangePattern(), s3Object, key); + } + + public Exchange createExchange(ExchangePattern pattern, ResponseInputStream<GetObjectResponse> s3Object, String key) { + LOG.trace("Getting object with key [{}] from bucket [{}]...", key, getConfiguration().getBucketName()); + + LOG.trace("Got object [{}]", s3Object); + + Exchange exchange = createExchange(true); + exchange.setPattern(pattern); + Message message = exchange.getIn(); + + if (getConfiguration().isIncludeBody()) { + try { + message.setBody(IoUtils.toByteArray(s3Object)); + } catch (IOException e) { + throw new RuntimeCamelException(e); + } + } else { + message.setBody(s3Object); + } + + message.setHeader(AWS2S3Constants.KEY, key); + message.setHeader(AWS2S3Constants.BUCKET_NAME, getConfiguration().getBucketName()); + message.setHeader(AWS2S3Constants.E_TAG, s3Object.response().eTag()); + message.setHeader(AWS2S3Constants.LAST_MODIFIED, s3Object.response().lastModified()); + message.setHeader(AWS2S3Constants.VERSION_ID, s3Object.response().versionId()); + message.setHeader(AWS2S3Constants.CONTENT_TYPE, s3Object.response().contentType()); + message.setHeader(AWS2S3Constants.CONTENT_LENGTH, s3Object.response().contentLength()); + message.setHeader(AWS2S3Constants.CONTENT_ENCODING, s3Object.response().contentEncoding()); + message.setHeader(AWS2S3Constants.CONTENT_DISPOSITION, s3Object.response().contentDisposition()); + message.setHeader(AWS2S3Constants.CACHE_CONTROL, s3Object.response().cacheControl()); + message.setHeader(AWS2S3Constants.SERVER_SIDE_ENCRYPTION, s3Object.response().serverSideEncryption()); + message.setHeader(AWS2S3Constants.EXPIRATION_TIME, s3Object.response().expiration()); + message.setHeader(AWS2S3Constants.REPLICATION_STATUS, s3Object.response().replicationStatus()); + message.setHeader(AWS2S3Constants.STORAGE_CLASS, s3Object.response().storageClass()); + message.setHeader(AWS2S3Constants.METADATA, s3Object.response().metadata()); + + /* + * If includeBody == true, it is safe to close the object here because the S3Object + * was consumed already. If includeBody != true, the caller is responsible for + * closing the stream once the body has been fully consumed or use the autoCloseBody + * configuration to automatically schedule the body closing at the end of exchange. + */ + if (getConfiguration().isIncludeBody()) { + IOHelper.close(s3Object); + } else { + if (getConfiguration().isAutocloseBody()) { + exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { + @Override + public void onDone(Exchange exchange) { + IOHelper.close(s3Object); + } + }); + } + } + + return exchange; + } + @Override public String toString() { if (s3ConsumerToString == null) { diff --git a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Endpoint.java b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Endpoint.java index 8be1966..9cad4ee 100644 --- a/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Endpoint.java +++ b/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Endpoint.java @@ -16,37 +16,25 @@ */ package org.apache.camel.component.aws2.s3; -import java.io.IOException; - import org.apache.camel.Category; import org.apache.camel.Component; import org.apache.camel.Consumer; -import org.apache.camel.Exchange; -import org.apache.camel.ExchangePattern; -import org.apache.camel.ExtendedExchange; -import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.Producer; -import org.apache.camel.RuntimeCamelException; import org.apache.camel.component.aws2.s3.client.AWS2S3ClientFactory; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; import org.apache.camel.support.ScheduledPollEndpoint; -import org.apache.camel.support.SynchronizationAdapter; -import org.apache.camel.util.IOHelper; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.awscore.exception.AwsServiceException; -import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; -import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.HeadBucketRequest; import software.amazon.awssdk.services.s3.model.PutBucketPolicyRequest; -import software.amazon.awssdk.utils.IoUtils; /** * Store and retrieve objects from AWS S3 Storage Service using AWS SDK version 2.x. @@ -152,66 +140,6 @@ public class AWS2S3Endpoint extends ScheduledPollEndpoint { super.doStop(); } - public Exchange createExchange(ResponseInputStream<GetObjectResponse> s3Object, String key) { - return createExchange(getExchangePattern(), s3Object, key); - } - - public Exchange createExchange(ExchangePattern pattern, ResponseInputStream<GetObjectResponse> s3Object, String key) { - LOG.trace("Getting object with key [{}] from bucket [{}]...", key, getConfiguration().getBucketName()); - - LOG.trace("Got object [{}]", s3Object); - - Exchange exchange = super.createExchange(pattern); - Message message = exchange.getIn(); - - if (configuration.isIncludeBody()) { - try { - message.setBody(IoUtils.toByteArray(s3Object)); - } catch (IOException e) { - throw new RuntimeCamelException(e); - } - } else { - message.setBody(s3Object); - } - - message.setHeader(AWS2S3Constants.KEY, key); - message.setHeader(AWS2S3Constants.BUCKET_NAME, getConfiguration().getBucketName()); - message.setHeader(AWS2S3Constants.E_TAG, s3Object.response().eTag()); - message.setHeader(AWS2S3Constants.LAST_MODIFIED, s3Object.response().lastModified()); - message.setHeader(AWS2S3Constants.VERSION_ID, s3Object.response().versionId()); - message.setHeader(AWS2S3Constants.CONTENT_TYPE, s3Object.response().contentType()); - message.setHeader(AWS2S3Constants.CONTENT_LENGTH, s3Object.response().contentLength()); - message.setHeader(AWS2S3Constants.CONTENT_ENCODING, s3Object.response().contentEncoding()); - message.setHeader(AWS2S3Constants.CONTENT_DISPOSITION, s3Object.response().contentDisposition()); - message.setHeader(AWS2S3Constants.CACHE_CONTROL, s3Object.response().cacheControl()); - message.setHeader(AWS2S3Constants.SERVER_SIDE_ENCRYPTION, s3Object.response().serverSideEncryption()); - message.setHeader(AWS2S3Constants.EXPIRATION_TIME, s3Object.response().expiration()); - message.setHeader(AWS2S3Constants.REPLICATION_STATUS, s3Object.response().replicationStatus()); - message.setHeader(AWS2S3Constants.STORAGE_CLASS, s3Object.response().storageClass()); - message.setHeader(AWS2S3Constants.METADATA, s3Object.response().metadata()); - - /* - * If includeBody == true, it is safe to close the object here because the S3Object - * was consumed already. If includeBody != true, the caller is responsible for - * closing the stream once the body has been fully consumed or use the autoCloseBody - * configuration to automatically schedule the body closing at the end of exchange. - */ - if (configuration.isIncludeBody()) { - IOHelper.close(s3Object); - } else { - if (configuration.isAutocloseBody()) { - exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { - @Override - public void onDone(Exchange exchange) { - IOHelper.close(s3Object); - } - }); - } - } - - return exchange; - } - public AWS2S3Configuration getConfiguration() { return configuration; } diff --git a/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java b/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java index 660df45..3cc5cb7 100644 --- a/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java +++ b/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java @@ -18,17 +18,21 @@ package org.apache.camel.component.aws2.sqs; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; import org.apache.camel.ExtendedExchange; -import org.apache.camel.NoFactoryAvailableException; +import org.apache.camel.Message; import org.apache.camel.Processor; +import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.spi.Synchronization; import org.apache.camel.support.ScheduledBatchPollingConsumer; import org.apache.camel.util.CastUtils; @@ -40,6 +44,7 @@ import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; +import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; import software.amazon.awssdk.services.sqs.model.MessageNotInflightException; import software.amazon.awssdk.services.sqs.model.QueueDeletedRecentlyException; import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException; @@ -60,7 +65,7 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { private Collection<String> attributeNames; private Collection<String> messageAttributeNames; - public Sqs2Consumer(Sqs2Endpoint endpoint, Processor processor) throws NoFactoryAvailableException { + public Sqs2Consumer(Sqs2Endpoint endpoint, Processor processor) { super(endpoint, processor); if (getConfiguration().getAttributeNames() != null) { @@ -136,7 +141,7 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { Queue<Exchange> answer = new LinkedList<>(); for (software.amazon.awssdk.services.sqs.model.Message message : messages) { - Exchange exchange = getEndpoint().createExchange(message); + Exchange exchange = createExchange(message); answer.add(exchange); } @@ -282,6 +287,47 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { return (Sqs2Endpoint) super.getEndpoint(); } + public Exchange createExchange(software.amazon.awssdk.services.sqs.model.Message msg) { + return createExchange(getEndpoint().getExchangePattern(), msg); + } + + private Exchange createExchange(ExchangePattern pattern, software.amazon.awssdk.services.sqs.model.Message msg) { + Exchange exchange = createExchange(true); + exchange.setPattern(pattern); + Message message = exchange.getIn(); + message.setBody(msg.body()); + message.setHeaders(new HashMap<>(msg.attributesAsStrings())); + message.setHeader(Sqs2Constants.MESSAGE_ID, msg.messageId()); + message.setHeader(Sqs2Constants.MD5_OF_BODY, msg.md5OfBody()); + message.setHeader(Sqs2Constants.RECEIPT_HANDLE, msg.receiptHandle()); + message.setHeader(Sqs2Constants.ATTRIBUTES, msg.attributes()); + message.setHeader(Sqs2Constants.MESSAGE_ATTRIBUTES, msg.messageAttributes()); + + // Need to apply the SqsHeaderFilterStrategy this time + HeaderFilterStrategy headerFilterStrategy = getEndpoint().getHeaderFilterStrategy(); + // add all sqs message attributes as camel message headers so that + // knowledge of + // the Sqs class MessageAttributeValue will not leak to the client + for (Map.Entry<String, MessageAttributeValue> entry : msg.messageAttributes().entrySet()) { + String header = entry.getKey(); + Object value = translateValue(entry.getValue()); + if (!headerFilterStrategy.applyFilterToExternalHeaders(header, value, exchange)) { + message.setHeader(header, value); + } + } + return exchange; + } + + private static Object translateValue(MessageAttributeValue mav) { + Object result = null; + if (mav.stringValue() != null) { + result = mav.stringValue(); + } else if (mav.binaryValue() != null) { + result = mav.binaryValue(); + } + return result; + } + @Override public String toString() { if (sqsConsumerToString == null) { diff --git a/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java b/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java index 2dc209c..1eb4014 100644 --- a/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java +++ b/components/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Endpoint.java @@ -21,13 +21,9 @@ import java.io.InputStream; import java.nio.charset.Charset; import java.util.HashMap; import java.util.Map; -import java.util.Map.Entry; import org.apache.camel.Category; import org.apache.camel.Consumer; -import org.apache.camel.Exchange; -import org.apache.camel.ExchangePattern; -import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.component.aws2.sqs.client.Sqs2ClientFactory; @@ -52,7 +48,6 @@ import software.amazon.awssdk.services.sqs.model.CreateQueueResponse; import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse; import software.amazon.awssdk.services.sqs.model.ListQueuesResponse; -import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; import software.amazon.awssdk.services.sqs.model.QueueAttributeName; import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException; import software.amazon.awssdk.services.sqs.model.SetQueueAttributesRequest; @@ -343,36 +338,6 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS super.doStop(); } - public Exchange createExchange(software.amazon.awssdk.services.sqs.model.Message msg) { - return createExchange(getExchangePattern(), msg); - } - - private Exchange createExchange(ExchangePattern pattern, software.amazon.awssdk.services.sqs.model.Message msg) { - Exchange exchange = super.createExchange(pattern); - Message message = exchange.getIn(); - message.setBody(msg.body()); - message.setHeaders(new HashMap<>(msg.attributesAsStrings())); - message.setHeader(Sqs2Constants.MESSAGE_ID, msg.messageId()); - message.setHeader(Sqs2Constants.MD5_OF_BODY, msg.md5OfBody()); - message.setHeader(Sqs2Constants.RECEIPT_HANDLE, msg.receiptHandle()); - message.setHeader(Sqs2Constants.ATTRIBUTES, msg.attributes()); - message.setHeader(Sqs2Constants.MESSAGE_ATTRIBUTES, msg.messageAttributes()); - - // Need to apply the SqsHeaderFilterStrategy this time - HeaderFilterStrategy headerFilterStrategy = getHeaderFilterStrategy(); - // add all sqs message attributes as camel message headers so that - // knowledge of - // the Sqs class MessageAttributeValue will not leak to the client - for (Entry<String, MessageAttributeValue> entry : msg.messageAttributes().entrySet()) { - String header = entry.getKey(); - Object value = translateValue(entry.getValue()); - if (!headerFilterStrategy.applyFilterToExternalHeaders(header, value, exchange)) { - message.setHeader(header, value); - } - } - return exchange; - } - public Sqs2Configuration getConfiguration() { return configuration; } @@ -406,13 +371,4 @@ public class Sqs2Endpoint extends ScheduledPollEndpoint implements HeaderFilterS this.maxMessagesPerPoll = maxMessagesPerPoll; } - private Object translateValue(MessageAttributeValue mav) { - Object result = null; - if (mav.stringValue() != null) { - result = mav.stringValue(); - } else if (mav.binaryValue() != null) { - result = mav.binaryValue(); - } - return result; - } }