Author: cmueller
Date: Wed Sep 19 20:24:52 2012
New Revision: 1387741

URL: http://svn.apache.org/viewvc?rev=1387741&view=rev
Log:
CAMEL-5414: SqsEndpoint can't retrieve existing queue url with visibility timeout different than default

Modified: camel/branches/camel-2.9.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java camel/branches/camel-2.9.x/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java Modified: camel/branches/camel-2.9.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java?rev=1387741&r1=1387740&r2=1387741&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java (original) +++ camel/branches/camel-2.9.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsEndpoint.java Wed Sep 19 20:24:52 2012 @@ -25,6 +25,7 @@ import com.amazonaws.services.sqs.model. import com.amazonaws.services.sqs.model.CreateQueueResult; import com.amazonaws.services.sqs.model.ListQueuesResult; import com.amazonaws.services.sqs.model.QueueAttributeName; +import com.amazonaws.services.sqs.model.SetQueueAttributesRequest; import org.apache.camel.Consumer; import org.apache.camel.Exchange; @@ -80,35 +81,63 @@ public class SqsEndpoint extends Schedul for (String url : listQueuesResult.getQueueUrls()) { if (url.endsWith("/" + configuration.getQueueName())) { queueUrl = url; - LOG.trace("Queue available at '{}'. Using existing queue attributes!", queueUrl); + LOG.trace("Queue available at '{}'.", queueUrl); break; } } if (queueUrl == null) { - LOG.trace("Queue '{}' doesn't exist. 
Will create it...", configuration.getQueueName()); + createQueue(client); + } else { + updateQueueAttributes(client); + } + } - // creates a new queue, or returns the URL of an existing one - CreateQueueRequest request = new CreateQueueRequest(configuration.getQueueName()); - if (getConfiguration().getDefaultVisibilityTimeout() != null) { - request.getAttributes().put(QueueAttributeName.VisibilityTimeout.name(), String.valueOf(getConfiguration().getDefaultVisibilityTimeout())); - } - if (getConfiguration().getMaximumMessageSize() != null) { - request.getAttributes().put(QueueAttributeName.MaximumMessageSize.name(), String.valueOf(getConfiguration().getMaximumMessageSize())); - } - if (getConfiguration().getMessageRetentionPeriod() != null) { - request.getAttributes().put(QueueAttributeName.MessageRetentionPeriod.name(), String.valueOf(getConfiguration().getMessageRetentionPeriod())); - } - if (getConfiguration().getPolicy() != null) { - request.getAttributes().put(QueueAttributeName.Policy.name(), String.valueOf(getConfiguration().getPolicy())); - } - LOG.trace("Creating queue [{}] with request [{}]...", configuration.getQueueName(), request); - - CreateQueueResult queueResult = client.createQueue(request); - queueUrl = queueResult.getQueueUrl(); - - LOG.trace("Queue created and available at: {}", queueUrl); + private void createQueue(AmazonSQSClient client) { + LOG.trace("Queue '{}' doesn't exist. 
Will create it...", configuration.getQueueName()); + + // creates a new queue, or returns the URL of an existing one + CreateQueueRequest request = new CreateQueueRequest(configuration.getQueueName()); + if (getConfiguration().getDefaultVisibilityTimeout() != null) { + request.getAttributes().put(QueueAttributeName.VisibilityTimeout.name(), String.valueOf(getConfiguration().getDefaultVisibilityTimeout())); + } + if (getConfiguration().getMaximumMessageSize() != null) { + request.getAttributes().put(QueueAttributeName.MaximumMessageSize.name(), String.valueOf(getConfiguration().getMaximumMessageSize())); + } + if (getConfiguration().getMessageRetentionPeriod() != null) { + request.getAttributes().put(QueueAttributeName.MessageRetentionPeriod.name(), String.valueOf(getConfiguration().getMessageRetentionPeriod())); + } + if (getConfiguration().getPolicy() != null) { + request.getAttributes().put(QueueAttributeName.Policy.name(), String.valueOf(getConfiguration().getPolicy())); + } + LOG.trace("Creating queue [{}] with request [{}]...", configuration.getQueueName(), request); + + CreateQueueResult queueResult = client.createQueue(request); + queueUrl = queueResult.getQueueUrl(); + + LOG.trace("Queue created and available at: {}", queueUrl); + } + + private void updateQueueAttributes(AmazonSQSClient client) { + LOG.trace("Updating queue '{}' with the provided queue attributes...", configuration.getQueueName()); + + SetQueueAttributesRequest request = new SetQueueAttributesRequest(); + request.setQueueUrl(queueUrl); + if (getConfiguration().getDefaultVisibilityTimeout() != null) { + request.getAttributes().put(QueueAttributeName.VisibilityTimeout.name(), String.valueOf(getConfiguration().getDefaultVisibilityTimeout())); } + if (getConfiguration().getMaximumMessageSize() != null) { + request.getAttributes().put(QueueAttributeName.MaximumMessageSize.name(), String.valueOf(getConfiguration().getMaximumMessageSize())); + } + if (getConfiguration().getMessageRetentionPeriod() 
!= null) { + request.getAttributes().put(QueueAttributeName.MessageRetentionPeriod.name(), String.valueOf(getConfiguration().getMessageRetentionPeriod())); + } + if (getConfiguration().getPolicy() != null) { + request.getAttributes().put(QueueAttributeName.Policy.name(), String.valueOf(getConfiguration().getPolicy())); + } + client.setQueueAttributes(request); + + LOG.trace("Queue '{}' updated and available at {}'", configuration.getQueueName(), queueUrl); } @Override Modified: camel/branches/camel-2.9.x/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java?rev=1387741&r1=1387740&r2=1387741&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java (original) +++ camel/branches/camel-2.9.x/components/camel-aws/src/test/java/org/apache/camel/component/aws/sqs/SqsEndpointUseExistingQueueTest.java Wed Sep 19 20:24:52 2012 @@ -28,6 +28,7 @@ import com.amazonaws.services.sqs.model. 
import com.amazonaws.services.sqs.model.Message; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.amazonaws.services.sqs.model.ReceiveMessageResult; +import com.amazonaws.services.sqs.model.SetQueueAttributesRequest; import org.apache.camel.EndpointInject; import org.apache.camel.builder.RouteBuilder; @@ -90,6 +91,11 @@ public class SqsEndpointUseExistingQueue } @Override + public void setQueueAttributes(SetQueueAttributesRequest setQueueAttributesRequest) throws AmazonServiceException, AmazonClientException { + // noop + } + + @Override public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageRequest) throws AmazonServiceException, AmazonClientException { ReceiveMessageResult result = new ReceiveMessageResult(); List<Message> resultMessages = result.getMessages();