[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/2611 ---
Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2611#discussion_r183125399 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.queue.CloudQueueMessage; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Collections; +import java.util.Arrays; +import java.util.Set; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + +@SeeAlso({PutAzureQueueStorage.class}) +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"}) +@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. 
If the requirement is " + +"to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.") +@WritesAttributes({ +@WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"), +@WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"), +@WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"), +@WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"), +@WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"), +}) +public class GetAzureQueueStorage extends AbstractAzureQueueStorage { + +public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder() +.name("auto-delete-messages") +.displayName("Auto Delete Messages") +.description("Specifies whether the received message is to be automatically deleted from the queue.") +.required(true) +.allowableValues("true", "false") +.defaultValue("true") +.addValidator(StandardValidators.BOOLEAN_VALIDATOR) +.build(); + +public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() +.name("batch-size") +.displayName("Batch Size") +.description("The number of messages to be retrieved from the queue.") +.required(true) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("32") +.build(); + +public
Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2611#discussion_r183126762 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageCredentials; +import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.queue.CloudQueue; +import com.microsoft.azure.storage.queue.CloudQueueClient; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.net.URI; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public abstract class AbstractAzureQueueStorage extends AbstractProcessor { + +public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder() +.name("storage-cloudQueue-name") +.displayName("Queue Name") +.description("Name of the Azure Storage Queue") +.required(true) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) --- End diff -- You were right. 
Removed `@OnScheduled`, and not for this reason alone: the `ACCOUNT_NAME` and `ACCOUNT_KEY` or `SAS_TOKEN` properties use `ExpressionLanguageScope.FLOWFILE_ATTRIBUTES`, so that could have been a potential bug, since that method didn't get a FlowFile as a parameter before. Now it has been changed. ---
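The change described above, resolving the account properties against each FlowFile instead of once at schedule time, can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code from the pull request: only the connection string format is taken from the quoted diff, while the class name, the `resolve` helper, and the attribute names are hypothetical stand-ins for NiFi's property evaluation.

```java
import java.util.Map;

/** Hypothetical sketch; it does not use the NiFi or Azure APIs. */
public class PerFlowFileConnectionSketch {

    // Format string as it appears in the quoted diff.
    private static final String FORMAT_QUEUE_CONNECTION_STRING =
            "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s";

    /**
     * Assumed stand-in for property evaluation: a value is either a literal
     * or a single ${attribute} reference looked up in the FlowFile attributes.
     * A reference to a missing attribute resolves to the empty string.
     */
    static String resolve(String propertyValue, Map<String, String> flowFileAttributes) {
        if (propertyValue.startsWith("${") && propertyValue.endsWith("}")) {
            String attributeName = propertyValue.substring(2, propertyValue.length() - 1);
            return flowFileAttributes.getOrDefault(attributeName, "");
        }
        return propertyValue;
    }

    /** Builds the connection string for one FlowFile, so per-FlowFile values take effect. */
    static String connectionString(String accountNameProperty,
                                   String accountKeyProperty,
                                   Map<String, String> flowFileAttributes) {
        return String.format(FORMAT_QUEUE_CONNECTION_STRING,
                resolve(accountNameProperty, flowFileAttributes),
                resolve(accountKeyProperty, flowFileAttributes));
    }

    public static void main(String[] args) {
        // Hypothetical attribute name and placeholder key, for illustration only.
        Map<String, String> attributes = Map.of("azure.account", "acct");
        System.out.println(connectionString("${azure.account}", "secret", attributes));
    }
}
```

Building the string inside the per-FlowFile path costs a little work on every invocation, but it is the only place where attribute-scoped values are actually available.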
Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2611#discussion_r183125717 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.queue.CloudQueueMessage; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Collections; +import java.util.Arrays; +import java.util.Set; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + +@SeeAlso({PutAzureQueueStorage.class}) +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"}) +@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. 
If the requirement is " + +"to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.") +@WritesAttributes({ +@WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"), +@WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"), +@WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"), +@WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"), +@WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"), +}) +public class GetAzureQueueStorage extends AbstractAzureQueueStorage { + +public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder() +.name("auto-delete-messages") +.displayName("Auto Delete Messages") +.description("Specifies whether the received message is to be automatically deleted from the queue.") +.required(true) +.allowableValues("true", "false") +.defaultValue("true") +.addValidator(StandardValidators.BOOLEAN_VALIDATOR) +.build(); + +public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() +.name("batch-size") +.displayName("Batch Size") +.description("The number of messages to be retrieved from the queue.") +.required(true) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("32") +.build(); + +public
Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2611#discussion_r183125143 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.queue.CloudQueueMessage; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Collections; +import java.util.Arrays; +import java.util.Set; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + +@SeeAlso({PutAzureQueueStorage.class}) +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"}) +@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. 
If the requirement is " + +"to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.") +@WritesAttributes({ +@WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"), +@WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"), +@WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"), +@WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"), +@WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"), +}) +public class GetAzureQueueStorage extends AbstractAzureQueueStorage { + +public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder() +.name("auto-delete-messages") +.displayName("Auto Delete Messages") +.description("Specifies whether the received message is to be automatically deleted from the queue.") +.required(true) +.allowableValues("true", "false") +.defaultValue("true") +.addValidator(StandardValidators.BOOLEAN_VALIDATOR) +.build(); + +public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() +.name("batch-size") +.displayName("Batch Size") +.description("The number of messages to be retrieved from the queue.") +.required(true) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("32") +.build(); + +public
Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2611#discussion_r182965345 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.queue.CloudQueueMessage; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Collections; +import java.util.Arrays; +import java.util.Set; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + +@SeeAlso({PutAzureQueueStorage.class}) +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"}) +@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. 
If the requirement is " + +"to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.") +@WritesAttributes({ +@WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"), +@WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"), +@WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"), +@WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"), +@WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"), +}) +public class GetAzureQueueStorage extends AbstractAzureQueueStorage { + +public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder() +.name("auto-delete-messages") +.displayName("Auto Delete Messages") +.description("Specifies whether the received message is to be automatically deleted from the queue.") +.required(true) +.allowableValues("true", "false") +.defaultValue("true") +.addValidator(StandardValidators.BOOLEAN_VALIDATOR) +.build(); + +public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() +.name("batch-size") +.displayName("Batch Size") +.description("The number of messages to be retrieved from the queue.") +.required(true) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("32") +.build(); + +
Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2611#discussion_r182965161 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageCredentials; +import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.queue.CloudQueue; +import com.microsoft.azure.storage.queue.CloudQueueClient; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.net.URI; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public abstract class AbstractAzureQueueStorage extends AbstractProcessor { + +public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder() +.name("storage-cloudQueue-name") +.displayName("Queue Name") +.description("Name of the Azure Storage Queue") +.required(true) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.build(); + +public static final Relationship REL_SUCCESS = new Relationship.Builder() +.name("success") +.description("All successfully processed FlowFiles are routed to this relationship") +.build(); + +public static final Relationship REL_FAILURE 
= new Relationship.Builder() +.name("failure") +.description("Unsuccessful operations will be transferred to the failure relationship.") +.build(); + +private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s"; --- End diff -- No, it's just in case some are added and we don't want to update the code. But we can leave as-is. ---
Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2611#discussion_r182964837 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageCredentials; +import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.queue.CloudQueue; +import com.microsoft.azure.storage.queue.CloudQueueClient; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.net.URI; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public abstract class AbstractAzureQueueStorage extends AbstractProcessor { + +public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder() +.name("storage-cloudQueue-name") +.displayName("Queue Name") +.description("Name of the Azure Storage Queue") +.required(true) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) --- End diff -- I don't see where you're accessing this property using flow files. 
In the abstract class, you're doing: `final String queueName = validationContext.getProperty(QUEUE).evaluateAttributeExpressions().getValue();` in the `@OnScheduled` method. So you're not using the flow files. Am I missing something? ---
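To make the concern concrete, here is a toy sketch of why evaluating an attribute-scoped property without a FlowFile is a problem. It is not NiFi's expression-language engine: the `evaluate` helper, the class name, and the `queue.name` attribute are assumptions for illustration, and the only behavior modeled is that a `${...}` reference with no matching attribute resolves to an empty string.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Toy stand-in for expression evaluation; this is not the NiFi API. */
public class ExpressionScopeSketch {

    // Matches simple ${name} references only (no functions, no nesting).
    private static final Pattern REFERENCE = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces each ${name} with its value from the attributes, or "" if absent. */
    static String evaluate(String rawValue, Map<String, String> attributes) {
        Matcher matcher = REFERENCE.matcher(rawValue);
        StringBuilder resolved = new StringBuilder();
        while (matcher.find()) {
            String value = attributes.getOrDefault(matcher.group(1), "");
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        String configured = "${queue.name}";

        // Schedule time: no FlowFile exists yet, so there are no attributes.
        String atSchedule = evaluate(configured, Map.of());

        // Trigger time: the FlowFile's attributes are available.
        String atTrigger = evaluate(configured, Map.of("queue.name", "orders"));

        System.out.println("schedule time: '" + atSchedule + "'"); // prints schedule time: ''
        System.out.println("trigger time:  '" + atTrigger + "'");  // prints trigger time:  'orders'
    }
}
```

In this sketch the same configured value yields an empty queue name when no attributes are supplied, which mirrors the mismatch the reviewer points out between the declared scope and the point where the property is read.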
Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2611#discussion_r182958368 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.queue.CloudQueueMessage; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Collections; +import java.util.Arrays; +import java.util.Set; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + +@SeeAlso({PutAzureQueueStorage.class}) +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"}) +@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. 
If the requirement is " + +"to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.") +@WritesAttributes({ +@WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"), +@WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"), +@WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"), +@WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"), +@WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"), +}) +public class GetAzureQueueStorage extends AbstractAzureQueueStorage { + +public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder() +.name("auto-delete-messages") +.displayName("Auto Delete Messages") +.description("Specifies whether the received message is to be automatically deleted from the queue.") +.required(true) +.allowableValues("true", "false") +.defaultValue("true") +.addValidator(StandardValidators.BOOLEAN_VALIDATOR) +.build(); + +public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() +.name("batch-size") +.displayName("Batch Size") +.description("The number of messages to be retrieved from the queue.") +.required(true) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("32") +.build(); + +public
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2611#discussion_r182948627 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2611#discussion_r182948377 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2611#discussion_r182948346 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2611#discussion_r182948251 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java ---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2611#discussion_r182948045 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageCredentials; +import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.queue.CloudQueue; +import com.microsoft.azure.storage.queue.CloudQueueClient; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.net.URI; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public abstract class AbstractAzureQueueStorage extends AbstractProcessor { + +public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder() +.name("storage-cloudQueue-name") +.displayName("Queue Name") +.description("Name of the Azure Storage Queue") +.required(true) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.build(); + +public static final Relationship REL_SUCCESS = new Relationship.Builder() +.name("success") +.description("All successfully processed FlowFiles are routed to this relationship") +.build(); + +public static final Relationship REL_FAILURE 
= new Relationship.Builder() +.name("failure") +.description("Unsuccessful operations will be transferred to the failure relationship.") +.build(); + +private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s"; +private static final String FORMAT_QUEUE_BASE_URI = "https://%s.queue.core.windows.net"; + +private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); + +protected CloudQueue cloudQueue; + +@Override +public Set<Relationship> getRelationships() { +return relationships; +} + +@Override +protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { +final List<ValidationResult> problems = new ArrayList<>(AzureStorageUtils.validateCredentialProperties(validationContext)); + +final String queueName = validationContext.getProperty(QUEUE).evaluateAttributeExpressions().getValue(); +if(!StringUtils.isAllLowerCase(queueName)) { --- End diff -- That's what I thought, because that's the simplest thing to do. However, I wanted to emphasize this to the user. I didn't know any other way than letting the validator do its thing. ---
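The lowercase check discussed above could be sketched without any NiFi or SDK dependency as follows. This is only an illustration, not the code from the pull request: the class name `QueueNameCheck` and the method `isValidQueueName` are hypothetical, and the rules encoded are the documented Azure queue naming rules (3 to 63 characters, lowercase letters, digits and single hyphens, starting and ending with a letter or digit). One thing worth noting is that `StringUtils.isAllLowerCase` from commons-lang3 returns `false` for any string containing a digit or a hyphen, so a name such as `orders-2018` would presumably be rejected by the check in the diff even though it is a legal queue name.

```java
import java.util.regex.Pattern;

// Hypothetical helper, not part of the pull request.
final class QueueNameCheck {

    // One or more lowercase alphanumerics, optionally followed by
    // hyphen-separated alphanumeric groups: this rules out leading,
    // trailing and consecutive hyphens as well as uppercase letters.
    private static final Pattern QUEUE_NAME = Pattern.compile("^[a-z0-9]+(-[a-z0-9]+)*$");

    static boolean isValidQueueName(final String name) {
        if (name == null || name.length() < 3 || name.length() > 63) {
            return false;
        }
        return QUEUE_NAME.matcher(name).matches();
    }

    public static void main(final String[] args) {
        final String[] candidates = {"orders", "orders-2018", "Orders", "orders--dlq", "-orders"};
        for (final String candidate : candidates) {
            System.out.println(candidate + " -> " + isValidQueueName(candidate));
        }
    }
}
```

A validator built this way still reports the problem to the user at configuration time, which is the emphasis the comment asks for, while accepting names that contain digits and hyphens.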
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2611#discussion_r182947890 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageCredentials; +import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.queue.CloudQueue; +import com.microsoft.azure.storage.queue.CloudQueueClient; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.net.URI; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public abstract class AbstractAzureQueueStorage extends AbstractProcessor { + +public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder() +.name("storage-cloudQueue-name") +.displayName("Queue Name") +.description("Name of the Azure Storage Queue") +.required(true) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.build(); + +public static final Relationship REL_SUCCESS = new Relationship.Builder() +.name("success") +.description("All successfully processed FlowFiles are routed to this relationship") +.build(); + +public static final Relationship REL_FAILURE 
= new Relationship.Builder() +.name("failure") +.description("Unsuccessful operations will be transferred to the failure relationship.") +.build(); + +private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s"; --- End diff -- I don't think the connection string takes any properties other than `AccountName`, `AccountKey`, `DefaultEndpointsProtocol`, and `EndpointSuffix` (for Azure China & Azure Government). Are you aware of any others? ---
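To make the connection-string discussion concrete, here is a minimal sketch of assembling the string from the properties named in the comment, with `EndpointSuffix` appended only when one is supplied. The class `QueueConnectionStrings` and its `build` method are hypothetical names introduced for illustration; the pull request itself uses a fixed `String.format` template. The suffix `core.chinacloudapi.cn` in the example is assumed here as a sample value for Azure China.

```java
// Hypothetical helper, not part of the pull request.
final class QueueConnectionStrings {

    // Builds "DefaultEndpointsProtocol=https;AccountName=...;AccountKey=..."
    // and appends ";EndpointSuffix=..." only for non-empty suffixes.
    static String build(final String accountName, final String accountKey, final String endpointSuffix) {
        final StringBuilder sb = new StringBuilder()
                .append("DefaultEndpointsProtocol=https")
                .append(";AccountName=").append(accountName)
                .append(";AccountKey=").append(accountKey);
        if (endpointSuffix != null && !endpointSuffix.isEmpty()) {
            sb.append(";EndpointSuffix=").append(endpointSuffix);
        }
        return sb.toString();
    }

    public static void main(final String[] args) {
        // Placeholder credentials; never log real account keys.
        System.out.println(build("myaccount", "bXlrZXk=", null));
        System.out.println(build("myaccount", "bXlrZXk=", "core.chinacloudapi.cn"));
    }
}
```

With a `null` or empty suffix the result matches the fixed template in the diff, so existing behaviour for the public cloud would be unchanged under this approach.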
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2611#discussion_r182946668 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageCredentials; +import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.queue.CloudQueue; +import com.microsoft.azure.storage.queue.CloudQueueClient; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.net.URI; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public abstract class AbstractAzureQueueStorage extends AbstractProcessor { + +public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder() +.name("storage-cloudQueue-name") +.displayName("Queue Name") +.description("Name of the Azure Storage Queue") +.required(true) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.build(); + +public static final Relationship REL_SUCCESS = new Relationship.Builder() +.name("success") +.description("All successfully processed FlowFiles are routed to this relationship") +.build(); + +public static final Relationship REL_FAILURE 
= new Relationship.Builder() +.name("failure") +.description("Unsuccessful operations will be transferred to the failure relationship.") +.build(); + +private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s"; +private static final String FORMAT_QUEUE_BASE_URI = "https://%s.queue.core.windows.net"; --- End diff -- The endpoint won't change unless Azure wants to break backwards compatibility. Just kidding. I don't think the endpoint will change, since it is what the `azure-storage` SDKs use. However, it might be possible to expose one thing as a property: the connection mode. The SDKs, I believe, support both `http` and `https`, but since `https` is recommended both in the Azure docs and in general, I went with `https` by default. The blob processors also use `https`. ---
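If the connection mode were exposed as a property, as the comment suggests might be possible, the base URI could be derived along these lines. This is a sketch under assumptions: `QueueBaseUri` and its `of` method are hypothetical, only `http` and `https` are accepted, and the host pattern is the one from `FORMAT_QUEUE_BASE_URI` in the diff.

```java
import java.net.URI;

// Hypothetical helper, not part of the pull request.
final class QueueBaseUri {

    // Same host pattern as FORMAT_QUEUE_BASE_URI, with the scheme made a parameter.
    private static final String FORMAT = "%s://%s.queue.core.windows.net";

    static URI of(final String scheme, final String accountName) {
        if (!"https".equals(scheme) && !"http".equals(scheme)) {
            throw new IllegalArgumentException("Unsupported scheme: " + scheme);
        }
        return URI.create(String.format(FORMAT, scheme, accountName));
    }

    public static void main(final String[] args) {
        System.out.println(of("https", "myaccount"));
    }
}
```

Keeping `https` as the default value of such a property would preserve the behaviour described in the comment while leaving `http` available for setups that need it.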
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Github user zenfenan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2611#discussion_r182946138 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageCredentials; +import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.queue.CloudQueue; +import com.microsoft.azure.storage.queue.CloudQueueClient; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.net.URI; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public abstract class AbstractAzureQueueStorage extends AbstractProcessor { + +public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder() +.name("storage-cloudQueue-name") +.displayName("Queue Name") +.description("Name of the Azure Storage Queue") +.required(true) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) --- End diff -- Is there any particular reason why it shouldn't be expected to be present as a FlowFile attribute? It is not sensitive, so I thought it could be read from both FlowFile attributes and the Variable Registry. ---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2611#discussion_r182836230 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.queue.CloudQueueMessage; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Collections; +import java.util.Arrays; +import java.util.Set; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + +@SeeAlso({PutAzureQueueStorage.class}) +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"}) +@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. 
If the requirement is " + +"to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.") +@WritesAttributes({ +@WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"), +@WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"), +@WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"), +@WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"), +@WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"), +}) +public class GetAzureQueueStorage extends AbstractAzureQueueStorage { + +public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder() +.name("auto-delete-messages") +.displayName("Auto Delete Messages") +.description("Specifies whether the received message is to be automatically deleted from the queue.") +.required(true) +.allowableValues("true", "false") +.defaultValue("true") +.addValidator(StandardValidators.BOOLEAN_VALIDATOR) +.build(); + +public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() +.name("batch-size") +.displayName("Batch Size") +.description("The number of messages to be retrieved from the queue.") +.required(true) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("32") +.build(); + +
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2611#discussion_r182835135 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageCredentials; +import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.queue.CloudQueue; +import com.microsoft.azure.storage.queue.CloudQueueClient; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.net.URI; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public abstract class AbstractAzureQueueStorage extends AbstractProcessor { + +public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder() +.name("storage-cloudQueue-name") +.displayName("Queue Name") +.description("Name of the Azure Storage Queue") +.required(true) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.build(); + +public static final Relationship REL_SUCCESS = new Relationship.Builder() +.name("success") +.description("All successfully processed FlowFiles are routed to this relationship") +.build(); + +public static final Relationship REL_FAILURE 
= new Relationship.Builder() +.name("failure") +.description("Unsuccessful operations will be transferred to the failure relationship.") +.build(); + +private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s"; --- End diff -- Should it be configurable in case the user wants to provide more parameters, perhaps with dynamic properties? ---
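As a rough illustration of the question above, the fixed format string could be replaced by a small builder that appends user-supplied settings to the three default ones. This is only a sketch: the class `ConnectionStrings` and its `build` method are hypothetical names that do not appear in the pull request, and wiring the extra settings to NiFi dynamic properties is assumed rather than shown. `EndpointSuffix` is used in the usage note as one example of an additional connection string setting.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of the PR: builds a storage connection string
// from the three defaults plus optional extra key/value settings.
final class ConnectionStrings {

    private ConnectionStrings() {
    }

    static String build(String accountName, String accountKey, Map<String, String> extras) {
        // LinkedHashMap keeps insertion order, so the defaults come first.
        Map<String, String> parts = new LinkedHashMap<>();
        parts.put("DefaultEndpointsProtocol", "https");
        parts.put("AccountName", accountName);
        parts.put("AccountKey", accountKey);
        // Extra settings are appended; a key that repeats a default replaces its value.
        parts.putAll(extras);

        StringJoiner joiner = new StringJoiner(";");
        parts.forEach((key, value) -> joiner.add(key + "=" + value));
        return joiner.toString();
    }
}
```

With an empty map the result has the same shape as `FORMAT_QUEUE_CONNECTION_STRING`; passing `EndpointSuffix=core.chinacloudapi.cn` appends that setting after the account key.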
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2611#discussion_r182835251 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageCredentials; +import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.queue.CloudQueue; +import com.microsoft.azure.storage.queue.CloudQueueClient; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.net.URI; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public abstract class AbstractAzureQueueStorage extends AbstractProcessor { + +public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder() +.name("storage-cloudQueue-name") +.displayName("Queue Name") +.description("Name of the Azure Storage Queue") +.required(true) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.build(); + +public static final Relationship REL_SUCCESS = new Relationship.Builder() +.name("success") +.description("All successfully processed FlowFiles are routed to this relationship") +.build(); + +public static final Relationship REL_FAILURE 
= new Relationship.Builder() +.name("failure") +.description("Unsuccessful operations will be transferred to the failure relationship.") +.build(); + +private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s"; +private static final String FORMAT_QUEUE_BASE_URI = "https://%s.queue.core.windows.net"; + +private static final Set<Relationship> relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); + +protected CloudQueue cloudQueue; + +@Override +public Set<Relationship> getRelationships() { +return relationships; +} + +@Override +protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { +final List<ValidationResult> problems = new ArrayList<>(AzureStorageUtils.validateCredentialProperties(validationContext)); + +final String queueName = validationContext.getProperty(QUEUE).evaluateAttributeExpressions().getValue(); +if(!StringUtils.isAllLowerCase(queueName)) { --- End diff -- Instead of a custom validation, couldn't this be forced to lower case directly in the code? ---
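The two options in this comment (reject the name in validation, or lower-case it in code) can be compared in a small standalone sketch. It assumes the commonly documented Azure queue naming rules: 3 to 63 characters, lower-case letters, digits and single hyphens, starting and ending with a letter or digit. The class `QueueNames` and its methods are hypothetical and not taken from the pull request. One point that may be worth checking against the diff: per the Commons Lang documentation, `StringUtils.isAllLowerCase` returns false for strings containing digits or hyphens, so a name such as `my-queue-01` would presumably fail that check even though it is otherwise a valid queue name.

```java
import java.util.Locale;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the PR: validates or normalizes a queue name.
final class QueueNames {

    // 3-63 characters; lower-case letters and digits, with single hyphens
    // allowed only between alphanumeric runs (assumed Azure naming rules).
    private static final Pattern VALID =
            Pattern.compile("^(?=.{3,63}$)[a-z0-9]+(-[a-z0-9]+)*$");

    private QueueNames() {
    }

    // Option 1: strict validation, as a customValidate-style check would do.
    static boolean isValid(String name) {
        return name != null && VALID.matcher(name).matches();
    }

    // Option 2: force lower case first, then validate what remains.
    static String normalize(String name) {
        String lowered = name.trim().toLowerCase(Locale.ROOT);
        if (!isValid(lowered)) {
            throw new IllegalArgumentException("Invalid queue name: " + name);
        }
        return lowered;
    }
}
```

Lower-casing silently accepts input like `MyQueue`, while strict validation surfaces the mismatch to the user at configuration time; which behaviour is preferable is the design question the reviewer raises.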
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2611#discussion_r182835025 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageCredentials; +import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.queue.CloudQueue; +import com.microsoft.azure.storage.queue.CloudQueueClient; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.net.URI; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public abstract class AbstractAzureQueueStorage extends AbstractProcessor { + +public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder() +.name("storage-cloudQueue-name") +.displayName("Queue Name") +.description("Name of the Azure Storage Queue") +.required(true) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.build(); + +public static final Relationship REL_SUCCESS = new Relationship.Builder() +.name("success") +.description("All successfully processed FlowFiles are routed to this relationship") +.build(); + +public static final Relationship REL_FAILURE 
= new Relationship.Builder() +.name("failure") +.description("Unsuccessful operations will be transferred to the failure relationship.") +.build(); + +private static final String FORMAT_QUEUE_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s"; +private static final String FORMAT_QUEUE_BASE_URI = "https://%s.queue.core.windows.net"; --- End diff -- Are we sure this will never change? Could it be something provided by the user? ---
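One way to address this concern, sketched here under assumptions, is to keep `core.windows.net` as the default but let the endpoint suffix be overridden. The class `QueueEndpoints`, the method `baseUri` and the idea of an "endpoint suffix" setting are illustrative and do not come from the pull request; the usage note below uses `core.chinacloudapi.cn` as an example of a different suffix.

```java
import java.net.URI;

// Hypothetical helper, not part of the PR: builds the queue base URI with an
// optional, user-provided endpoint suffix.
final class QueueEndpoints {

    // Same host suffix that the hard-coded FORMAT_QUEUE_BASE_URI uses.
    static final String DEFAULT_SUFFIX = "core.windows.net";

    private QueueEndpoints() {
    }

    static URI baseUri(String accountName, String endpointSuffix) {
        // Fall back to the default when no suffix is configured.
        String suffix = (endpointSuffix == null || endpointSuffix.trim().isEmpty())
                ? DEFAULT_SUFFIX
                : endpointSuffix.trim();
        return URI.create(String.format("https://%s.queue.%s", accountName, suffix));
    }
}
```

Calling `baseUri("acct", null)` yields the same URI as the constant in the diff, while `baseUri("acct", "core.chinacloudapi.cn")` targets a different endpoint without code changes.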
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2611#discussion_r182836695 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.queue.CloudQueueMessage; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Collections; +import java.util.Arrays; +import java.util.Set; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + +@SeeAlso({PutAzureQueueStorage.class}) +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"}) +@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. 
If the requirement is " + +"to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.") +@WritesAttributes({ +@WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"), +@WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"), +@WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"), +@WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"), +@WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"), +}) +public class GetAzureQueueStorage extends AbstractAzureQueueStorage { + +public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder() +.name("auto-delete-messages") +.displayName("Auto Delete Messages") +.description("Specifies whether the received message is to be automatically deleted from the queue.") +.required(true) +.allowableValues("true", "false") +.defaultValue("true") +.addValidator(StandardValidators.BOOLEAN_VALIDATOR) +.build(); + +public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() +.name("batch-size") +.displayName("Batch Size") +.description("The number of messages to be retrieved from the queue.") +.required(true) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("32") +.build(); + +
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2611#discussion_r182836816 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.queue.CloudQueueMessage; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Collections; +import java.util.Arrays; +import java.util.Set; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + +@SeeAlso({PutAzureQueueStorage.class}) +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"}) +@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. 
If the requirement is " + +"to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.") +@WritesAttributes({ +@WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"), +@WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"), +@WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"), +@WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"), +@WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"), +}) +public class GetAzureQueueStorage extends AbstractAzureQueueStorage { + +public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder() +.name("auto-delete-messages") +.displayName("Auto Delete Messages") +.description("Specifies whether the received message is to be automatically deleted from the queue.") +.required(true) +.allowableValues("true", "false") +.defaultValue("true") +.addValidator(StandardValidators.BOOLEAN_VALIDATOR) +.build(); + +public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() +.name("batch-size") +.displayName("Batch Size") +.description("The number of messages to be retrieved from the queue.") +.required(true) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("32") +.build(); + +
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2611#discussion_r182836448 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/GetAzureQueueStorage.java --- @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.queue.CloudQueueMessage; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Collections; +import java.util.Arrays; +import java.util.Set; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + +@SeeAlso({PutAzureQueueStorage.class}) +@InputRequirement(Requirement.INPUT_FORBIDDEN) +@Tags({"azure", "queue", "microsoft", "storage", "dequeue", "cloud"}) +@CapabilityDescription("Retrieves the messages from an Azure Queue Storage. The retrieved messages will be deleted from the queue by default. 
If the requirement is " + +"to consume messages without deleting them, set 'Auto Delete Messages' to 'false'.") +@WritesAttributes({ +@WritesAttribute(attribute = "azure.queue.uri", description = "The absolute URI of the configured Azure Queue Storage"), +@WritesAttribute(attribute = "azure.queue.insertionTime", description = "The time when the message was inserted into the queue storage"), +@WritesAttribute(attribute = "azure.queue.expirationTime", description = "The time when the message will expire from the queue storage"), +@WritesAttribute(attribute = "azure.queue.messageId", description = "The ID of the retrieved message"), +@WritesAttribute(attribute = "azure.queue.popReceipt", description = "The pop receipt of the retrieved message"), +}) +public class GetAzureQueueStorage extends AbstractAzureQueueStorage { + +public static final PropertyDescriptor AUTO_DELETE = new PropertyDescriptor.Builder() +.name("auto-delete-messages") +.displayName("Auto Delete Messages") +.description("Specifies whether the received message is to be automatically deleted from the queue.") +.required(true) +.allowableValues("true", "false") +.defaultValue("true") +.addValidator(StandardValidators.BOOLEAN_VALIDATOR) +.build(); + +public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() +.name("batch-size") +.displayName("Batch Size") +.description("The number of messages to be retrieved from the queue.") +.required(true) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("32") +.build(); + +
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2611#discussion_r182834919 --- Diff: nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/queue/AbstractAzureQueueStorage.java --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.azure.storage.queue; + +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageCredentials; +import com.microsoft.azure.storage.StorageCredentialsSharedAccessSignature; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.queue.CloudQueue; +import com.microsoft.azure.storage.queue.CloudQueueClient; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; + +import java.net.URI; +import java.net.URISyntaxException; +import java.security.InvalidKeyException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public abstract class AbstractAzureQueueStorage extends AbstractProcessor { + +public static final PropertyDescriptor QUEUE = new PropertyDescriptor.Builder() +.name("storage-cloudQueue-name") +.displayName("Queue Name") +.description("Name of the Azure Storage Queue") +.required(true) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) --- End diff -- Shouldn't this use the variable registry scope? ---
[GitHub] nifi pull request #2611: NIFI-5015: Implemented Azure Queue Storage processo...
GitHub user zenfenan opened a pull request:

    https://github.com/apache/nifi/pull/2611

    NIFI-5015: Implemented Azure Queue Storage processors

    Thank you for submitting a contribution to Apache NiFi.

    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:

    ### For all changes:
    - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
    - [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    - [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
    - [x] Is your initial contribution a single, squashed commit?

    ### For code changes:
    - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [x] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [x] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zenfenan/nifi NIFI-5015

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2611.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2611

commit 42e01874c44923d6b27955a7e39a18007c67940c
Author: zenfenan
Date:   2018-04-02T02:18:37Z

    NIFI-5015: Implemented Azure Queue Storage processors

---