Repository: nifi
Updated Branches:
  refs/heads/master 7d242076c -> 26d90fbcc


NIFI-1833 - Azure Storage processors

Signed-off-by: Bryan Rosander <brosan...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3488a169
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3488a169
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3488a169

Branch: refs/heads/master
Commit: 3488a169caa06edaf527edc1273acb36682a1d77
Parents: 7d24207
Author: Simon Elliston Ball <si...@simonellistonball.com>
Authored: Mon May 2 00:35:34 2016 +0100
Committer: Bryan Rosander <brosan...@apache.org>
Committed: Tue May 2 14:39:16 2017 -0400

----------------------------------------------------------------------
 .../nifi-azure-bundle/nifi-azure-nar/pom.xml    |   6 +
 .../nifi-azure-processors/pom.xml               |  47 ++++-
 .../azure/AbstractAzureBlobProcessor.java       |  39 ++++
 .../azure/AbstractAzureProcessor.java           |  85 ++++++++
 .../nifi/processors/azure/AzureConstants.java   |  38 ++++
 .../azure/storage/FetchAzureBlobStorage.java    | 114 +++++++++++
 .../azure/storage/ListAzureBlobStorage.java     | 193 +++++++++++++++++++
 .../azure/storage/PutAzureBlobStorage.java      | 116 +++++++++++
 .../azure/storage/utils/BlobInfo.java           | 188 ++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   5 +-
 .../azure/storage/AbstractAzureIT.java          | 106 ++++++++++
 .../azure/storage/ITFetchAzureBlobStorage.java  |  61 ++++++
 .../azure/storage/ITListAzureBlobStorage.java   |  75 +++++++
 .../azure/storage/ITPutAzureStorageBlob.java    |  51 +++++
 14 files changed, 1114 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/3488a169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
index f823e6a..f75bb7f 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml
@@ -35,6 +35,12 @@
             <artifactId>nifi-azure-processors</artifactId>
             <version>1.2.0-SNAPSHOT</version>
         </dependency>
+        
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-services-api-nar</artifactId>
+            <type>nar</type>
+        </dependency>  
     </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/3488a169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
index 9049b3f..8330bcc 100644
--- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml
@@ -1,14 +1,14 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!-- Licensed to the Apache Software Foundation (ASF) under one or more 
contributor
-license agreements. See the NOTICE file distributed with this work for 
additional
-information regarding copyright ownership. The ASF licenses this file to
-You under the Apache License, Version 2.0 (the "License"); you may not use
-this file except in compliance with the License. You may obtain a copy of
-the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
-by applicable law or agreed to in writing, software distributed under the
-License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
-OF ANY KIND, either express or implied. See the License for the specific
-language governing permissions and limitations under the License. -->
+    license agreements. See the NOTICE file distributed with this work for 
additional
+    information regarding copyright ownership. The ASF licenses this file to
+    You under the Apache License, Version 2.0 (the "License"); you may not use
+    this file except in compliance with the License. You may obtain a copy of
+    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+    by applicable law or agreed to in writing, software distributed under the
+    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS
+    OF ANY KIND, either express or implied. See the License for the specific
+    language governing permissions and limitations under the License. -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
     <modelVersion>4.0.0</modelVersion>
 
@@ -32,11 +32,35 @@ language governing permissions and limitations under the 
License. -->
             <artifactId>nifi-utils</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service-api</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.microsoft.azure</groupId>
             <artifactId>azure-eventhubs</artifactId>
             <version>0.9.0</version>
         </dependency>
         <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>2.8.6</version>
+        </dependency>
+        <!--<dependency>
+            <groupId>com.microsoft.eventhubs.client</groupId>
+            <artifactId>eventhubs-client</artifactId>
+            <version>0.9.1</version>
+        </dependency>-->
+        <dependency>
+            <groupId>com.microsoft.azure</groupId>
+            <artifactId>azure-storage</artifactId>
+            <version>5.0.0</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock</artifactId>
             <scope>test</scope>
