Repository: camel Updated Branches: refs/heads/master 46f83c924 -> 9a5357e66
[CAMEL-10786] Prototyping QueueService producer/consumer, with no test tries so far due to the account unavailability Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9a5357e6 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9a5357e6 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9a5357e6 Branch: refs/heads/master Commit: 9a5357e669a0fd466fd9c6c2463ccdc651daee03 Parents: 46f83c9 Author: Sergey Beryozkin <sberyoz...@gmail.com> Authored: Wed Feb 22 16:41:06 2017 +0000 Committer: Sergey Beryozkin <sberyoz...@gmail.com> Committed: Wed Feb 22 16:41:06 2017 +0000 ---------------------------------------------------------------------- .../azure/blob/BlobServiceConstants.java | 1 + .../azure/blob/BlobServiceProducer.java | 12 +- .../azure/blob/BlobServiceRequestOptions.java | 14 +- .../component/azure/blob/BlobServiceUtil.java | 3 +- .../common/AbstractServiceRequestOptions.java | 31 ++++ .../azure/queue/QueueServiceConfiguration.java | 46 +++++- .../azure/queue/QueueServiceConstants.java | 11 +- .../azure/queue/QueueServiceConsumer.java | 12 +- .../azure/queue/QueueServiceOperations.java | 11 +- .../azure/queue/QueueServiceProducer.java | 145 +++++++++++++++++-- .../azure/queue/QueueServiceRequestOptions.java | 33 +++++ .../component/azure/queue/QueueServiceUtil.java | 30 ++++ 12 files changed, 312 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConstants.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConstants.java index f8f644a..2fd1a99 100644 --- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConstants.java +++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceConstants.java @@ -22,6 +22,7 @@ public interface BlobServiceConstants { String BLOB_CLIENT = "AzureBlobClient"; String SERVICE_URI_SEGMENT = ".blob.core.windows.net"; + String BLOB_SERVICE_REQUEST_OPTIONS = "BlobServiceRequestOptions"; String ACCESS_CONDITION = "BlobAccessCondition"; String BLOB_REQUEST_OPTIONS = "BlobRequestOptions"; String OPERATION_CONTEXT = "BlobOperationContext"; http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceProducer.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceProducer.java index 0f01588..360b4aa 100644 --- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceProducer.java +++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceProducer.java @@ -120,9 +120,15 @@ public class BlobServiceProducer extends DefaultProducer { LOG.trace("Getting the blob list from the container [{}] from exchange [{}]...", getConfiguration().getContainerName(), exchange); BlobServiceConfiguration cfg = getConfiguration(); - @SuppressWarnings("unchecked") - EnumSet<BlobListingDetails> details = - (EnumSet<BlobListingDetails>)exchange.getIn().getHeader(BlobServiceConstants.BLOB_LISTING_DETAILS); + EnumSet<BlobListingDetails> details = null; + Object detailsObject = exchange.getIn().getHeader(BlobServiceConstants.BLOB_LISTING_DETAILS); + if (detailsObject instanceof EnumSet) { + @SuppressWarnings("unchecked") + EnumSet<BlobListingDetails> theDetails = (EnumSet<BlobListingDetails>)detailsObject; + details = theDetails; + } else if (detailsObject instanceof BlobListingDetails) { + details = EnumSet.of((BlobListingDetails)detailsObject); + } Iterable<ListBlobItem> items = client.listBlobs(cfg.getBlobPrefix(), cfg.isUseFlatListing(), details, opts.getRequestOpts(), opts.getOpContext()); http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceRequestOptions.java ---------------------------------------------------------------------- diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceRequestOptions.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceRequestOptions.java index 3d1342e..bbb9385 100644 --- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceRequestOptions.java +++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceRequestOptions.java @@ -17,14 +17,13 @@ package org.apache.camel.component.azure.blob; import com.microsoft.azure.storage.AccessCondition; -import com.microsoft.azure.storage.OperationContext; import com.microsoft.azure.storage.blob.BlobRequestOptions; +import org.apache.camel.component.azure.common.AbstractServiceRequestOptions; -public class BlobServiceRequestOptions { +public class BlobServiceRequestOptions extends AbstractServiceRequestOptions { private AccessCondition accessCond; private BlobRequestOptions requestOpts; - private OperationContext opContext; - + public AccessCondition getAccessCond() { return accessCond; } @@ -41,11 +40,4 @@ public class BlobServiceRequestOptions { this.requestOpts = requestOpts; } - public OperationContext getOpContext() { - return opContext; - } - - public void setOpContext(OperationContext opContext) { - this.opContext = opContext; - } } http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceUtil.java ---------------------------------------------------------------------- diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceUtil.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceUtil.java index dcf6dc3..0f89819 100644 --- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceUtil.java +++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/blob/BlobServiceUtil.java @@ -216,7 +216,8 @@ public final class BlobServiceUtil { public static BlobServiceRequestOptions getRequestOptions(Exchange exchange) { - BlobServiceRequestOptions opts = exchange.getIn().getBody(BlobServiceRequestOptions.class); + BlobServiceRequestOptions opts = exchange.getIn().getHeader( + BlobServiceConstants.BLOB_SERVICE_REQUEST_OPTIONS, BlobServiceRequestOptions.class); if (opts != null) { return opts; } else { http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/common/AbstractServiceRequestOptions.java ---------------------------------------------------------------------- diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/common/AbstractServiceRequestOptions.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/common/AbstractServiceRequestOptions.java new file mode 100644 index 0000000..db9b635 --- /dev/null +++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/common/AbstractServiceRequestOptions.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.azure.common; + +import com.microsoft.azure.storage.OperationContext; + +public abstract class AbstractServiceRequestOptions { + private OperationContext opContext; + + public OperationContext getOpContext() { + return opContext; + } + + public void setOpContext(OperationContext opContext) { + this.opContext = opContext; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConfiguration.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConfiguration.java index d211d7e..e85cd2c 100644 --- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConfiguration.java +++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConfiguration.java @@ -28,8 +28,17 @@ public class QueueServiceConfiguration extends AbstractConfiguration { @UriParam private CloudQueue azureQueueClient; - @UriParam(label = "producer", defaultValue = "getMessage") - private QueueServiceOperations operation = QueueServiceOperations.getMessage; + @UriParam(label = "producer", defaultValue = "listQueues") + private QueueServiceOperations operation = QueueServiceOperations.listQueues; + + @UriParam(label = "producer") + private int messageTimeToLive; + + @UriParam(label = "producer") + private int messageVisibilityDelay; + + @UriParam(label = "producer") + private String queuePrefix; public String getQueueName() { return queueName; @@ -63,4 +72,37 @@ public class QueueServiceConfiguration extends AbstractConfiguration { public void setOperation(QueueServiceOperations operation) { this.operation = operation; } + + public int getMessageTimeToLive() { + return messageTimeToLive; + } + + /** + * Message Time To Live in seconds + */ + public void setMessageTimeToLive(int messageTimeToLive) { + this.messageTimeToLive = messageTimeToLive; + } + + public int getMessageVisibilityDelay() { + return messageVisibilityDelay; + } + + /** + * Message Visibility Delay in seconds + */ + public void setMessageVisibilityDelay(int messageVisibilityDelay) { + this.messageVisibilityDelay = messageVisibilityDelay; + } + + public String getQueuePrefix() { + return queuePrefix; + } + + /** + * Set a prefix which can be used for listing the queues + */ + public void setQueuePrefix(String queuePrefix) { + this.queuePrefix = queuePrefix; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConstants.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConstants.java index 0e7a523..ba2bd0f 100644 --- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConstants.java +++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConstants.java @@ -22,7 +22,12 @@ public interface QueueServiceConstants { String QUEUE_CLIENT = "AzureQueueClient"; String SERVICE_URI_SEGMENT = ".queue.core.windows.net"; - String ACCESS_CONDITION = "BlobAccessCondition"; - String BLOB_REQUEST_OPTIONS = "BlobRequestOptions"; - String OPERATION_CONTEXT = "BlobOperationContext"; + String QUEUE_SERVICE_REQUEST_OPTIONS = "QueueServiceRequestOptions"; + String QUEUE_REQUEST_OPTIONS = "QueueRequestOptions"; + String OPERATION_CONTEXT = "QueueOperationContext"; + String MESSAGE_UPDATE_FIELDS = "QueueMessageUpdateFields"; + String QUEUE_LISTING_DETAILS = "QueueListingDetails"; + + String QUEUE_CREATED = "QueueCreated"; + } http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java index 9793a1c..a70ad46 100644 --- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java +++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceConsumer.java @@ -38,7 +38,8 @@ public class QueueServiceConsumer extends ScheduledPollConsumer { protected int poll() throws Exception { Exchange exchange = super.getEndpoint().createExchange(); try { - getMessage(exchange); + LOG.trace("Retrieving a message"); + retrieveMessage(exchange); super.getAsyncProcessor().process(exchange); return 1; } catch (StorageException ex) { @@ -50,10 +51,11 @@ public class QueueServiceConsumer extends ScheduledPollConsumer { } } - private void getMessage(Exchange exchange) throws Exception { - LOG.trace("Getting the message from the queue [{}] from exchange [{}]...", - getConfiguration().getQueueName(), exchange); - throw new UnsupportedOperationException(); + private void retrieveMessage(Exchange exchange) throws Exception { + //TODO: Support the batch processing if needed, given that it is possible + // to retrieve more than 1 message in one go, similarly to camel-aws/s3 consumer. + QueueServiceUtil.retrieveMessage(exchange, getConfiguration()); + } protected QueueServiceConfiguration getConfiguration() { http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceOperations.java ---------------------------------------------------------------------- diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceOperations.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceOperations.java index a1e0898..536b47e 100644 --- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceOperations.java +++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceOperations.java @@ -17,6 +17,13 @@ package org.apache.camel.component.azure.queue; public enum QueueServiceOperations { - getMessage, - putMessage + listQueues, + createQueue, + deleteQueue, + addMessage, + retrieveMessage, + peekMessage, + updateMessage, + deleteMessage + } http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceProducer.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceProducer.java index 434ed2d..ff15369 100644 --- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceProducer.java +++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceProducer.java @@ -16,9 +16,16 @@ */ package org.apache.camel.component.azure.queue; +import java.util.EnumSet; + +import com.microsoft.azure.storage.queue.CloudQueue; +import com.microsoft.azure.storage.queue.CloudQueueMessage; +import com.microsoft.azure.storage.queue.MessageUpdateFields; +import com.microsoft.azure.storage.queue.QueueListingDetails; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.component.azure.blob.BlobServiceConstants; +import org.apache.camel.component.azure.common.ExchangeUtil; import org.apache.camel.impl.DefaultProducer; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.URISupport; @@ -40,14 +47,32 @@ public class QueueServiceProducer extends DefaultProducer { public void process(final Exchange exchange) throws Exception { QueueServiceOperations operation = determineOperation(exchange); if (ObjectHelper.isEmpty(operation)) { - operation = QueueServiceOperations.getMessage; + operation = QueueServiceOperations.listQueues; } else { switch (operation) { - case getMessage: - getMessage(exchange); + case retrieveMessage: + retrieveMessage(exchange); + break; + case peekMessage: + peekMessage(exchange); + break; + case createQueue: + createQueue(exchange); + break; + case deleteQueue: + deleteQueue(exchange); + break; + case addMessage: + addMessage(exchange); + break; + case updateMessage: + updateMessage(exchange); break; - case putMessage: - putMessage(exchange); + case deleteMessage: + deleteMessage(exchange); + break; + case listQueues: + listQueues(exchange); break; default: throw new IllegalArgumentException("Unsupported operation"); @@ -56,15 +81,115 @@ public class QueueServiceProducer extends DefaultProducer { } - private void getMessage(Exchange exchange) { - LOG.trace("Getting the message from the queue [{}] from exchange [{}]...", + private void listQueues(Exchange exchange) throws Exception { + CloudQueue client = QueueServiceUtil.createQueueClient(getConfiguration()); + QueueServiceRequestOptions opts = QueueServiceUtil.getRequestOptions(exchange); + QueueListingDetails details = (QueueListingDetails)exchange.getIn().getHeader(QueueServiceConstants.QUEUE_LISTING_DETAILS); + if (details == null) { + details = QueueListingDetails.ALL; + } + Iterable<CloudQueue> list = client.getServiceClient().listQueues( + getConfiguration().getQueuePrefix(), details, + opts.getRequestOpts(), opts.getOpContext()); + ExchangeUtil.getMessageForResponse(exchange).setBody(list); + } + + private void createQueue(Exchange exchange) throws Exception { + CloudQueue client = QueueServiceUtil.createQueueClient(getConfiguration()); + QueueServiceRequestOptions opts = QueueServiceUtil.getRequestOptions(exchange); + doCreateQueue(client, opts, exchange); + } + + private void doCreateQueue(CloudQueue client, QueueServiceRequestOptions opts, Exchange exchange) throws Exception { + LOG.trace("Creating the queue [{}] from exchange [{}]...", getConfiguration().getQueueName(), exchange); - throw new UnsupportedOperationException(); + client.createIfNotExists(opts.getRequestOpts(), opts.getOpContext()); + ExchangeUtil.getMessageForResponse(exchange) + .setHeader(QueueServiceConstants.QUEUE_CREATED, Boolean.TRUE); } - private void putMessage(Exchange exchange) { + + private void deleteQueue(Exchange exchange) throws Exception { + LOG.trace("Deleting the queue [{}] from exchange [{}]...", + getConfiguration().getQueueName(), exchange); + CloudQueue client = QueueServiceUtil.createQueueClient(getConfiguration()); + QueueServiceRequestOptions opts = QueueServiceUtil.getRequestOptions(exchange); + client.delete(opts.getRequestOpts(), opts.getOpContext()); + } + + private void addMessage(Exchange exchange) throws Exception { LOG.trace("Putting the message into the queue [{}] from exchange [{}]...", getConfiguration().getQueueName(), exchange); - throw new UnsupportedOperationException(); + CloudQueue client = QueueServiceUtil.createQueueClient(getConfiguration()); + QueueServiceRequestOptions opts = QueueServiceUtil.getRequestOptions(exchange); + + Boolean queueCreated = exchange.getIn().getHeader(QueueServiceConstants.QUEUE_CREATED, + Boolean.class); + if (Boolean.TRUE != queueCreated) { + doCreateQueue(client, opts, exchange); + } + + CloudQueueMessage message = getCloudQueueMessage(exchange); + client.addMessage(message, + getConfiguration().getMessageTimeToLive(), + getConfiguration().getMessageVisibilityDelay(), + opts.getRequestOpts(), opts.getOpContext()); + } + private void updateMessage(Exchange exchange) throws Exception { + CloudQueue client = QueueServiceUtil.createQueueClient(getConfiguration()); + QueueServiceRequestOptions opts = QueueServiceUtil.getRequestOptions(exchange); + + CloudQueueMessage message = getCloudQueueMessage(exchange); + LOG.trace("Updating the message in the queue [{}] from exchange [{}]...", + getConfiguration().getQueueName(), exchange); + + EnumSet<MessageUpdateFields> fields = null; + Object fieldsObject = exchange.getIn().getHeader(QueueServiceConstants.MESSAGE_UPDATE_FIELDS); + if (fieldsObject instanceof EnumSet) { + @SuppressWarnings("unchecked") + EnumSet<MessageUpdateFields> theFields = (EnumSet<MessageUpdateFields>)fieldsObject; + fields = theFields; + } else if (fieldsObject instanceof MessageUpdateFields) { + fields = EnumSet.of((MessageUpdateFields)fieldsObject); + } + client.updateMessage(message, + getConfiguration().getMessageVisibilityDelay(), + fields, + opts.getRequestOpts(), opts.getOpContext()); + } + + private void deleteMessage(Exchange exchange) throws Exception { + LOG.trace("Deleting the message from the queue [{}] from exchange [{}]...", + getConfiguration().getQueueName(), exchange); + CloudQueue client = QueueServiceUtil.createQueueClient(getConfiguration()); + QueueServiceRequestOptions opts = QueueServiceUtil.getRequestOptions(exchange); + CloudQueueMessage message = getCloudQueueMessage(exchange); + client.deleteMessage(message, opts.getRequestOpts(), opts.getOpContext()); + } + + private void retrieveMessage(Exchange exchange) throws Exception { + QueueServiceUtil.retrieveMessage(exchange, getConfiguration()); + } + + private void peekMessage(Exchange exchange) throws Exception { + CloudQueue client = QueueServiceUtil.createQueueClient(getConfiguration()); + QueueServiceRequestOptions opts = QueueServiceUtil.getRequestOptions(exchange); + CloudQueueMessage message = client.peekMessage(opts.getRequestOpts(), opts.getOpContext()); + ExchangeUtil.getMessageForResponse(exchange).setBody(message); + } + + + private CloudQueueMessage getCloudQueueMessage(Exchange exchange) throws Exception { + Object body = exchange.getIn().getMandatoryBody(); + CloudQueueMessage message = null; + if (body instanceof CloudQueueMessage) { + message = (CloudQueueMessage)body; + } else if (body instanceof String) { + message = new CloudQueueMessage((String)body); + } + if (message == null) { + throw new IllegalArgumentException("Unsupported queue message type:" + body.getClass().getName()); + } + return message; } private QueueServiceOperations determineOperation(Exchange exchange) { http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceRequestOptions.java ---------------------------------------------------------------------- diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceRequestOptions.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceRequestOptions.java new file mode 100644 index 0000000..f10b83a --- /dev/null +++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceRequestOptions.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.azure.queue; + +import com.microsoft.azure.storage.queue.QueueRequestOptions; +import org.apache.camel.component.azure.common.AbstractServiceRequestOptions; + +public class QueueServiceRequestOptions extends AbstractServiceRequestOptions { + private QueueRequestOptions requestOpts; + + public QueueRequestOptions getRequestOpts() { + return requestOpts; + } + + public void setRequestOpts(QueueRequestOptions requestOpts) { + this.requestOpts = requestOpts; + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/9a5357e6/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceUtil.java ---------------------------------------------------------------------- diff --git a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceUtil.java b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceUtil.java index 17121b0..6f09418 100644 --- a/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceUtil.java +++ b/components/camel-azure/src/main/java/org/apache/camel/component/azure/queue/QueueServiceUtil.java @@ -18,8 +18,13 @@ package org.apache.camel.component.azure.queue; import java.net.URI; +import com.microsoft.azure.storage.OperationContext; import com.microsoft.azure.storage.StorageCredentials; import com.microsoft.azure.storage.queue.CloudQueue; +import com.microsoft.azure.storage.queue.CloudQueueMessage; +import com.microsoft.azure.storage.queue.QueueRequestOptions; +import org.apache.camel.Exchange; +import org.apache.camel.component.azure.common.ExchangeUtil; public final class QueueServiceUtil { private QueueServiceUtil() { @@ -64,4 +69,29 @@ public final class QueueServiceUtil { public static StorageCredentials getAccountCredentials(QueueServiceConfiguration cfg) { return cfg.getCredentials(); } + + public static void retrieveMessage(Exchange exchange, QueueServiceConfiguration cfg) throws Exception { + CloudQueue client = createQueueClient(cfg); + QueueServiceRequestOptions opts = getRequestOptions(exchange); + CloudQueueMessage message = client.retrieveMessage(cfg.getMessageVisibilityDelay(), + opts.getRequestOpts(), opts.getOpContext()); + ExchangeUtil.getMessageForResponse(exchange).setBody(message); + } + + public static QueueServiceRequestOptions getRequestOptions(Exchange exchange) { + QueueServiceRequestOptions opts = exchange.getIn().getHeader( + QueueServiceConstants.QUEUE_SERVICE_REQUEST_OPTIONS, QueueServiceRequestOptions.class); + if (opts != null) { + return opts; + } else { + opts = new QueueServiceRequestOptions(); + } + QueueRequestOptions requestOpts = + exchange.getIn().getHeader(QueueServiceConstants.QUEUE_REQUEST_OPTIONS, QueueRequestOptions.class); + OperationContext opContext = + exchange.getIn().getHeader(QueueServiceConstants.OPERATION_CONTEXT, OperationContext.class); + opts.setOpContext(opContext); + opts.setRequestOpts(requestOpts); + return opts; + } }