NIFI-1833 - Addressed issues from PR review. Addressed dependency issues from the review. Addressed a checkstyle issue. Review: reworded the descriptions. Review: implemented the reset condition logic. Review: dropped static qualifier from method signatures, not really required. Review: removed System.out calls, inlined a single method to get access to ProcessContext.getName(). Switched to HTTPS as per MSFT recommendation. Some DRY. Dropped cruft. Addressing review suggestions from 4/5. Review: documentation improvements. Review: documentation improvements.
This closes #1636. Signed-off-by: Bryan Rosander <brosan...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f30c8169 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f30c8169 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f30c8169 Branch: refs/heads/master Commit: f30c8169abe8705911b0cdac18aa90f3a217dc78 Parents: 3488a16 Author: Andrew Grande <apere...@gmail.com> Authored: Tue Apr 4 14:57:21 2017 -0400 Committer: Bryan Rosander <brosan...@apache.org> Committed: Tue May 2 14:39:37 2017 -0400 ---------------------------------------------------------------------- .../nifi-azure-bundle/nifi-azure-nar/pom.xml | 2 +- .../nifi-azure-processors/pom.xml | 1 + .../azure/AbstractAzureBlobProcessor.java | 6 +- .../azure/AbstractAzureProcessor.java | 12 ++-- .../nifi/processors/azure/AzureConstants.java | 3 + .../azure/storage/FetchAzureBlobStorage.java | 20 +++--- .../azure/storage/ListAzureBlobStorage.java | 73 ++++++++------------ .../azure/storage/PutAzureBlobStorage.java | 40 +++++------ .../azure/storage/utils/BlobInfo.java | 2 +- .../additionalDetails.html | 39 +++++++++++ .../additionalDetails.html | 39 +++++++++++ .../additionalDetails.html | 39 +++++++++++ .../azure/storage/AbstractAzureIT.java | 35 ++++------ .../azure/storage/ITFetchAzureBlobStorage.java | 3 +- 14 files changed, 205 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml index f75bb7f..e6c3c9b 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-nar/pom.xml @@ -38,7 +38,7 @@ 
<dependency> <groupId>org.apache.nifi</groupId> - <artifactId>nifi-standard-services-api-nar</artifactId> + <artifactId>nifi-standard-nar</artifactId> <type>nar</type> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml index 8330bcc..9b4f28b 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/pom.xml @@ -85,6 +85,7 @@ <groupId>org.apache.nifi</groupId> <artifactId>nifi-standard-processors</artifactId> <version>${project.version}</version> + <scope>provided</scope> </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java index 82eae12..2026711 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor.java @@ -27,13 +27,13 @@ public abstract class AbstractAzureBlobProcessor extends AbstractAzureProcessor public static final PropertyDescriptor BLOB = new PropertyDescriptor.Builder().name("Blob").description("The 
filename of the blob").addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true).required(true).defaultValue("${azure.blobname}").build(); - - public static final List<PropertyDescriptor> properties = Collections + + private static final List<PropertyDescriptor> PROPERTIES = Collections .unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB)); @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return properties; + return PROPERTIES; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java index 5ab1f8b..c95ee99 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureProcessor.java @@ -32,8 +32,8 @@ import com.microsoft.azure.storage.CloudStorageAccount; public abstract class AbstractAzureProcessor extends AbstractProcessor { - public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are received are routed to success").build(); - protected static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Any failed fetches will be transferred to the failure relation.").build(); + public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success").description("All successfully processed FlowFiles are routed to this relationship").build(); + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Unsuccessful operations will be transferred to the failure relationship.").build(); public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE))); protected CloudStorageAccount createStorageConnection(ProcessContext context) { @@ -49,7 +49,7 @@ public abstract class AbstractAzureProcessor extends AbstractProcessor { } private CloudStorageAccount createStorageConnectionFromNameAndKey(String accountName, String accountKey) { - final String storageConnectionString = String.format("DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s", accountName, accountKey); + final String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, accountName, accountKey); try { return createStorageAccountFromConnectionString(storageConnectionString); } catch (InvalidKeyException | IllegalArgumentException | URISyntaxException e) { @@ -65,13 +65,11 @@ public abstract class AbstractAzureProcessor extends AbstractProcessor { * @return The newly created CloudStorageAccount object * */ - protected static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException { + private static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException { CloudStorageAccount storageAccount; try { storageAccount = CloudStorageAccount.parse(storageConnectionString); - } catch (IllegalArgumentException | URISyntaxException e) { - throw e; - } catch (InvalidKeyException e) { + } catch (IllegalArgumentException | URISyntaxException | InvalidKeyException 
e) { throw e; } return storageAccount; http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java index eaa234c..9a51030 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AzureConstants.java @@ -32,6 +32,9 @@ public final class AzureConstants { public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder().name("Container name").description("Name of the azure storage container") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(true).required(true).build(); + // use HTTPS by default as per MSFT recommendation + public static final String FORMAT_DEFAULT_CONNECTION_STRING = "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s"; + private AzureConstants() { // do not instantiate } http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java index 2229cfd..163a962 100644 --- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage.java @@ -17,7 +17,6 @@ package org.apache.nifi.processors.azure.storage; import java.io.IOException; -import java.io.OutputStream; import java.net.URISyntaxException; import java.util.Arrays; import java.util.Collections; @@ -31,13 +30,13 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor; import org.apache.nifi.processors.azure.AzureConstants; @@ -49,12 +48,14 @@ import com.microsoft.azure.storage.blob.CloudBlobContainer; @Tags({ "azure", "microsoft", "cloud", "storage", "blob" }) @CapabilityDescription("Retrieves contents of an Azure Storage Blob, writing the contents to the content of the FlowFile") +@SeeAlso({ ListAzureBlobStorage.class, PutAzureBlobStorage.class }) @InputRequirement(Requirement.INPUT_REQUIRED) @WritesAttributes({ @WritesAttribute(attribute = "azure.length", description = "The length of the blob fetched") }) public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor { - public static final List<PropertyDescriptor> PROPERTIES = Collections + + private static 
final List<PropertyDescriptor> PROPERTIES = Collections .unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, BLOB)); @Override @@ -84,14 +85,11 @@ public class FetchAzureBlobStorage extends AbstractAzureBlobProcessor { // TODO - we may be able do fancier things with ranges and // distribution of download over threads, investigate - flowFile = session.write(flowFile, new OutputStreamCallback() { - @Override - public void process(OutputStream os) throws IOException { - try { - blob.download(os); - } catch (StorageException e) { - throw new IOException(e); - } + flowFile = session.write(flowFile, os -> { + try { + blob.download(os); + } catch (StorageException e) { + throw new IOException(e); } }); http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java index f4a793b..f8a6c4d 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage.java @@ -46,12 +46,10 @@ import org.apache.nifi.processors.azure.storage.utils.BlobInfo.Builder; import org.apache.nifi.processors.standard.AbstractListProcessor; import com.microsoft.azure.storage.CloudStorageAccount; -import com.microsoft.azure.storage.OperationContext; import com.microsoft.azure.storage.StorageException; import 
com.microsoft.azure.storage.StorageUri; import com.microsoft.azure.storage.blob.BlobListingDetails; import com.microsoft.azure.storage.blob.BlobProperties; -import com.microsoft.azure.storage.blob.BlobRequestOptions; import com.microsoft.azure.storage.blob.CloudBlob; import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.blob.CloudBlobContainer; @@ -60,14 +58,18 @@ import com.microsoft.azure.storage.blob.ListBlobItem; @TriggerSerially @Tags({ "azure", "microsoft", "cloud", "storage", "blob" }) -@SeeAlso({ FetchAzureBlobStorage.class }) +@SeeAlso({ FetchAzureBlobStorage.class, PutAzureBlobStorage.class }) @CapabilityDescription("Lists blobs in an Azure Storage container. Listing details are attached to an empty FlowFile for use with FetchAzureBlobStorage") @InputRequirement(Requirement.INPUT_FORBIDDEN) -@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the azure container"), - @WritesAttribute(attribute = "azure.blobname", description = "The name of the azure blob"), @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"), - @WritesAttribute(attribute = "azure.secondaryUri", description = "Secondary location for blob content"), @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"), - @WritesAttribute(attribute = "azure.length", description = "Length of the blob"), @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"), - @WritesAttribute(attribute = "mime.type", description = "MimeType of the content"), @WritesAttribute(attribute = "lang", description = "Language code for the content"), +@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the Azure container"), + @WritesAttribute(attribute = "azure.blobname", description = "The name of the Azure blob"), + @WritesAttribute(attribute = "azure.primaryUri", description = "Primary 
location for blob content"), + @WritesAttribute(attribute = "azure.secondaryUri", description = "Secondary location for blob content"), + @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"), + @WritesAttribute(attribute = "azure.length", description = "Length of the blob"), + @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob"), + @WritesAttribute(attribute = "mime.type", description = "MimeType of the content"), + @WritesAttribute(attribute = "lang", description = "Language code for the content"), @WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") }) @Stateful(scopes = { Scope.LOCAL, Scope.CLUSTER }, description = "After performing a listing of blobs, the timestamp of the newest blob is stored. " + "This allows the Processor to list only blobs that have been added or modified after " + "this date the next time that the Processor is run.") @@ -76,7 +78,7 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> { private static final PropertyDescriptor PREFIX = new PropertyDescriptor.Builder().name("Prefix").description("Search prefix for listing").addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(true).required(false).build(); - public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, PREFIX)); + private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(AzureConstants.ACCOUNT_NAME, AzureConstants.ACCOUNT_KEY, AzureConstants.CONTAINER, PREFIX)); @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { @@ -106,8 +108,10 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> { @Override protected boolean isListingResetNecessary(final PropertyDescriptor 
property) { - // TODO - implement - return false; + // re-list if configuration changed, but not when security keys are rolled (not included in the condition) + return PREFIX.equals(property) + || AzureConstants.ACCOUNT_NAME.equals(property) + || AzureConstants.CONTAINER.equals(property); } @Override @@ -128,10 +132,7 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> { CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); CloudBlobContainer container = blobClient.getContainerReference(containerName); - BlobRequestOptions blobRequestOptions = null; - OperationContext operationContext = null; - - for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), blobRequestOptions, operationContext)) { + for (ListBlobItem blob : container.listBlobs(prefix, true, EnumSet.of(BlobListingDetails.METADATA), null, null)) { if (blob instanceof CloudBlob) { CloudBlob cloudBlob = (CloudBlob) blob; BlobProperties properties = cloudBlob.getProperties(); @@ -154,40 +155,26 @@ public class ListAzureBlobStorage extends AbstractListProcessor<BlobInfo> { return listing; } - protected static CloudStorageAccount createStorageConnection(ProcessContext context) { + protected CloudStorageAccount createStorageConnection(ProcessContext context) { final String accountName = context.getProperty(AzureConstants.ACCOUNT_NAME).evaluateAttributeExpressions().getValue(); final String accountKey = context.getProperty(AzureConstants.ACCOUNT_KEY).evaluateAttributeExpressions().getValue(); - final String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", accountName, accountKey); + final String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, accountName, accountKey); try { - return createStorageAccountFromConnectionString(storageConnectionString); + + CloudStorageAccount storageAccount; + try { + storageAccount = 
CloudStorageAccount.parse(storageConnectionString); + } catch (IllegalArgumentException | URISyntaxException e) { + getLogger().error("Invalid connection string URI for '{}'", new Object[]{context.getName()}, e); + throw e; + } catch (InvalidKeyException e) { + getLogger().error("Invalid connection credentials for '{}'", new Object[]{context.getName()}, e); + throw e; + } + return storageAccount; } catch (InvalidKeyException | URISyntaxException e) { throw new IllegalArgumentException(e); } } - /** - * Validates the connection string and returns the storage account. The connection string must be in the Azure connection string format. - * - * @param storageConnectionString - * Connection string for the storage service or the emulator - * @return The newly created CloudStorageAccount object - * - */ - private static CloudStorageAccount createStorageAccountFromConnectionString(String storageConnectionString) throws IllegalArgumentException, URISyntaxException, InvalidKeyException { - - CloudStorageAccount storageAccount; - try { - storageAccount = CloudStorageAccount.parse(storageConnectionString); - } catch (IllegalArgumentException | URISyntaxException e) { - System.out.println("\nConnection string specifies an invalid URI."); - System.out.println("Please confirm the connection string is in the Azure connection string format."); - throw e; - } catch (InvalidKeyException e) { - System.out.println("\nConnection string specifies an invalid key."); - System.out.println("Please confirm the AccountName and AccountKey in the connection string are valid."); - throw e; - } - return storageAccount; - } - } http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java index 1327a0b..e03bc25 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage.java @@ -35,7 +35,6 @@ import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor; import org.apache.nifi.processors.azure.AzureConstants; @@ -50,13 +49,12 @@ import com.microsoft.azure.storage.blob.CloudBlobContainer; @SeeAlso({ ListAzureBlobStorage.class, FetchAzureBlobStorage.class }) @CapabilityDescription("Puts content into an Azure Storage Blob") @InputRequirement(Requirement.INPUT_REQUIRED) -@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the azure container"), - @WritesAttribute(attribute = "azure.blobname", description = "The name of the azure blob"), +@WritesAttributes({ @WritesAttribute(attribute = "azure.container", description = "The name of the Azure container"), + @WritesAttribute(attribute = "azure.blobname", description = "The name of the Azure blob"), @WritesAttribute(attribute = "azure.primaryUri", description = "Primary location for blob content"), @WritesAttribute(attribute = "azure.etag", description = "Etag for the Azure blob"), @WritesAttribute(attribute = "azure.length", description = "Length of the blob"), - @WritesAttribute(attribute = 
"azure.timestamp", description = "The timestamp in Azure for the blob"), - @WritesAttribute(attribute = "azure.blobtype", description = "This is the type of blob and can be either page or block type") }) + @WritesAttribute(attribute = "azure.timestamp", description = "The timestamp in Azure for the blob")}) public class PutAzureBlobStorage extends AbstractAzureBlobProcessor { public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { @@ -80,21 +78,23 @@ public class PutAzureBlobStorage extends AbstractAzureBlobProcessor { final Map<String, String> attributes = new HashMap<>(); long length = flowFile.getSize(); - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream rawIn) throws IOException { - final InputStream in = new BufferedInputStream(rawIn); - try { - blob.upload(in, length); - BlobProperties properties = blob.getProperties(); - attributes.put("azure.container", containerName); - attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString()); - attributes.put("azure.etag", properties.getEtag()); - attributes.put("azure.length", String.valueOf(length)); - attributes.put("azure.timestamp", String.valueOf(properties.getLastModified())); - } catch (StorageException | URISyntaxException e) { - throw new IOException(e); - } + session.read(flowFile, rawIn -> { + InputStream in = rawIn; + if (!(in instanceof BufferedInputStream)) { + // do not double-wrap + in = new BufferedInputStream(rawIn); + } + + try { + blob.upload(in, length); + BlobProperties properties = blob.getProperties(); + attributes.put("azure.container", containerName); + attributes.put("azure.primaryUri", blob.getSnapshotQualifiedUri().toString()); + attributes.put("azure.etag", properties.getEtag()); + attributes.put("azure.length", String.valueOf(length)); + attributes.put("azure.timestamp", String.valueOf(properties.getLastModified())); + } catch (StorageException | URISyntaxException 
e) { + throw new IOException(e); } }); http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java index d429878..6907d94 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/BlobInfo.java @@ -159,7 +159,7 @@ public class BlobInfo implements Comparable<BlobInfo>, Serializable, ListableEnt return etag.compareTo(o.etag); } - protected BlobInfo(final Builder builder) { + private BlobInfo(final Builder builder) { this.primaryUri = builder.primaryUri; this.secondaryUri = builder.secondaryUri; this.contentType = builder.contentType; http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage/additionalDetails.html b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage/additionalDetails.html new file mode 100644 index 0000000..b4b8e3b --- /dev/null +++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage/additionalDetails.html @@ -0,0 +1,39 @@ +<!DOCTYPE html> +<html lang="en"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<head> + <meta charset="utf-8" /> + <title>FetchAzureBlobStorage Processor</title> + <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" /> +</head> + +<body> + +<h2>Apache NiFi Azure Processors</h2> + +<h3>Important Security Note</h3> +<p> + There are certain risks in allowing the account name and key to be stored as flowfile + attributes. While it does provide for a more flexible flow by allowing the account name and key + be fetched dynamically from the flow file attributes, care must be taken to restrict access to + the recorded event provenance data (e.g. by strictly controlling the provenance policy permission). + In addition, the provenance repositories may be put on encrypted disk partitions. 
+</p> +<p> + <a href="#" onclick="history.back()">Return to a previous page</a> +</p> +</body> +</html> http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.ListAzureBlobStorage/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.ListAzureBlobStorage/additionalDetails.html b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.ListAzureBlobStorage/additionalDetails.html new file mode 100644 index 0000000..76e8775 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.ListAzureBlobStorage/additionalDetails.html @@ -0,0 +1,39 @@ +<!DOCTYPE html> +<html lang="en"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<head> + <meta charset="utf-8" /> + <title>ListAzureBlobStorage Processor</title> + <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" /> +</head> + +<body> + +<h2>Apache NiFi Azure Processors</h2> + +<h3>Important Security Note</h3> +<p> + There are certain risks in allowing the account name and key to be stored as flowfile + attributes. While it does provide for a more flexible flow by allowing the account name and key + be fetched dynamically from the flow file attributes, care must be taken to restrict access to + the recorded event provenance data (e.g. by strictly controlling the provenance policy permission). + In addition, the provenance repositories may be put on encrypted disk partitions. +</p> +<p> + <a href="#" onclick="history.back()">Return to a previous page</a> +</p> +</body> +</html> http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureBlobStorage/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureBlobStorage/additionalDetails.html b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureBlobStorage/additionalDetails.html new file mode 100644 index 0000000..0a7ff35 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureBlobStorage/additionalDetails.html @@ -0,0 +1,39 @@ +<!DOCTYPE html> +<html lang="en"> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. 
+ The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<head> + <meta charset="utf-8" /> + <title>PutAzureBlobStorage Processor</title> + <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" /> +</head> + +<body> + +<h2>Apache NiFi Azure Processors</h2> + +<h3>Important Security Note</h3> +<p> + There are certain risks in allowing the account name and key to be stored as flowfile + attributes. While it does provide for a more flexible flow by allowing the account name and key + to be fetched dynamically from the flow file attributes, care must be taken to restrict access to + the recorded event provenance data (e.g. by strictly controlling the provenance policy permission). + In addition, the provenance repositories may be put on encrypted disk partitions. 
+</p> +<p> + <a href="#" onclick="history.back()">Return to a previous page</a> +</p> +</body> +</html> http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java index 34702eb..91a8c73 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureIT.java @@ -16,7 +16,17 @@ */ package org.apache.nifi.processors.azure.storage; -import static org.junit.Assert.fail; +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.blob.CloudBlob; +import com.microsoft.azure.storage.blob.CloudBlobClient; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import com.microsoft.azure.storage.blob.DeleteSnapshotsOption; +import com.microsoft.azure.storage.blob.ListBlobItem; +import org.apache.nifi.processors.azure.AzureConstants; +import org.apache.nifi.util.file.FileUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -25,19 +35,7 @@ import java.net.URISyntaxException; import java.security.InvalidKeyException; import java.util.Properties; -import org.apache.nifi.util.file.FileUtils; -import org.junit.AfterClass; -import org.junit.BeforeClass; - -import com.microsoft.azure.storage.CloudStorageAccount; -import 
com.microsoft.azure.storage.StorageException; -import com.microsoft.azure.storage.blob.CloudBlob; -import com.microsoft.azure.storage.blob.CloudBlobClient; -import com.microsoft.azure.storage.blob.CloudBlobContainer; -import com.microsoft.azure.storage.blob.DeleteSnapshotsOption; -import com.microsoft.azure.storage.blob.ListBlobItem; -import com.microsoft.azure.storage.table.CloudTable; -import com.microsoft.azure.storage.table.CloudTableClient; +import static org.junit.Assert.fail; public abstract class AbstractAzureIT { protected static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/azure-credentials.PROPERTIES"; @@ -90,17 +88,10 @@ public abstract class AbstractAzureIT { } protected static CloudBlobContainer getContainer() throws InvalidKeyException, URISyntaxException, StorageException { - String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", getAccountName(), getAccountKey()); + String storageConnectionString = String.format(AzureConstants.FORMAT_DEFAULT_CONNECTION_STRING, getAccountName(), getAccountKey()); CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString); CloudBlobClient blobClient = storageAccount.createCloudBlobClient(); return blobClient.getContainerReference(TEST_CONTAINER_NAME); } - protected static CloudTable getTable() throws InvalidKeyException, URISyntaxException, StorageException { - String storageConnectionString = String.format("DefaultEndpointsProtocol=http;AccountName=%s;AccountKey=%s", getAccountName(), getAccountKey()); - CloudStorageAccount storageAccount = CloudStorageAccount.parse(storageConnectionString); - CloudTableClient tableClient = storageAccount.createCloudTableClient(); - return tableClient.getTableReference(TEST_TABLE_NAME); - } - } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/nifi/blob/f30c8169/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java index 1e8a8f7..7dc8830 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureBlobStorage.java @@ -23,6 +23,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.nifi.processors.azure.AbstractAzureProcessor; import org.apache.nifi.processors.azure.AzureConstants; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -51,7 +52,7 @@ public class ITFetchAzureBlobStorage extends AbstractAzureIT { runner.enqueue(new byte[0], attributes); runner.run(); - runner.assertAllFlowFilesTransferred(FetchAzureBlobStorage.REL_SUCCESS, 1); + runner.assertAllFlowFilesTransferred(AbstractAzureProcessor.REL_SUCCESS, 1); List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(FetchAzureBlobStorage.REL_SUCCESS); for (MockFlowFile flowFile : flowFilesForRelationship) { flowFile.assertContentEquals("0123456789".getBytes());