@@ -57,5 +81,10 @@ language governing permissions and limitations under the 
License. -->
             <version>${powermock.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-standard-processors</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/3488a169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
new file mode 100644
index 0000000..82eae12
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public abstract class AbstractAzureBlobProcessor extends 
AbstractAzureProcessor {
+
+    public static final PropertyDescriptor BLOB = new 
PropertyDescriptor.Builder().name("Blob").description("The filename of the 
blob").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            
.expressionLanguageSupported(true).required(true).defaultValue("${azure.blobname}").build();
+    
+    public static final List<PropertyDescriptor> properties = Collections
+            .unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, 
AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3488a169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java
new file mode 100644
index 0000000..5ab1f8b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure;
+
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+
+public abstract class AbstractAzureProcessor extends AbstractProcessor {
+
+    public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success").description("All FlowFiles that are 
received are routed to success").build();
+    protected static final Relationship REL_FAILURE = new 
Relationship.Builder().name("failure").description("Any failed fetches will be 
transferred to the failure relation.").build();
+    public static final Set<Relationship> RELATIONSHIPS = 
Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, 
REL_FAILURE)));
+
+    protected CloudStorageAccount createStorageConnection(ProcessContext 
context) {
+        final String accountName = 
context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
+        final String accountKey = 
context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
+        return createStorageConnectionFromNameAndKey(accountName, accountKey);
+    }
+
+    protected CloudStorageAccount createStorageConnection(ProcessContext 
context, FlowFile flowFile) {
+        final String accountName = 
context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final String accountKey = 
context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions(flowFile).getValue();
+        return createStorageConnectionFromNameAndKey(accountName, accountKey);
+    }
+
+    private CloudStorageAccount createStorageConnectionFromNameAndKey(String 
accountName, String accountKey) {
+        final String storageConnectionString = 
String.format("DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s", 
accountName, accountKey);
+        try {
+            return 
createStorageAccountFromConnectionString(storageConnectionString);
+        } catch (InvalidKeyException | IllegalArgumentException | 
URISyntaxException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    /**
+     * Validates the connection string and returns the storage account. The 
connection string must be in the Azure connection string format.
+     *
+     * @param storageConnectionString
+     *            Connection string for the storage service or the emulator
+     * @return The newly created CloudStorageAccount object
+     *
+     */
+    protected static CloudStorageAccount 
createStorageAccountFromConnectionString(String storageConnectionString) throws 
IllegalArgumentException, URISyntaxException, InvalidKeyException {
+        CloudStorageAccount storageAccount;
+        try {
+            storageAccount = 
CloudStorageAccount.parse(storageConnectionString);
+        } catch (IllegalArgumentException | URISyntaxException e) {
+            throw e;
+        } catch (InvalidKeyException e) {
+            throw e;
+        }
+        return storageAccount;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3488a169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java
new file mode 100644
index 0000000..eaa234c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.util.StandardValidators;
+
+public final class AzureConstants {
+    public static final String BLOCK = "Block";
+    public static final String PAGE = "Page";
+
+    public static final PropertyDescriptor ACCOUNT_KEY = new 
PropertyDescriptor.Builder().name("Storage Account Key").description("The 
storage account key")
+            
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).sensitive(true).build();
+
+    public static final PropertyDescriptor ACCOUNT_NAME = new 
PropertyDescriptor.Builder().name("Storage Account Name").description("The 
storage account name")
+            
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).sensitive(true).build();
+
+    public static final PropertyDescriptor CONTAINER = new 
PropertyDescriptor.Builder().name("Container name").description("Name of the 
azure storage container")
+            
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).build();
+
+    private AzureConstants() {
+        // do not instantiate
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3488a169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
new file mode 100644
index 0000000..2229cfd
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
+import org.apache.nifi.processors.azure.AzureConstants;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+
+@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
+@CapabilityDescription("Retrieves contents of an Azure Storage Blob, writing 
the contents to the content of the FlowFile")
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@WritesAttributes({
+    @WritesAttribute(attribute = "azure.length", description = "The length of 
the blob fetched")
+})
+public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor {
+    public static final List<PropertyDescriptor> PROPERTIES = Collections
+            .unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, 
AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final long startNanos = System.nanoTime();
+
+        String containerName = 
context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
+        String blobPath = 
context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
+
+        try {
+            CloudStorageAccount storageAccount = 
createStorageConnection(context, flowFile);
+            CloudBlobClient blobClient = 
storageAccount.createCloudBlobClient();
+            CloudBlobContainer container = 
blobClient.getContainerReference(containerName);
+
+            final Map<String, String> attributes = new HashMap<>();
+            final CloudBlob blob = container.getBlockBlobReference(blobPath);
+
+            // TODO - we may be able do fancier things with ranges and
+            // distribution of download over threads, investigate
+            flowFile = session.write(flowFile, new OutputStreamCallback() {
+                @Override
+                public void process(OutputStream os) throws IOException {
+                    try {
+                        blob.download(os);
+                    } catch (StorageException e) {
+                        throw new IOException(e);
+                    }
+                }
+            });
+
+            long length = blob.getProperties().getLength();
+            attributes.put("azure.length", String.valueOf(length));
+
+            if (!attributes.isEmpty()) {
+                flowFile = session.putAllAttributes(flowFile, attributes);
+            }
+
+            session.transfer(flowFile, REL_SUCCESS);
+            final long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+            session.getProvenanceReporter().fetch(flowFile, 
blob.getSnapshotQualifiedUri().toString(), transferMillis);
+
+        } catch (IllegalArgumentException | URISyntaxException | 
StorageException e1) {
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3488a169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
new file mode 100644
index 0000000..f4a793b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.azure.AzureConstants;
+import org.apache.nifi.processors.azure.storage.utils.BlobInfo;
+import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder;
+import org.apache.nifi.processors.standard.AbstractListProcessor;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.OperationContext;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.StorageUri;
+import com.microsoft.azure.storage.blob.BlobListingDetails;
+import com.microsoft.azure.storage.blob.BlobProperties;
+import com.microsoft.azure.storage.blob.BlobRequestOptions;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudBlockBlob;
+import com.microsoft.azure.storage.blob.ListBlobItem;
+
+@TriggerSerially
+@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
+@SeeAlso({ FetchAzureBlobStorage.class })
+@CapabilityDescription("Lists blobs in an Azure Storage container. Listing 
details are attached to an empty FlowFile for use with FetchAzureBlobStorage")
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
+@WritesAttributes({ @WritesAttribute(attribute = "azure.container", 
description = "The name of the azure container"),
+        @WritesAttribute(attribute = "azure.blobname", description = "The name 
of the azure blob"), @WritesAttribute(attribute = "azure.primaryUri", 
description = "Primary location for blob content"),
+        @WritesAttribute(attribute = "azure.secondaryUri", description = 
"Secondary location for blob content"), @WritesAttribute(attribute = 
"azure.etag", description = "Etag for the Azure blob"),
+        @WritesAttribute(attribute = "azure.length", description = "Length of 
the blob"), @WritesAttribute(attribute = "azure.timestamp", description = "The 
timestamp in Azure for the blob"),
+        @WritesAttribute(attribute = "mime.type", description = "MimeType of 
the content"), @WritesAttribute(attribute = "lang", description = "Language 
code for the content"),
+        @WritesAttribute(attribute = "azure.blobtype", description = "This is 
the type of blob and can be either page or block type") })
+@Stateful(scopes = { Scope.LOCAL, Scope.CLUSTER }, description = "After 
performing a listing of blobs, the timestamp of the newest blob is stored. "
+        + "This allows the Processor to list only blobs that have been added 
or modified after " + "this date the next time that the Processor is run.")
+public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> {
+
+    private static final PropertyDescriptor PREFIX = new 
PropertyDescriptor.Builder().name("Prefix").description("Search prefix for 
listing").addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true).required(false).build();
+
+    public static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, 
AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, PREFIX));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    protected Map<String, String> createAttributes(BlobInfo entity, 
ProcessContext context) {
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("azure.etag", entity.getEtag());
+        attributes.put("azure.primaryUri", entity.getPrimaryUri());
+        attributes.put("azure.secondaryUri", entity.getSecondaryUri());
+        attributes.put("azure.blobname", entity.getName());
+        attributes.put("azure.blobtype", entity.getBlobType());
+        attributes.put("azure.length", String.valueOf(entity.getLength()));
+        attributes.put("azure.timestamp", 
String.valueOf(entity.getTimestamp()));
+        attributes.put("mime.type", entity.getContentType());
+        attributes.put("lang", entity.getContentLanguage());
+
+        return attributes;
+    }
+
+    @Override
+    protected String getPath(final ProcessContext context) {
+        return 
context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions().getValue();
+    }
+
+    @Override
+    protected boolean isListingResetNecessary(final PropertyDescriptor 
property) {
+        // TODO - implement
+        return false;
+    }
+
+    @Override
+    protected Scope getStateScope(final ProcessContext context) {
+        return Scope.CLUSTER;
+    }
+
+    @Override
+    protected List<BlobInfo> performListing(final ProcessContext context, 
final Long minTimestamp) throws IOException {
+        String containerName = 
context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions().getValue();
+        String prefix = 
context.getProperty(PREFIX).evaluateAttributeExpressions().getValue();
+        if (prefix == null) {
+            prefix = "";
+        }
+        final List<BlobInfo> listing = new ArrayList<>();
+        try {
+            CloudStorageAccount storageAccount = 
createStorageConnection(context);
+            CloudBlobClient blobClient = 
storageAccount.createCloudBlobClient();
+            CloudBlobContainer container = 
blobClient.getContainerReference(containerName);
+
+            BlobRequestOptions blobRequestOptions = null;
+            OperationContext operationContext = null;
+
+            for (ListBlobItem blob : container.listBlobs(prefix, true, 
EnumSet.of(BlobListingDetails.METADATA), blobRequestOptions, operationContext)) 
{
+                if (blob instanceof CloudBlob) {
+                    CloudBlob cloudBlob = (CloudBlob) blob;
+                    BlobProperties properties = cloudBlob.getProperties();
+                    StorageUri uri = 
cloudBlob.getSnapshotQualifiedStorageUri();
+
+                    Builder builder = new 
BlobInfo.Builder().primaryUri(uri.getPrimaryUri().toString()).secondaryUri(uri.getSecondaryUri().toString()).contentType(properties.getContentType())
+                            
.contentLanguage(properties.getContentLanguage()).etag(properties.getEtag()).lastModifiedTime(properties.getLastModified().getTime()).length(properties.getLength());
+
+                    if (blob instanceof CloudBlockBlob) {
+                        builder.blobType(AzureConstants.BLOCK);
+                    } else {
+                        builder.blobType(AzureConstants.PAGE);
+                    }
+                    listing.add(builder.build());
+                }
+            }
+        } catch (IllegalArgumentException | URISyntaxException | 
StorageException e) {
+            throw (new IOException(e));
+        }
+        return listing;
+    }
+
+    protected static CloudStorageAccount 
createStorageConnection(ProcessContext context) {
+        final String accountName = 
context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue();
+        final String accountKey = 
context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions().getValue();
+        final String storageConnectionString = 
String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", 
accountName, accountKey);
+        try {
+            return 
createStorageAccountFromConnectionString(storageConnectionString);
+        } catch (InvalidKeyException | URISyntaxException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    /**
+     * Validates the connection string and returns the storage account. The 
connection string must be in the Azure connection string format.
+     *
+     * @param storageConnectionString
+     *            Connection string for the storage service or the emulator
+     * @return The newly created CloudStorageAccount object
+     *
+     */
+    private static CloudStorageAccount 
createStorageAccountFromConnectionString(String storageConnectionString) throws 
IllegalArgumentException, URISyntaxException, InvalidKeyException {
+
+        CloudStorageAccount storageAccount;
+        try {
+            storageAccount = 
CloudStorageAccount.parse(storageConnectionString);
+        } catch (IllegalArgumentException | URISyntaxException e) {
+            System.out.println("\nConnection string specifies an invalid 
URI.");
+            System.out.println("Please confirm the connection string is in the 
Azure connection string format.");
+            throw e;
+        } catch (InvalidKeyException e) {
+            System.out.println("\nConnection string specifies an invalid 
key.");
+            System.out.println("Please confirm the AccountName and AccountKey 
in the connection string are valid.");
+            throw e;
+        }
+        return storageAccount;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3488a169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
new file mode 100644
index 0000000..1327a0b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor;
+import org.apache.nifi.processors.azure.AzureConstants;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.BlobProperties;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+
+@Tags({ "azure", "microsoft", "cloud", "storage", "blob" })
+@SeeAlso({ ListAzureBlobStorage.class, FetchAzureBlobStorage.class })
+@CapabilityDescription("Puts content into an Azure Storage Blob")
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@WritesAttributes({ @WritesAttribute(attribute = "azure.container", 
description = "The name of the azure container"),
+        @WritesAttribute(attribute = "azure.blobname", description = "The name 
of the azure blob"),
+        @WritesAttribute(attribute = "azure.primaryUri", description = 
"Primary location for blob content"),
+        @WritesAttribute(attribute = "azure.etag", description = "Etag for the 
Azure blob"),
+        @WritesAttribute(attribute = "azure.length", description = "Length of 
the blob"),
+        @WritesAttribute(attribute = "azure.timestamp", description = "The 
timestamp in Azure for the blob"),
+        @WritesAttribute(attribute = "azure.blobtype", description = "This is 
the type of blob and can be either page or block type") })
+public class PutAzureBlobStorage extends AbstractAzureBlobProcessor {
+
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final long startNanos = System.nanoTime();
+
+        String containerName = 
context.getProperty(AzureConstants.CONTAINER).evaluateAttributeExpressions(flowFile).getValue();
+
+        String blobPath = 
context.getProperty(BLOB).evaluateAttributeExpressions(flowFile).getValue();
+
+        try {
+            CloudStorageAccount storageAccount = 
createStorageConnection(context, flowFile);
+            CloudBlobClient blobClient = 
storageAccount.createCloudBlobClient();
+            CloudBlobContainer container = 
blobClient.getContainerReference(containerName);
+
+            CloudBlob blob = container.getBlockBlobReference(blobPath);
+
+            final Map<String, String> attributes = new HashMap<>();
+            long length = flowFile.getSize();
+            session.read(flowFile, new InputStreamCallback() {
+                @Override
+                public void process(final InputStream rawIn) throws 
IOException {
+                    final InputStream in = new BufferedInputStream(rawIn);
+                    try {
+                        blob.upload(in, length);
+                        BlobProperties properties = blob.getProperties();
+                        attributes.put("azure.container", containerName);
+                        attributes.put("azure.primaryUri", 
blob.getSnapshotQualifiedUri().toString());
+                        attributes.put("azure.etag", properties.getEtag());
+                        attributes.put("azure.length", String.valueOf(length));
+                        attributes.put("azure.timestamp", 
String.valueOf(properties.getLastModified()));
+                    } catch (StorageException | URISyntaxException e) {
+                        throw new IOException(e);
+                    }
+                }
+            });
+
+            if (!attributes.isEmpty()) {
+                flowFile = session.putAllAttributes(flowFile, attributes);
+            }
+            session.transfer(flowFile, REL_SUCCESS);
+
+            final long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+            session.getProvenanceReporter().send(flowFile, 
blob.getSnapshotQualifiedUri().toString(), transferMillis);
+
+        } catch (IllegalArgumentException | URISyntaxException | 
StorageException e) {
+            getLogger().error("Failed to put Azure blob {}", new 
Object[]{blobPath}, e);
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3488a169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java
new file mode 100644
index 0000000..d429878
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import java.io.Serializable;
+
+import org.apache.nifi.processors.standard.util.ListableEntity;
+
+public class BlobInfo implements Comparable<BlobInfo>, Serializable, 
ListableEntity {
+    private static final long serialVersionUID = 1L;
+
+    private final String primaryUri;
+    private final String secondaryUri;
+    private final String contentType;
+    private final String contentLanguage;
+    private final String etag;
+    private final long lastModifiedTime;
+    private final long length;
+    private final String blobType;
+
+    public static long getSerialversionuid() {
+        return serialVersionUID;
+    }
+
+    public String getPrimaryUri() {
+        return primaryUri;
+    }
+
+    public String getSecondaryUri() {
+        return secondaryUri;
+    }
+
+    public String getContentType() {
+        return contentType;
+    }
+
+    public String getContentLanguage() {
+        return contentLanguage;
+    }
+
+    public String getEtag() {
+        return etag;
+    }
+
+    public long getLastModifiedTime() {
+        return lastModifiedTime;
+    }
+
+    public long getLength() {
+        return length;
+    }
+
+    public String getBlobType() {
+        return blobType;
+    }
+
+    public static final class Builder {
+        private String primaryUri;
+        private String secondaryUri;
+        private String contentType;
+        private String contentLanguage;
+        private String etag;
+        private long lastModifiedTime;
+        private long length;
+        private String blobType;
+
+        public Builder primaryUri(String primaryUri) {
+            this.primaryUri = primaryUri;
+            return this;
+        }
+
+        public Builder secondaryUri(String secondaryUri) {
+            this.secondaryUri = secondaryUri;
+            return this;
+        }
+
+        public Builder contentType(String contentType) {
+            this.contentType = contentType;
+            return this;
+        }
+
+        public Builder contentLanguage(String contentLanguage) {
+            this.contentLanguage = contentLanguage;
+            return this;
+        }
+
+        public Builder etag(String etag) {
+            this.etag = etag;
+            return this;
+        }
+
+        public Builder lastModifiedTime(long lastModifiedTime) {
+            this.lastModifiedTime = lastModifiedTime;
+            return this;
+        }
+
+        public Builder length(long length) {
+            this.length = length;
+            return this;
+        }
+
+        public Builder blobType(String blobType) {
+            this.blobType = blobType;
+            return this;
+        }
+
+        public BlobInfo build() {
+            return new BlobInfo(this);
+        }
+
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((etag == null) ? 0 : etag.hashCode());
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        BlobInfo other = (BlobInfo) obj;
+        if (etag == null) {
+            if (other.etag != null) {
+                return false;
+            }
+        } else if (!etag.equals(other.etag)) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public int compareTo(BlobInfo o) {
+        return etag.compareTo(o.etag);
+    }
+
+    protected BlobInfo(final Builder builder) {
+        this.primaryUri = builder.primaryUri;
+        this.secondaryUri = builder.secondaryUri;
+        this.contentType = builder.contentType;
+        this.contentLanguage = builder.contentLanguage;
+        this.etag = builder.etag;
+        this.lastModifiedTime = builder.lastModifiedTime;
+        this.length = builder.length;
+        this.blobType = builder.blobType;
+    }
+
+    @Override
+    public String getName() {
+        String primaryUri = getPrimaryUri();
+        return primaryUri.substring(primaryUri.lastIndexOf('/') + 1);
+    }
+
+    @Override
+    public String getIdentifier() {
+        return getPrimaryUri();
+    }
+
+    @Override
+    public long getTimestamp() {
+        return getLastModifiedTime();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3488a169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 178e52c..84b3300 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -13,4 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 org.apache.nifi.processors.azure.eventhub.PutAzureEventHub
-org.apache.nifi.processors.azure.eventhub.GetAzureEventHub
\ No newline at end of file
+org.apache.nifi.processors.azure.eventhub.GetAzureEventHub
+org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage
+org.apache.nifi.processors.azure.storage.ListAzureBlobStorage
+org.apache.nifi.processors.azure.storage.PutAzureBlobStorage
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/3488a169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java
new file mode 100644
index 0000000..34702eb
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import static org.junit.Assert.fail;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.Properties;
+
+import org.apache.nifi.util.file.FileUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import com.microsoft.azure.storage.CloudStorageAccount;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
+import com.microsoft.azure.storage.blob.ListBlobItem;
+import com.microsoft.azure.storage.table.CloudTable;
+import com.microsoft.azure.storage.table.CloudTableClient;
+
+public abstract class AbstractAzureIT {
+    protected static final String CREDENTIALS_FILE = 
System.getProperty("user.home") + "/azure-credentials.PROPERTIES";
+    public static final String TEST_CONTAINER_NAME = "nifitest";
+
+    private static final Properties CONFIG;
+    protected static final String TEST_BLOB_NAME = "testing";
+    protected static final String TEST_TABLE_NAME = "testing";
+
+    static {
+        final FileInputStream fis;
+        CONFIG = new Properties();
+        try {
+            fis = new FileInputStream(CREDENTIALS_FILE);
+            try {
+                CONFIG.load(fis);
+            } catch (IOException e) {
+                fail("Could not open credentials file " + CREDENTIALS_FILE + 
": " + e.getLocalizedMessage());
+            } finally {
+                FileUtils.closeQuietly(fis);
+            }
+        } catch (FileNotFoundException e) {
+            fail("Could not open credentials file " + CREDENTIALS_FILE + ": " 
+ e.getLocalizedMessage());
+        }
+
+    }
+
+    @BeforeClass
+    public static void oneTimeSetup() throws StorageException, 
InvalidKeyException, URISyntaxException {
+        CloudBlobContainer container = getContainer();
+        container.createIfNotExists();
+    }
+
+    @AfterClass
+    public static void tearDown() throws InvalidKeyException, 
URISyntaxException, StorageException {
+        CloudBlobContainer container = getContainer();
+        for (ListBlobItem blob : container.listBlobs()) {
+            if (blob instanceof CloudBlob) {
+                ((CloudBlob) 
blob).delete(DeleteSnapshotsOption.INCLUDE_SNAPSHOTS, null, null, null);
+            }
+        }
+    }
+
+    public static String getAccountName() {
+        return CONFIG.getProperty("accountName");
+    }
+
+    public static String getAccountKey() {
+        return CONFIG.getProperty("accountKey");
+    }
+
+    protected static CloudBlobContainer getContainer() throws 
InvalidKeyException, URISyntaxException, StorageException {
+        String storageConnectionString = 
String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", 
getAccountName(), getAccountKey());
+        CloudStorageAccount storageAccount = 
CloudStorageAccount.parse(storageConnectionString);
+        CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
+        return blobClient.getContainerReference(TEST_CONTAINER_NAME);
+    }
+
+    protected static CloudTable getTable() throws InvalidKeyException, 
URISyntaxException, StorageException {
+        String storageConnectionString = 
String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", 
getAccountName(), getAccountKey());
+        CloudStorageAccount storageAccount = 
CloudStorageAccount.parse(storageConnectionString);
+        CloudTableClient tableClient = storageAccount.createCloudTableClient();
+        return tableClient.getTableReference(TEST_TABLE_NAME);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/3488a169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
new file mode 100644
index 0000000..1e8a8f7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.processors.azure.AzureConstants;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+import com.microsoft.azure.storage.StorageException;
+
+public class ITFetchAzureBlobStorage extends AbstractAzureIT {
+
+    @Test
+    public void testFetchingBlob() throws InvalidKeyException, 
URISyntaxException, StorageException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
FetchAzureBlobStorage());
+
+        runner.setValidateExpressionUsage(true);
+
+        runner.setProperty(AzureConstants.ACCOUNT_NAME, getAccountName());
+        runner.setProperty(AzureConstants.ACCOUNT_KEY, getAccountKey());
+        runner.setProperty(AzureConstants.CONTAINER, TEST_CONTAINER_NAME);
+        runner.setProperty(FetchAzureBlobStorage.BLOB, "${azure.blobname}");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("azure.primaryUri", "http://"; + getAccountName() + 
".blob.core.windows.net/" + TEST_CONTAINER_NAME + "/" + TEST_BLOB_NAME);
+        attributes.put("azure.blobname", TEST_BLOB_NAME);
+        attributes.put("azure.blobtype", AzureConstants.BLOCK);
+        runner.enqueue(new byte[0], attributes);
+        runner.run();
+
+        
runner.assertAllFlowFilesTransferred(FetchAzureBlobStorage.REL_SUCCESS, 1);
+        List<MockFlowFile> flowFilesForRelationship = 
runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS);
+        for (MockFlowFile flowFile : flowFilesForRelationship) {
+            flowFile.assertContentEquals("0123456789".getBytes());
+            flowFile.assertAttributeEquals("azure.length", "10");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3488a169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
new file mode 100644
index 0000000..277538c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureBlobStorage.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.security.InvalidKeyException;
+
+import org.apache.nifi.processors.azure.AzureConstants;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlob;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+
+public class ITListAzureBlobStorage extends AbstractAzureIT {
+
+    @BeforeClass
+    public static void setupSomeFiles() throws InvalidKeyException, 
URISyntaxException, StorageException, IOException {
+        CloudBlobContainer container = getContainer();
+        container.createIfNotExists();
+
+        CloudBlob blob = container.getBlockBlobReference(TEST_BLOB_NAME);
+        byte[] buf = "0123456789".getBytes();
+        InputStream in = new ByteArrayInputStream(buf);
+        blob.upload(in, 10);
+    }
+
+    @AfterClass
+    public static void tearDown() throws InvalidKeyException, 
URISyntaxException, StorageException {
+        CloudBlobContainer container = getContainer();
+        container.deleteIfExists();
+    }
+
+    @Test
+    public void testListsAzureBlobStorageContent() {
+        final TestRunner runner = TestRunners.newTestRunner(new 
ListAzureBlobStorage());
+
+        runner.setProperty(AzureConstants.ACCOUNT_NAME, getAccountName());
+        runner.setProperty(AzureConstants.ACCOUNT_KEY, getAccountKey());
+        runner.setProperty(AzureConstants.CONTAINER, TEST_CONTAINER_NAME);
+
+        // requires multiple runs to deal with List processor checking
+        runner.run(3);
+
+        runner.assertTransferCount(ListAzureBlobStorage.REL_SUCCESS, 1);
+        runner.assertAllFlowFilesTransferred(ListAzureBlobStorage.REL_SUCCESS, 
1);
+
+        for (MockFlowFile entry : 
runner.getFlowFilesForRelationship(ListAzureBlobStorage.REL_SUCCESS)) {
+            entry.assertAttributeEquals("azure.length", "10");
+            entry.assertAttributeEquals("mime.type", 
"application/octet-stream");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/3488a169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java
new file mode 100644
index 0000000..0308add
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureStorageBlob.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.nifi.processors.azure.AzureConstants;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+public class ITPutAzureStorageBlob extends AbstractAzureIT {
+
+    @Test
+    public void testPuttingBlob() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new 
PutAzureBlobStorage());
+
+        runner.setValidateExpressionUsage(true);
+
+        runner.setProperty(AzureConstants.ACCOUNT_NAME, getAccountName());
+        runner.setProperty(AzureConstants.ACCOUNT_KEY, getAccountKey());
+        runner.setProperty(AzureConstants.CONTAINER, TEST_CONTAINER_NAME);
+        runner.setProperty(FetchAzureBlobStorage.BLOB, "testingUpload");
+
+        runner.enqueue("0123456789".getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutAzureBlobStorage.REL_SUCCESS, 
1);
+        List<MockFlowFile> flowFilesForRelationship = 
runner.getFlowFilesForRelationship(PutAzureBlobStorage.REL_SUCCESS);
+        for (MockFlowFile flowFile : flowFilesForRelationship) {
+            flowFile.assertContentEquals("0123456789".getBytes());
+            flowFile.assertAttributeEquals("azure.length", "10");
+        }
+    }
+}

Reply via email to