This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push: new 58118cf NIFI-7334 Adding FetchDataLakeStorage Processor to provide native support for Azure Data lake Gen 2 Storage. 58118cf is described below commit 58118cf904cfb58ea119e5507a9a9549cda53bd9 Author: muazmaz <muza...@microsoft.com> AuthorDate: Tue Apr 14 23:36:19 2020 -0700 NIFI-7334 Adding FetchDataLakeStorage Processor to provide native support for Azure Data lake Gen 2 Storage. NIFI-7334 Update to FetchDataLakeStorage Processor This closes #4212. Signed-off-by: Peter Turcsanyi <turcsa...@apache.org> --- .../AbstractAzureDataLakeStorageProcessor.java | 2 +- .../azure/storage/DeleteAzureDataLakeStorage.java | 2 +- ...Storage.java => FetchAzureDataLakeStorage.java} | 19 ++++---- .../azure/storage/PutAzureDataLakeStorage.java | 2 +- .../services/org.apache.nifi.processor.Processor | 3 +- .../azure/storage/ITFetchAzureDataLakeStorage.java | 56 ++++++++++++++++++++++ 6 files changed, 72 insertions(+), 12 deletions(-) diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java index 83c5c14..40d276c 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java @@ -107,7 +107,7 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .required(true) - .defaultValue("nifi.${uuid}") + .defaultValue("${azure.filename}") .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description( diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java index 5cbf7f0..8403841 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java @@ -35,7 +35,7 @@ import com.azure.storage.file.datalake.DataLakeServiceClient; import java.util.concurrent.TimeUnit; @Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"}) -@SeeAlso({PutAzureDataLakeStorage.class}) +@SeeAlso({PutAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class}) @CapabilityDescription("Deletes the provided file from Azure Data Lake Storage") @InputRequirement(Requirement.INPUT_REQUIRED) public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor { diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java similarity index 87% copy from nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java copy to nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java index 5cbf7f0..d3068fb 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java @@ -16,6 +16,8 @@ */ package org.apache.nifi.processors.azure.storage; +import java.util.concurrent.TimeUnit; + import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -28,17 +30,16 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; import com.azure.storage.file.datalake.DataLakeDirectoryClient; -import com.azure.storage.file.datalake.DataLakeFileClient; import com.azure.storage.file.datalake.DataLakeFileSystemClient; import com.azure.storage.file.datalake.DataLakeServiceClient; +import com.azure.storage.file.datalake.DataLakeFileClient; -import java.util.concurrent.TimeUnit; @Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"}) -@SeeAlso({PutAzureDataLakeStorage.class}) -@CapabilityDescription("Deletes the provided file from Azure Data Lake Storage") +@SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class}) +@CapabilityDescription("Fetch the provided file from Azure Data Lake Storage") @InputRequirement(Requirement.INPUT_REQUIRED) -public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor { +public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcessor { @Override public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { @@ -49,6 +50,7 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc } final long startNanos = System.nanoTime(); + try { final String fileSystem = context.getProperty(FILESYSTEM).evaluateAttributeExpressions(flowFile).getValue(); final String directory = context.getProperty(DIRECTORY).evaluateAttributeExpressions(flowFile).getValue(); @@ -56,15 +58,16 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc final DataLakeServiceClient storageClient = getStorageClient(context, flowFile); final DataLakeFileSystemClient dataLakeFileSystemClient = storageClient.getFileSystemClient(fileSystem); final DataLakeDirectoryClient directoryClient = dataLakeFileSystemClient.getDirectoryClient(directory); - final DataLakeFileClient fileClient = directoryClient.getFileClient(fileName); - fileClient.delete(); + + flowFile = session.write(flowFile, os -> fileClient.read(os)); + session.getProvenanceReporter().modifyContent(flowFile); session.transfer(flowFile, REL_SUCCESS); final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); session.getProvenanceReporter().send(flowFile, fileClient.getFileUrl(), transferMillis); } catch (Exception e) { - getLogger().error("Failed to delete the specified file from Azure Data Lake Storage, due to {}", e); + getLogger().error("Failure to fetch file from Azure Data Lake Storage, due to {}", e); flowFile = session.penalize(flowFile); session.transfer(flowFile, REL_FAILURE); } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java index f67c98e..b59c366 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java @@ -42,7 +42,7 @@ import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; @Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"}) -@SeeAlso({DeleteAzureDataLakeStorage.class}) +@SeeAlso({DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class}) @CapabilityDescription("Puts content into an Azure Data Lake Storage Gen 2") @WritesAttributes({@WritesAttribute(attribute = "azure.filesystem", description = "The name of the Azure File System"), @WritesAttribute(attribute = "azure.directory", description = "The name of the Azure Directory"), diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 9f8417d..6e09330 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -22,4 +22,5 @@ org.apache.nifi.processors.azure.storage.DeleteAzureBlobStorage org.apache.nifi.processors.azure.storage.queue.PutAzureQueueStorage org.apache.nifi.processors.azure.storage.queue.GetAzureQueueStorage org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage -org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage \ No newline at end of file +org.apache.nifi.processors.azure.storage.DeleteAzureDataLakeStorage +org.apache.nifi.processors.azure.storage.FetchAzureDataLakeStorage \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java new file mode 100644 index 0000000..93a414f --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage; + +import org.apache.nifi.processor.Processor; +import org.apache.nifi.util.MockFlowFile; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +public class ITFetchAzureDataLakeStorage extends AbstractAzureBlobStorageIT { + + @Override + protected Class<? extends Processor> getProcessorClass() { + return FetchAzureDataLakeStorage.class; + } + + @Before + public void setUp() throws Exception { + runner.setProperty(PutAzureDataLakeStorage.FILE, TEST_FILE_NAME); + + } + + @Test + public void testFetchFile() throws Exception { + runner.assertValid(); + runner.enqueue(new byte[0]); + runner.run(); + + assertResult(); + } + + private void assertResult() throws Exception { + runner.assertAllFlowFilesTransferred(PutAzureDataLakeStorage.REL_SUCCESS, 1); + List<MockFlowFile> flowFilesForRelationship = runner.getFlowFilesForRelationship(PutAzureDataLakeStorage.REL_SUCCESS); + for (MockFlowFile flowFile : flowFilesForRelationship) { + flowFile.assertContentEquals("0123456789".getBytes()); + flowFile.assertAttributeEquals("azure.length", "10"); + } + } +}