Repository: camel Updated Branches: refs/heads/master e7c9e405a -> 1e9878801
CAMEL-7654 Supports Message Attribuets with HeaderFilterStrategy in AWS SQS component Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1e987880 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1e987880 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1e987880 Branch: refs/heads/master Commit: 1e98788019bc1b4ebdd26fcbaa248ffa7a9715df Parents: 2ca8187 Author: Willem Jiang <willem.ji...@gmail.com> Authored: Wed Aug 13 17:44:15 2014 +0800 Committer: Willem Jiang <willem.ji...@gmail.com> Committed: Wed Aug 13 17:49:50 2014 +0800 ---------------------------------------------------------------------- .../camel/component/aws/sqs/SqsEndpoint.java | 27 ++++++++++++++-- .../aws/sqs/SqsHeaderFilterStrategy.java | 30 ++++++++++++++++++ .../camel/component/aws/sqs/SqsProducer.java | 33 +++++++++++++------- .../component/aws/sqs/SqsProducerTest.java | 8 +++-- 4 files changed, 81 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1e987880/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java index b48fbe6..8ed85c5 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java @@ -31,7 +31,6 @@ import com.amazonaws.services.sqs.model.ListQueuesResult; import com.amazonaws.services.sqs.model.MessageAttributeValue; import com.amazonaws.services.sqs.model.QueueAttributeName; import com.amazonaws.services.sqs.model.SetQueueAttributesRequest; - import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; @@ -40,6 +39,8 @@ import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.impl.DefaultExchange; import org.apache.camel.impl.ScheduledPollEndpoint; +import org.apache.camel.spi.HeaderFilterStrategy; +import org.apache.camel.spi.HeaderFilterStrategyAware; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +50,7 @@ import org.slf4j.LoggerFactory; * Defines the <a href="http://camel.apache.org/aws.html">AWS SQS Endpoint</a>. * */ -public class SqsEndpoint extends ScheduledPollEndpoint { +public class SqsEndpoint extends ScheduledPollEndpoint implements HeaderFilterStrategyAware { private static final Logger LOG = LoggerFactory.getLogger(SqsEndpoint.class); @@ -57,12 +58,21 @@ public class SqsEndpoint extends ScheduledPollEndpoint { private String queueUrl; private SqsConfiguration configuration; private int maxMessagesPerPoll; + private HeaderFilterStrategy headerFilterStrategy; public SqsEndpoint(String uri, SqsComponent component, SqsConfiguration configuration) { super(uri, component); this.configuration = configuration; } + + public HeaderFilterStrategy getHeaderFilterStrategy() { + return headerFilterStrategy; + } + public void setHeaderFilterStrategy(HeaderFilterStrategy strategy) { + this.headerFilterStrategy = strategy; + } + public Producer createProducer() throws Exception { return new SqsProducer(this); } @@ -87,6 +97,11 @@ public class SqsEndpoint extends ScheduledPollEndpoint { if (ObjectHelper.isNotEmpty(getConfiguration().getAmazonSQSEndpoint())) { client.setEndpoint(getConfiguration().getAmazonSQSEndpoint()); } + + // check the setting the headerFilterStrategy + if (headerFilterStrategy == null) { + headerFilterStrategy = new SqsHeaderFilterStrategy(); + } // If both region and Account ID is provided the queue URL can be built manually. // This allows accessing queues where you don't have permission to list queues or query queues @@ -191,10 +206,16 @@ public class SqsEndpoint extends ScheduledPollEndpoint { message.setHeader(SqsConstants.ATTRIBUTES, msg.getAttributes()); message.setHeader(SqsConstants.MESSAGE_ATTRIBUTES, msg.getMessageAttributes()); + //Need to apply the SqsHeaderFilterStrategy this time + HeaderFilterStrategy headerFilterStrategy = getHeaderFilterStrategy(); //add all sqs message attributes as camel message headers so that knowledge of //the Sqs class MessageAttributeValue will not leak to the client for (Entry<String, MessageAttributeValue> entry : msg.getMessageAttributes().entrySet()) { - message.setHeader(entry.getKey(), translateValue(entry.getValue())); + String header = entry.getKey(); + Object value = translateValue(entry.getValue()); + if (!headerFilterStrategy.applyFilterToExternalHeaders(header, value, exchange)) { + message.setHeader(header, value); + } } return exchange; } http://git-wip-us.apache.org/repos/asf/camel/blob/1e987880/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsHeaderFilterStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsHeaderFilterStrategy.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsHeaderFilterStrategy.java new file mode 100644 index 0000000..fb5f425 --- /dev/null +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsHeaderFilterStrategy.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.aws.sqs; + +import org.apache.camel.impl.DefaultHeaderFilterStrategy; + +public class SqsHeaderFilterStrategy extends DefaultHeaderFilterStrategy { + public SqsHeaderFilterStrategy() { + initialize(); + } + + protected void initialize() { + // filter headers begin with "Camel" or "org.apache.camel" + setOutFilterPattern("(Camel|org\\.apache\\.camel)[\\.|a-z|A-z|0-9]*"); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/1e987880/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java index 0a4c948..874e021 100644 --- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java +++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsProducer.java @@ -25,11 +25,11 @@ import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.model.MessageAttributeValue; import com.amazonaws.services.sqs.model.SendMessageRequest; import com.amazonaws.services.sqs.model.SendMessageResult; - import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.NoFactoryAvailableException; import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +50,7 @@ public class SqsProducer extends DefaultProducer { public void process(Exchange exchange) throws Exception { String body = exchange.getIn().getBody(String.class); SendMessageRequest request = new SendMessageRequest(getQueueUrl(), body); - request.setMessageAttributes(translateAttributes(exchange.getIn().getHeaders())); + request.setMessageAttributes(translateAttributes(exchange.getIn().getHeaders(), exchange)); addDelay(request, exchange); LOG.trace("Sending request [{}] from exchange [{}]...", request, exchange); @@ -106,19 +106,28 @@ public class SqsProducer extends DefaultProducer { return "SqsProducer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]"; } - private Map<String, MessageAttributeValue> translateAttributes(Map<String, Object> headers) { + private Map<String, MessageAttributeValue> translateAttributes(Map<String, Object> headers, Exchange exchange) { Map<String, MessageAttributeValue> result = new HashMap<String, MessageAttributeValue>(); + HeaderFilterStrategy headerFilterStrategy = getEndpoint().getHeaderFilterStrategy(); for (Entry<String, Object> entry : headers.entrySet()) { - Object value = entry.getValue(); - MessageAttributeValue mav = new MessageAttributeValue(); - if (value instanceof String) { - mav.setDataType("String"); - mav.withStringValue((String)value); - } else if (value instanceof ByteBuffer) { - mav.setDataType("Binary"); - mav.withBinaryValue((ByteBuffer)value); + // only put the message header which is not filtered into the message attribute + if (!headerFilterStrategy.applyFilterToCamelHeaders(entry.getKey(), entry.getValue(), exchange)) { + Object value = entry.getValue(); + if (value instanceof String) { + MessageAttributeValue mav = new MessageAttributeValue(); + mav.setDataType("String"); + mav.withStringValue((String)value); + result.put(entry.getKey(), mav); + } else if (value instanceof ByteBuffer) { + MessageAttributeValue mav = new MessageAttributeValue(); + mav.setDataType("Binary"); + mav.withBinaryValue((ByteBuffer)value); + result.put(entry.getKey(), mav); + } else { + // cannot translate the message header to message attribute value + LOG.warn("Cannot put the message header key={0}, value={1} into Sqs MessageAttribute", entry.getKey(), entry.getValue()); + } } - result.put(entry.getKey(), mav); } return result; } http://git-wip-us.apache.org/repos/asf/camel/blob/1e987880/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java index e6742ca..bed350c 100644 --- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java +++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsProducerTest.java @@ -23,17 +23,16 @@ import java.util.Map; import com.amazonaws.services.sqs.AmazonSQSClient; import com.amazonaws.services.sqs.model.SendMessageRequest; import com.amazonaws.services.sqs.model.SendMessageResult; - import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Message; +import org.apache.camel.spi.HeaderFilterStrategy; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.mockito.Matchers.any; @@ -54,6 +53,8 @@ public class SqsProducerTest { private static final ByteBuffer SAMPLE_MESSAGE_HEADER_VALUE_2 = ByteBuffer.wrap(new byte[10]); private static final String SAMPLE_MESSAGE_HEADER_NAME_3 = "header_name_3"; private static final String SAMPLE_MESSAGE_HEADER_VALUE_3 = "heder_value_3"; + private static final String SAMPLE_MESSAGE_HEADER_NAME_4 = "CamelHeader_1"; + private static final String SAMPLE_MESSAGE_HEADER_VALUE_4 = "testValue"; Exchange exchange = mock(Exchange.class, RETURNS_DEEP_STUBS); @@ -72,6 +73,7 @@ public class SqsProducerTest { underTest = new SqsProducer(sqsEndpoint); sendMessageResult = new SendMessageResult().withMD5OfMessageBody(MESSAGE_MD5).withMessageId(MESSAGE_ID); sqsConfiguration = new SqsConfiguration(); + HeaderFilterStrategy headerFilterStrategy = new SqsHeaderFilterStrategy(); sqsConfiguration.setDelaySeconds(Integer.valueOf(0)); when(sqsEndpoint.getClient()).thenReturn(amazonSQSClient); when(sqsEndpoint.getConfiguration()).thenReturn(sqsConfiguration); @@ -81,6 +83,7 @@ public class SqsProducerTest { when(exchange.getPattern()).thenReturn(ExchangePattern.InOnly); when(inMessage.getBody(String.class)).thenReturn(SAMPLE_MESSAGE_BODY); when(sqsEndpoint.getQueueUrl()).thenReturn(QUEUE_URL); + when(sqsEndpoint.getHeaderFilterStrategy()).thenReturn(headerFilterStrategy); } @Test @@ -173,6 +176,7 @@ public class SqsProducerTest { headers.put(SAMPLE_MESSAGE_HEADER_NAME_1, SAMPLE_MESSAGE_HEADER_VALUE_1); headers.put(SAMPLE_MESSAGE_HEADER_NAME_2, SAMPLE_MESSAGE_HEADER_VALUE_2); headers.put(SAMPLE_MESSAGE_HEADER_NAME_3, SAMPLE_MESSAGE_HEADER_VALUE_3); + headers.put(SAMPLE_MESSAGE_HEADER_NAME_4, SAMPLE_MESSAGE_HEADER_VALUE_4); when(inMessage.getHeaders()).thenReturn(headers); underTest.process(exchange);