This is an automated email from the ASF dual-hosted git repository. mbathori pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 03bba7049a NIFI-12672 Added AzureFileResourceService 03bba7049a is described below commit 03bba7049abcb31e3551e7f9353fa9f5436d9e66 Author: Balázs Gerner <balazsger...@gmail.com> AuthorDate: Thu Feb 1 15:51:00 2024 +0100 NIFI-12672 Added AzureFileResourceService This closes #8359. Signed-off-by: Mark Bathori <mbath...@apache.org> --- .../azure/AbstractAzureBlobProcessor_v12.java | 11 +- .../AbstractAzureDataLakeStorageProcessor.java | 117 +--------- .../azure/storage/CopyAzureBlobStorage_v12.java | 3 +- .../azure/storage/DeleteAzureBlobStorage_v12.java | 3 +- .../azure/storage/DeleteAzureDataLakeStorage.java | 22 +- .../azure/storage/FetchAzureBlobStorage_v12.java | 3 +- .../azure/storage/FetchAzureDataLakeStorage.java | 14 +- .../azure/storage/ListAzureBlobStorage_v12.java | 8 +- .../azure/storage/ListAzureDataLakeStorage.java | 14 +- .../azure/storage/MoveAzureDataLakeStorage.java | 18 +- .../azure/storage/PutAzureBlobStorage_v12.java | 3 +- .../azure/storage/PutAzureDataLakeStorage.java | 16 +- .../azure/storage/utils/AzureStorageUtils.java | 135 +++++++++++ .../AzureBlobStorageFileResourceService.java | 140 ++++++++++++ .../AzureDataLakeStorageFileResourceService.java | 150 ++++++++++++ .../org.apache.nifi.controller.ControllerService | 2 + .../storage/AbstractAzureBlobStorage_v12IT.java | 3 +- .../storage/AbstractAzureDataLakeStorageIT.java | 5 +- .../storage/ITDeleteAzureDataLakeStorage.java | 5 +- .../azure/storage/ITFetchAzureDataLakeStorage.java | 7 +- .../azure/storage/ITListAzureDataLakeStorage.java | 71 +++--- .../azure/storage/ITMoveAzureDataLakeStorage.java | 9 +- .../azure/storage/ITPutAzureDataLakeStorage.java | 23 +- .../storage/TestAbstractAzureDataLakeStorage.java | 8 +- .../AzureBlobStorageFileResourceServiceTest.java | 191 ++++++++++++++++ ...zureDataLakeStorageFileResourceServiceTest.java | 251 +++++++++++++++++++++ 26 files changed, 1008 insertions(+), 224 deletions(-) diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java index 89fee5701c..c72e41fe62 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureBlobProcessor_v12.java @@ -39,6 +39,7 @@ import java.util.Map; import java.util.Set; import java.util.function.Supplier; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE; import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBNAME; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBTYPE; @@ -52,14 +53,6 @@ import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR public abstract class AbstractAzureBlobProcessor_v12 extends AbstractProcessor { - public static final PropertyDescriptor STORAGE_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder() - .name("storage-credentials-service") - .displayName("Storage Credentials") - .description("Controller Service used to obtain Azure Blob Storage Credentials.") - .identifiesControllerService(AzureStorageCredentialsService_v12.class) - .required(true) - .build(); - public static final PropertyDescriptor BLOB_NAME = new PropertyDescriptor.Builder() .name("blob-name") .displayName("Blob Name") @@ -98,7 +91,7 @@ public abstract class AbstractAzureBlobProcessor_v12 extends AbstractProcessor { } protected BlobServiceClient getStorageClient(PropertyContext context, FlowFile flowFile) { - return getStorageClient(context, STORAGE_CREDENTIALS_SERVICE, flowFile); + return getStorageClient(context, BLOB_STORAGE_CREDENTIALS_SERVICE, flowFile); } protected BlobServiceClient getStorageClient(PropertyContext context, PropertyDescriptor storageCredentialsServiceProperty, FlowFile flowFile) { diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java index bb62bd5400..fd357b49a7 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/AbstractAzureDataLakeStorageProcessor.java @@ -17,69 +17,25 @@ package org.apache.nifi.processors.azure; import com.azure.storage.file.datalake.DataLakeServiceClient; -import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; import org.apache.nifi.context.PropertyContext; -import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.processors.azure.storage.utils.DataLakeServiceClientFactory; import org.apache.nifi.services.azure.storage.ADLSCredentialsDetails; import org.apache.nifi.services.azure.storage.ADLSCredentialsService; -import java.util.Collections; import java.util.Map; import java.util.Set; -import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE; public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProcessor { - public static final PropertyDescriptor ADLS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder() - .name("adls-credentials-service") - .displayName("ADLS Credentials") - .description("Controller Service used to obtain Azure Credentials.") - .identifiesControllerService(ADLSCredentialsService.class) - .required(true) - .build(); - - public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder() - .name("filesystem-name").displayName("Filesystem Name") - .description("Name of the Azure Storage File System (also called Container). It is assumed to be already existing.") - .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .required(true) - .build(); - - public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() - .name("directory-name") - .displayName("Directory Name") - .description("Name of the Azure Storage Directory. The Directory Name cannot contain a leading '/'. The root directory can be designated by the empty string value. " + - "In case of the PutAzureDataLakeStorage processor, the directory will be created if not already existing.") - .addValidator(new DirectoryValidator()) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .required(true) - .build(); - - public static final PropertyDescriptor FILE = new PropertyDescriptor.Builder() - .name("file-name").displayName("File Name") - .description("The filename") - .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .required(true) - .defaultValue(String.format("${%s}", ATTR_NAME_FILENAME)) - .build(); - public static final Relationship REL_SUCCESS = new Relationship.Builder() .name("success") .description("Files that have been successfully written to Azure storage are transferred to this relationship") @@ -111,77 +67,12 @@ public abstract class AbstractAzureDataLakeStorageProcessor extends AbstractProc } public DataLakeServiceClient getStorageClient(PropertyContext context, FlowFile flowFile) { - final Map<String, String> attributes = flowFile != null ? flowFile.getAttributes() : Collections.emptyMap(); + final Map<String, String> attributes = flowFile != null ? flowFile.getAttributes() : Map.of(); - final ADLSCredentialsService credentialsService = context.getProperty(ADLS_CREDENTIALS_SERVICE).asControllerService(ADLSCredentialsService.class); + final ADLSCredentialsService credentialsService = context.getProperty(ADLS_CREDENTIALS_SERVICE) + .asControllerService(ADLSCredentialsService.class); final ADLSCredentialsDetails credentialsDetails = credentialsService.getCredentialsDetails(attributes); return clientFactory.getStorageClient(credentialsDetails); } - - public static String evaluateFileSystemProperty(ProcessContext context, FlowFile flowFile) { - return evaluateFileSystemProperty(context, flowFile, FILESYSTEM); - } - - public static String evaluateFileSystemProperty(ProcessContext context, FlowFile flowFile, PropertyDescriptor property) { - String fileSystem = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue(); - if (StringUtils.isBlank(fileSystem)) { - throw new ProcessException(String.format("'%1$s' property evaluated to blank string. '%s' must be specified as a non-blank string.", property.getDisplayName())); - } - return fileSystem; - } - - public static String evaluateDirectoryProperty(ProcessContext context, FlowFile flowFile) { - return evaluateDirectoryProperty(context, flowFile, DIRECTORY); - } - - public static String evaluateDirectoryProperty(ProcessContext context, FlowFile flowFile, PropertyDescriptor property) { - String directory = context.getProperty(property).evaluateAttributeExpressions(flowFile).getValue(); - if (directory.startsWith("/")) { - throw new ProcessException(String.format("'%1$s' starts with '/'. '%s' cannot contain a leading '/'.", property.getDisplayName())); - } else if (StringUtils.isNotEmpty(directory) && StringUtils.isWhitespace(directory)) { - throw new ProcessException(String.format("'%1$s' contains whitespace characters only.", property.getDisplayName())); - } - return directory; - } - - public static String evaluateFileNameProperty(ProcessContext context, FlowFile flowFile) { - String fileName = context.getProperty(FILE).evaluateAttributeExpressions(flowFile).getValue(); - if (StringUtils.isBlank(fileName)) { - throw new ProcessException(String.format("'%1$s' property evaluated to blank string. '%s' must be specified as a non-blank string.", FILE.getDisplayName())); - } - return fileName; - } - - public static class DirectoryValidator implements Validator { - private String displayName; - - public DirectoryValidator() { - this.displayName = null; - } - - public DirectoryValidator(String displayName) { - this.displayName = displayName; - } - - @Override - public ValidationResult validate(String subject, String input, ValidationContext context) { - displayName = displayName == null ? DIRECTORY.getDisplayName() : displayName; - ValidationResult.Builder builder = new ValidationResult.Builder() - .subject(displayName) - .input(input); - - if (context.isExpressionLanguagePresent(input)) { - builder.valid(true).explanation("Expression Language Present"); - } else if (input.startsWith("/")) { - builder.valid(false).explanation(String.format("'%s' cannot contain a leading '/'", displayName)); - } else if (StringUtils.isNotEmpty(input) && StringUtils.isWhitespace(input)) { - builder.valid(false).explanation(String.format("'%s' cannot contain whitespace characters only", displayName)); - } else { - builder.valid(true); - } - - return builder.build(); - } - } } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/CopyAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/CopyAzureBlobStorage_v12.java index a1c7b07010..91ad7005b9 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/CopyAzureBlobStorage_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/CopyAzureBlobStorage_v12.java @@ -79,6 +79,7 @@ import static com.azure.storage.blob.specialized.BlockBlobClient.MAX_STAGE_BLOCK import static com.azure.storage.blob.specialized.BlockBlobClient.MAX_UPLOAD_BLOB_BYTES_LONG; import static com.azure.storage.common.implementation.Constants.STORAGE_SCOPE; import static java.net.HttpURLConnection.HTTP_ACCEPTED; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_CONTAINER; @@ -148,7 +149,7 @@ public class CopyAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 { .build(); public static final PropertyDescriptor DESTINATION_STORAGE_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder() - .fromPropertyDescriptor(STORAGE_CREDENTIALS_SERVICE) + .fromPropertyDescriptor(BLOB_STORAGE_CREDENTIALS_SERVICE) .displayName("Destination Storage Credentials") .build(); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage_v12.java index f241ffd05a..a8e889842b 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureBlobStorage_v12.java @@ -38,6 +38,7 @@ import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import java.util.List; import java.util.concurrent.TimeUnit; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBNAME; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_CONTAINER; @@ -73,7 +74,7 @@ public class DeleteAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 { .build(); private static final List<PropertyDescriptor> PROPERTIES = List.of( - STORAGE_CREDENTIALS_SERVICE, + BLOB_STORAGE_CREDENTIALS_SERVICE, CONTAINER, BLOB_NAME, DELETE_SNAPSHOTS_OPTION, diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java index 11d09514d6..080012600e 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/DeleteAzureDataLakeStorage.java @@ -29,19 +29,22 @@ import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import java.time.Duration; import java.util.List; -import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DIRECTORY; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILESYSTEM; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateDirectoryProperty; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileProperty; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileSystemProperty; @Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"}) @SeeAlso({PutAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class, ListAzureDataLakeStorage.class}) @@ -62,12 +65,7 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc .build(); public static final PropertyDescriptor FILE = new PropertyDescriptor.Builder() - .name("file-name").displayName("File Name") - .description("The filename") - .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .required(true) - .defaultValue(String.format("${%s}", ATTR_NAME_FILENAME)) + .fromPropertyDescriptor(AzureStorageUtils.FILE) .dependsOn(FILESYSTEM_OBJECT_TYPE, FS_TYPE_FILE) .build(); @@ -90,14 +88,14 @@ public class DeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageProc final boolean isFile = context.getProperty(FILESYSTEM_OBJECT_TYPE).getValue().equals(FS_TYPE_FILE.getValue()); final DataLakeServiceClient storageClient = getStorageClient(context, flowFile); - final String fileSystem = evaluateFileSystemProperty(context, flowFile); + final String fileSystem = evaluateFileSystemProperty(FILESYSTEM, context, flowFile); final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem); - final String directory = evaluateDirectoryProperty(context, flowFile); + final String directory = evaluateDirectoryProperty(DIRECTORY, context, flowFile); final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory); if (isFile) { - final String fileName = evaluateFileNameProperty(context, flowFile); + final String fileName = evaluateFileProperty(context, flowFile); final DataLakeFileClient fileClient = directoryClient.getFileClient(fileName); fileClient.delete(); session.transfer(flowFile, REL_SUCCESS); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java index 45100a65c3..fb0757403e 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureBlobStorage_v12.java @@ -49,6 +49,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_CONTAINER; @@ -140,7 +141,7 @@ public class FetchAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 im .build(); private static final List<PropertyDescriptor> PROPERTIES = List.of( - STORAGE_CREDENTIALS_SERVICE, + BLOB_STORAGE_CREDENTIALS_SERVICE, CONTAINER, BLOB_NAME, RANGE_START, diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java index 2018a90dc0..bd53b35561 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/FetchAzureDataLakeStorage.java @@ -47,6 +47,14 @@ import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import java.util.List; import java.util.concurrent.TimeUnit; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DIRECTORY; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILE; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILESYSTEM; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateDirectoryProperty; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileProperty; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileSystemProperty; + @Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"}) @SeeAlso({PutAzureDataLakeStorage.class, DeleteAzureDataLakeStorage.class, ListAzureDataLakeStorage.class}) @CapabilityDescription("Fetch the specified file from Azure Data Lake Storage") @@ -149,9 +157,9 @@ public class FetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageProce final DownloadRetryOptions retryOptions = new DownloadRetryOptions(); retryOptions.setMaxRetryRequests(numRetries); - final String fileSystem = evaluateFileSystemProperty(context, flowFile); - final String directory = evaluateDirectoryProperty(context, flowFile); - final String fileName = evaluateFileNameProperty(context, flowFile); + final String fileSystem = evaluateFileSystemProperty(FILESYSTEM, context, flowFile); + final String directory = evaluateDirectoryProperty(DIRECTORY, context, flowFile); + final String fileName = evaluateFileProperty(context, flowFile); final DataLakeServiceClient storageClient = getStorageClient(context, flowFile); final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem); final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java index 20aa86c0ac..6074dcf549 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureBlobStorage_v12.java @@ -59,7 +59,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12.STORAGE_CREDENTIALS_SERVICE; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE; import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE; @@ -135,7 +135,7 @@ public class ListAzureBlobStorage_v12 extends AbstractListAzureProcessor<BlobInf .build(); private static final List<PropertyDescriptor> PROPERTIES = List.of( - STORAGE_CREDENTIALS_SERVICE, + BLOB_STORAGE_CREDENTIALS_SERVICE, CONTAINER, BLOB_NAME_PREFIX, RECORD_WRITER, @@ -202,7 +202,7 @@ public class ListAzureBlobStorage_v12 extends AbstractListAzureProcessor<BlobInf @Override protected boolean isListingResetNecessary(final PropertyDescriptor property) { - return STORAGE_CREDENTIALS_SERVICE.equals(property) + return BLOB_STORAGE_CREDENTIALS_SERVICE.equals(property) || CONTAINER.equals(property) || BLOB_NAME_PREFIX.equals(property) || LISTING_STRATEGY.equals(property); @@ -217,7 +217,7 @@ public class ListAzureBlobStorage_v12 extends AbstractListAzureProcessor<BlobInf try { final List<BlobInfo> listing = new ArrayList<>(); - final AzureStorageCredentialsService_v12 credentialsService = context.getProperty(STORAGE_CREDENTIALS_SERVICE).asControllerService(AzureStorageCredentialsService_v12.class); + final AzureStorageCredentialsService_v12 credentialsService = context.getProperty(BLOB_STORAGE_CREDENTIALS_SERVICE).asControllerService(AzureStorageCredentialsService_v12.class); final AzureStorageCredentialsDetails_v12 credentialsDetails = credentialsService.getCredentialsDetails(Collections.emptyMap()); final BlobServiceClient storageClient = clientFactory.getStorageClient(credentialsDetails); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java index 425b86b4e9..f1a50ca4e9 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/ListAzureDataLakeStorage.java @@ -62,12 +62,7 @@ import java.util.regex.Pattern; import static org.apache.nifi.processor.util.list.ListedEntityTracker.INITIAL_LISTING_TARGET; import static org.apache.nifi.processor.util.list.ListedEntityTracker.TRACKING_STATE_CACHE; import static org.apache.nifi.processor.util.list.ListedEntityTracker.TRACKING_TIME_WINDOW; -import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ADLS_CREDENTIALS_SERVICE; -import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.DIRECTORY; -import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILESYSTEM; import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.TEMP_FILE_DIRECTORY; -import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateDirectoryProperty; -import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.evaluateFileSystemProperty; import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_DIRECTORY; import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_ETAG; import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_DESCRIPTION_FILENAME; @@ -82,6 +77,11 @@ import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILE_PATH; import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LAST_MODIFIED; import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DIRECTORY; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILESYSTEM; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateDirectoryProperty; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileSystemProperty; @PrimaryNodeOnly @TriggerSerially @@ -265,8 +265,8 @@ public class ListAzureDataLakeStorage extends AbstractListAzureProcessor<ADLSFil private List<ADLSFileInfo> performListing(final ProcessContext context, final Long minTimestamp, final ListingMode listingMode, final boolean applyFilters) throws IOException { try { - final String fileSystem = evaluateFileSystemProperty(context, null); - final String baseDirectory = evaluateDirectoryProperty(context, null); + final String fileSystem = evaluateFileSystemProperty(FILESYSTEM, context); + final String baseDirectory = evaluateDirectoryProperty(DIRECTORY, context); final boolean recurseSubdirectories = context.getProperty(RECURSE_SUBDIRECTORIES).asBoolean(); final Pattern filePattern = listingMode == ListingMode.EXECUTION ? this.filePattern : getPattern(context, FILE_FILTER); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/MoveAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/MoveAzureDataLakeStorage.java index 4d4a11f9bc..d388ab3289 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/MoveAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/MoveAzureDataLakeStorage.java @@ -37,6 +37,7 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DirectoryValidator; import java.util.HashMap; import java.util.List; @@ -57,6 +58,13 @@ import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_PRIMARY_URI; import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_SOURCE_DIRECTORY; import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_SOURCE_FILESYSTEM; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DIRECTORY; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILE; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILESYSTEM; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateDirectoryProperty; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileProperty; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileSystemProperty; @Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "datalake"}) @SeeAlso({DeleteAzureDataLakeStorage.class, FetchAzureDataLakeStorage.class, ListAzureDataLakeStorage.class}) @@ -147,11 +155,11 @@ public class MoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageProces final long startNanos = System.nanoTime(); try { - final String sourceFileSystem = evaluateFileSystemProperty(context, flowFile, SOURCE_FILESYSTEM); - final String sourceDirectory = evaluateDirectoryProperty(context, flowFile, SOURCE_DIRECTORY); - final String destinationFileSystem = evaluateFileSystemProperty(context, flowFile, DESTINATION_FILESYSTEM); - final String destinationDirectory = evaluateDirectoryProperty(context, flowFile, DESTINATION_DIRECTORY); - final String fileName = evaluateFileNameProperty(context, flowFile); + final String sourceFileSystem = evaluateFileSystemProperty(SOURCE_FILESYSTEM, context, flowFile); + final String sourceDirectory = evaluateDirectoryProperty(SOURCE_DIRECTORY, context, flowFile); + final String destinationFileSystem = evaluateFileSystemProperty(DESTINATION_FILESYSTEM, context, flowFile); + final String destinationDirectory = evaluateDirectoryProperty(DESTINATION_DIRECTORY, context, flowFile); + final String fileName = evaluateFileProperty(context, flowFile); final String destinationPath; if (!destinationDirectory.isEmpty() && !sourceDirectory.isEmpty()) { diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java index 3d2a36e67d..8f13f7e302 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureBlobStorage_v12.java @@ -59,6 +59,7 @@ import java.util.concurrent.TimeUnit; import static com.azure.core.http.ContentType.APPLICATION_OCTET_STREAM; import static com.azure.core.util.FluxUtil.toFluxByteBuffer; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBNAME; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_BLOBTYPE; import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_DESCRIPTION_CONTAINER; @@ -104,7 +105,7 @@ import static org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileR public class PutAzureBlobStorage_v12 extends AbstractAzureBlobProcessor_v12 implements ClientSideEncryptionSupport { private static final List<PropertyDescriptor> PROPERTIES = List.of( - STORAGE_CREDENTIALS_SERVICE, + BLOB_STORAGE_CREDENTIALS_SERVICE, AzureStorageUtils.CONTAINER, AzureStorageUtils.CREATE_CONTAINER, AzureStorageUtils.CONFLICT_RESOLUTION, diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java index 817435ed2f..7651ad6a99 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java @@ -39,6 +39,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DirectoryValidator; import org.apache.nifi.processors.transfer.ResourceTransferSource; import org.apache.nifi.util.StringUtils; @@ -61,6 +62,13 @@ import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM; import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_LENGTH; import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_PRIMARY_URI; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DIRECTORY; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILE; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILESYSTEM; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateDirectoryProperty; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileProperty; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileSystemProperty; import static org.apache.nifi.processors.transfer.ResourceTransferProperties.FILE_RESOURCE_SERVICE; import static org.apache.nifi.processors.transfer.ResourceTransferProperties.RESOURCE_TRANSFER_SOURCE; import static org.apache.nifi.processors.transfer.ResourceTransferUtils.getFileResource; @@ -128,11 +136,11 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess final long startNanos = System.nanoTime(); try { - final String fileSystem = evaluateFileSystemProperty(context, flowFile); - final String originalDirectory = evaluateDirectoryProperty(context, flowFile); - final String tempPath = evaluateDirectoryProperty(context, flowFile, BASE_TEMPORARY_PATH); + final String fileSystem = evaluateFileSystemProperty(FILESYSTEM, context, flowFile); + final String originalDirectory = evaluateDirectoryProperty(DIRECTORY, context, flowFile); + final String tempPath = evaluateDirectoryProperty(BASE_TEMPORARY_PATH, context, flowFile); final String tempDirectory = createPath(tempPath, TEMP_FILE_DIRECTORY); - final String fileName = evaluateFileNameProperty(context, flowFile); + final String fileName = evaluateFileProperty(context, flowFile); final DataLakeFileSystemClient fileSystemClient = getFileSystemClient(context, flowFile, fileSystem); final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(originalDirectory); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java index ed79f4436d..7a0165d7de 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/AzureStorageUtils.java @@ -17,16 +17,22 @@ package org.apache.nifi.processors.azure.storage.utils; import com.azure.core.http.ProxyOptions; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; import org.apache.nifi.context.PropertyContext; import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.proxy.ProxyConfiguration; import org.apache.nifi.proxy.ProxySpec; import org.apache.nifi.proxy.SocksVersion; +import org.apache.nifi.services.azure.storage.ADLSCredentialsService; import org.apache.nifi.services.azure.storage.AzureStorageConflictResolutionStrategy; +import org.apache.nifi.services.azure.storage.AzureStorageCredentialsService_v12; import org.apache.nifi.services.azure.storage.AzureStorageCredentialsType; import reactor.netty.http.client.HttpClient; @@ -34,6 +40,9 @@ import java.net.InetSocketAddress; import java.net.Proxy; import java.util.Collection; import java.util.EnumSet; +import java.util.Map; + +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILENAME; public final class AzureStorageUtils { public static final String STORAGE_ACCOUNT_NAME_PROPERTY_DESCRIPTOR_NAME = "storage-account-name"; @@ -41,6 +50,22 @@ public final class AzureStorageUtils { public static final String STORAGE_SAS_TOKEN_PROPERTY_DESCRIPTOR_NAME = "storage-sas-token"; public static final String STORAGE_ENDPOINT_SUFFIX_PROPERTY_DESCRIPTOR_NAME = "storage-endpoint-suffix"; + public static final PropertyDescriptor ADLS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder() + .name("adls-credentials-service") + .displayName("ADLS Credentials") + .description("Controller Service used to obtain Azure Credentials.") + .identifiesControllerService(ADLSCredentialsService.class) + .required(true) + .build(); + + public static final PropertyDescriptor BLOB_STORAGE_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder() + .name("storage-credentials-service") + .displayName("Storage Credentials") + .description("Controller Service used to obtain Azure Blob Storage Credentials.") + .identifiesControllerService(AzureStorageCredentialsService_v12.class) + .required(true) + .build(); + public static final PropertyDescriptor CREDENTIALS_TYPE = new PropertyDescriptor.Builder() .name("credentials-type") .displayName("Credentials Type") @@ -54,6 +79,33 @@ public final class AzureStorageUtils { .defaultValue(AzureStorageCredentialsType.SAS_TOKEN) .build(); + public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder() + .name("filesystem-name").displayName("Filesystem Name") + .description("Name of the Azure Storage File System (also called Container). It is assumed to be already existing.") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .build(); + + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .name("directory-name") + .displayName("Directory Name") + .description("Name of the Azure Storage Directory. The Directory Name cannot contain a leading '/'. The root directory can be designated by the empty string value. " + + "In case of the PutAzureDataLakeStorage processor, the directory will be created if not already existing.") + .addValidator(new DirectoryValidator()) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .build(); + + public static final PropertyDescriptor FILE = new PropertyDescriptor.Builder() + .name("file-name").displayName("File Name") + .description("The filename") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .required(true) + .defaultValue(String.format("${%s}", ATTR_NAME_FILENAME)) + .build(); + public static final String ACCOUNT_KEY_BASE_DESCRIPTION = "The storage account key. This is an admin-like password providing access to every container in this account. It is recommended " + "one uses Shared Access Signature (SAS) token, Managed Identity or Service Principal instead for fine-grained control with policies."; @@ -215,6 +267,57 @@ public final class AzureStorageUtils { ProxyConfiguration.validateProxySpec(context, results, PROXY_SPECS); } + public static String evaluateFileSystemProperty(PropertyDescriptor property, PropertyContext context) { + return evaluateFileSystemProperty(property, context, (Map<String, String>) null); + } + + public static String evaluateFileSystemProperty(PropertyDescriptor property, PropertyContext context, FlowFile flowFile) { + return evaluateFileSystemProperty(property, context, flowFile.getAttributes()); + } + + public static String evaluateFileSystemProperty(PropertyDescriptor property, PropertyContext context, Map<String, String> attributes) { + final String fileSystem = evaluateProperty(property, context, attributes); + if (StringUtils.isBlank(fileSystem)) { + throw new ProcessException(String.format("'%1$s' property evaluated to blank string. '%s' must be specified as a non-blank string.", + property.getDisplayName())); + } + return fileSystem; + } + + public static String evaluateDirectoryProperty(PropertyDescriptor property, PropertyContext context) { + return evaluateDirectoryProperty(property, context, (Map<String, String>) null); + } + + public static String evaluateDirectoryProperty(PropertyDescriptor property, PropertyContext context, FlowFile flowFile) { + return evaluateDirectoryProperty(property, context, flowFile.getAttributes()); + } + + public static String evaluateDirectoryProperty(PropertyDescriptor property, PropertyContext context, Map<String, String> attributes) { + final String directory = evaluateProperty(property, context, attributes); + if (directory.startsWith("/")) { + throw new ProcessException(String.format("'%1$s' starts with '/'. '%s' cannot contain a leading '/'.", property.getDisplayName())); + } else if (StringUtils.isNotEmpty(directory) && StringUtils.isWhitespace(directory)) { + throw new ProcessException(String.format("'%1$s' contains whitespace characters only.", property.getDisplayName())); + } + return directory; + } + + public static String evaluateFileProperty(PropertyContext context, FlowFile flowFile) { + return evaluateFileProperty(context, flowFile.getAttributes()); + } + + public static String evaluateFileProperty(PropertyContext context, Map<String, String> attributes) { + final String fileName = evaluateProperty(FILE, context, attributes); + if (StringUtils.isBlank(fileName)) { + throw new ProcessException(String.format("'%1$s' property evaluated to blank string. '%s' must be specified as a non-blank string.", FILE.getDisplayName())); + } + return fileName; + } + + private static String evaluateProperty(PropertyDescriptor propertyDescriptor, PropertyContext context, Map<String, String> attributes) { + return context.getProperty(propertyDescriptor).evaluateAttributeExpressions(attributes).getValue(); + } + /** * * Creates the {@link ProxyOptions proxy options} that {@link HttpClient} will use. @@ -252,4 +355,36 @@ public final class AzureStorageUtils { throw new IllegalArgumentException("Unsupported proxy type: " + proxyConfiguration.getProxyType()); } } + + public static class DirectoryValidator implements Validator { + private String displayName; + + public DirectoryValidator() { + this.displayName = null; + } + + public DirectoryValidator(String displayName) { + this.displayName = displayName; + } + + @Override + public ValidationResult validate(String subject, String input, ValidationContext context) { + displayName = displayName == null ? DIRECTORY.getDisplayName() : displayName; + ValidationResult.Builder builder = new ValidationResult.Builder() + .subject(displayName) + .input(input); + + if (context.isExpressionLanguagePresent(input)) { + builder.valid(true).explanation("Expression Language Present"); + } else if (input.startsWith("/")) { + builder.valid(false).explanation(String.format("'%s' cannot contain a leading '/'", displayName)); + } else if (StringUtils.isNotEmpty(input) && StringUtils.isWhitespace(input)) { + builder.valid(false).explanation(String.format("'%s' cannot contain whitespace characters only", displayName)); + } else { + builder.valid(true); + } + + return builder.build(); + } + } } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureBlobStorageFileResourceService.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureBlobStorageFileResourceService.java new file mode 100644 index 0000000000..ef2b0fb1a4 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureBlobStorageFileResourceService.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.azure.storage; + +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.models.BlobStorageException; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.documentation.UseCase; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.fileresource.service.api.FileResource; +import org.apache.nifi.fileresource.service.api.FileResourceService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.azure.AbstractAzureBlobProcessor_v12; +import org.apache.nifi.processors.azure.storage.FetchAzureBlobStorage_v12; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.processors.azure.storage.utils.BlobServiceClientFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions; +import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_BLOBNAME; +import static org.apache.nifi.processors.azure.storage.utils.BlobAttributes.ATTR_NAME_CONTAINER; +import static org.apache.nifi.util.StringUtils.isBlank; + +@Tags({"azure", "microsoft", "cloud", "storage", "file", "resource", "blob"}) +@SeeAlso({FetchAzureBlobStorage_v12.class}) +@CapabilityDescription("Provides an Azure Blob Storage file resource for other components.") +@UseCase( + description = "Fetch a specific file from Azure Blob Storage." + + " The service provides higher performance compared to fetch processors when the data should be moved between different storages without any transformation.", + configuration = """ + "Container Name" = "${azure.container}" + "Blob Name" = "${azure.blobname}" + + The "Storage Credentials" property should specify an instance of the AzureStorageCredentialsService_v12 in order to provide credentials for accessing the storage container. + """ +) +public class AzureBlobStorageFileResourceService extends AbstractControllerService implements FileResourceService { + + public static final PropertyDescriptor CONTAINER = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(AzureStorageUtils.CONTAINER) + .defaultValue(String.format("${%s}", ATTR_NAME_CONTAINER)) + .build(); + + public static final PropertyDescriptor BLOB_NAME = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(AbstractAzureBlobProcessor_v12.BLOB_NAME) + .defaultValue(String.format("${%s}", ATTR_NAME_BLOBNAME)) + .build(); + + private static final List<PropertyDescriptor> PROPERTIES = List.of( + BLOB_STORAGE_CREDENTIALS_SERVICE, + CONTAINER, + BLOB_NAME + ); + + private volatile BlobServiceClientFactory clientFactory; + private volatile ConfigurationContext context; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + this.clientFactory = new BlobServiceClientFactory(getLogger(), getProxyOptions(context)); + this.context = context; + } + + @OnDisabled + public void onDisabled() { + this.clientFactory = null; + this.context = null; + } + + @Override + public FileResource getFileResource(Map<String, String> attributes) { + final BlobServiceClient client = getStorageClient(attributes); + try { + return fetchBlob(client, attributes); + } catch (final BlobStorageException | IOException e) { + throw new ProcessException("Failed to fetch blob from Azure Blob Storage", e); + } + } + + protected BlobServiceClient getStorageClient(Map<String, String> attributes) { + final AzureStorageCredentialsService_v12 credentialsService = context.getProperty(BLOB_STORAGE_CREDENTIALS_SERVICE) + .asControllerService(AzureStorageCredentialsService_v12.class); + return clientFactory.getStorageClient(credentialsService.getCredentialsDetails(attributes)); + } + + /** + * Fetching blob from the provided container. + * + * @param storageClient azure blob storage client + * @param attributes configuration attributes + * @return fetched blob as FileResource + * @throws IOException exception caused by missing parameters or blob not found + */ + private FileResource fetchBlob(final BlobServiceClient storageClient, final Map<String, String> attributes) throws IOException { + final String containerName = context.getProperty(CONTAINER).evaluateAttributeExpressions(attributes).getValue(); + final String blobName = context.getProperty(BLOB_NAME).evaluateAttributeExpressions(attributes).getValue(); + + if (isBlank(containerName) || isBlank(blobName)) { + throw new ProcessException("Container name and blob name cannot be empty"); + } + + final BlobContainerClient containerClient = storageClient.getBlobContainerClient(containerName); + final BlobClient blobClient = containerClient.getBlobClient(blobName); + if (!blobClient.exists()) { + throw new ProcessException(String.format("Blob %s/%s not found", containerName, blobName)); + } + return new FileResource(blobClient.openInputStream(), blobClient.getProperties().getBlobSize()); + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureDataLakeStorageFileResourceService.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureDataLakeStorageFileResourceService.java new file mode 100644 index 0000000000..f9c9ae8188 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/services/azure/storage/AzureDataLakeStorageFileResourceService.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.azure.storage; + +import com.azure.storage.file.datalake.DataLakeDirectoryClient; +import com.azure.storage.file.datalake.DataLakeFileClient; +import com.azure.storage.file.datalake.DataLakeFileSystemClient; +import com.azure.storage.file.datalake.DataLakeServiceClient; +import com.azure.storage.file.datalake.models.DataLakeStorageException; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.documentation.UseCase; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.fileresource.service.api.FileResource; +import org.apache.nifi.fileresource.service.api.FileResourceService; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.azure.storage.FetchAzureDataLakeStorage; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.processors.azure.storage.utils.DataLakeServiceClientFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_DIRECTORY; +import static org.apache.nifi.processors.azure.storage.utils.ADLSAttributes.ATTR_NAME_FILESYSTEM; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILE; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateDirectoryProperty; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileProperty; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.evaluateFileSystemProperty; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.getProxyOptions; + +@Tags({"azure", "microsoft", "cloud", "storage", "adlsgen2", "file", "resource", "datalake"}) +@SeeAlso({FetchAzureDataLakeStorage.class}) +@CapabilityDescription("Provides an Azure Data Lake Storage (ADLS) file resource for other components.") +@UseCase( + description = "Fetch the specified file from Azure Data Lake Storage." + + " The service provides higher performance compared to fetch processors when the data should be moved between different storages without any transformation.", + configuration = """ + "Filesystem Name" = "${azure.filesystem}" + "Directory Name" = "${azure.directory}" + "File Name" = "${azure.filename}" + + The "ADLS Credentials" property should specify an instance of the ADLSCredentialsService in order to provide credentials for accessing the filesystem. + """ +) +public class AzureDataLakeStorageFileResourceService extends AbstractControllerService implements FileResourceService { + + public static final PropertyDescriptor FILESYSTEM = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(AzureStorageUtils.FILESYSTEM) + .defaultValue(String.format("${%s}", ATTR_NAME_FILESYSTEM)) + .build(); + + public static final PropertyDescriptor DIRECTORY = new PropertyDescriptor.Builder() + .fromPropertyDescriptor(AzureStorageUtils.DIRECTORY) + .defaultValue(String.format("${%s}", ATTR_NAME_DIRECTORY)) + .build(); + + private static final List<PropertyDescriptor> PROPERTIES = List.of( + ADLS_CREDENTIALS_SERVICE, + FILESYSTEM, + DIRECTORY, + FILE + ); + + private volatile DataLakeServiceClientFactory clientFactory; + private volatile ConfigurationContext context; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + this.clientFactory = new DataLakeServiceClientFactory(getLogger(), getProxyOptions(context)); + this.context = context; + } + + @OnDisabled + public void onDisabled() { + this.clientFactory = null; + this.context = null; + } + + @Override + public FileResource getFileResource(Map<String, String> attributes) { + final DataLakeServiceClient client = getStorageClient(attributes); + try { + return fetchFile(client, attributes); + } catch (final DataLakeStorageException | IOException e) { + throw new ProcessException("Failed to fetch file from ADLS Storage", e); + } + } + + protected DataLakeServiceClient getStorageClient(Map<String, String> attributes) { + final ADLSCredentialsService credentialsService = context.getProperty(ADLS_CREDENTIALS_SERVICE) + .asControllerService(ADLSCredentialsService.class); + return clientFactory.getStorageClient(credentialsService.getCredentialsDetails(attributes)); + } + + /** + * Fetching file from the provided filesystem and directory in ADLS. + * + * @param storageClient azure data lake service client + * @param attributes configuration attributes + * @return fetched file as FileResource + * @throws IOException exception caused by missing parameters or blob not found + */ + private FileResource fetchFile(final DataLakeServiceClient storageClient, final Map<String, String> attributes) throws IOException { + final String fileSystem = evaluateFileSystemProperty(FILESYSTEM, context, attributes); + final String directory = evaluateDirectoryProperty(DIRECTORY, context, attributes); + final String file = evaluateFileProperty(context, attributes); + + final DataLakeFileSystemClient fileSystemClient = storageClient.getFileSystemClient(fileSystem); + final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory); + final DataLakeFileClient fileClient = directoryClient.getFileClient(file); + + if (fileClient.getProperties().isDirectory()) { + throw new ProcessException(FILE.getDisplayName() + " (" + file + ") points to a directory. Full path: " + fileClient.getFilePath()); + } + + if (!fileClient.exists()) { + throw new ProcessException(String.format("File %s/%s not found in file system: %s", directory, file, fileSystem)); + } + + return new FileResource(fileClient.openInputStream().getInputStream(), + fileClient.getProperties().getFileSize()); + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService index 92f9ddbe5f..2823c4f3fc 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -21,3 +21,5 @@ org.apache.nifi.services.azure.data.explorer.StandardKustoIngestService org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerService_v12 org.apache.nifi.services.azure.storage.AzureStorageCredentialsControllerServiceLookup_v12 org.apache.nifi.services.azure.StandardAzureCredentialsControllerService +org.apache.nifi.services.azure.storage.AzureBlobStorageFileResourceService +org.apache.nifi.services.azure.storage.AzureDataLakeStorageFileResourceService diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java index 058ee2f0a7..1899dad4bc 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureBlobStorage_v12IT.java @@ -51,6 +51,7 @@ import java.util.Map; import java.util.UUID; import static org.apache.nifi.processors.azure.AzureServiceEndpoints.DEFAULT_BLOB_ENDPOINT_SUFFIX; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE; public abstract class AbstractAzureBlobStorage_v12IT extends AbstractAzureStorageIT { protected static final String SERVICE_ID = "credentials-service"; @@ -90,7 +91,7 @@ public abstract class AbstractAzureBlobStorage_v12IT extends AbstractAzureStorag runner.setProperty(service, AzureStorageUtils.ACCOUNT_KEY, getAccountKey()); runner.enableControllerService(service); - runner.setProperty(AbstractAzureBlobProcessor_v12.STORAGE_CREDENTIALS_SERVICE, SERVICE_ID); + runner.setProperty(BLOB_STORAGE_CREDENTIALS_SERVICE, SERVICE_ID); } @BeforeEach diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java index e683e6f697..8e94485a1f 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/AbstractAzureDataLakeStorageIT.java @@ -23,7 +23,6 @@ import com.azure.storage.file.datalake.DataLakeFileSystemClient; import com.azure.storage.file.datalake.DataLakeServiceClient; import com.azure.storage.file.datalake.DataLakeServiceClientBuilder; import org.apache.commons.lang3.StringUtils; -import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.services.azure.storage.ADLSCredentialsControllerService; import org.apache.nifi.services.azure.storage.ADLSCredentialsService; @@ -58,14 +57,14 @@ public abstract class AbstractAzureDataLakeStorageIT extends AbstractAzureStorag runner.setProperty(service, AzureStorageUtils.ACCOUNT_KEY, getAccountKey()); runner.enableControllerService(service); - runner.setProperty(AbstractAzureDataLakeStorageProcessor.ADLS_CREDENTIALS_SERVICE, "ADLSCredentials"); + runner.setProperty(AzureStorageUtils.ADLS_CREDENTIALS_SERVICE, "ADLSCredentials"); } @BeforeEach public void setUpAzureDataLakeStorageIT() { fileSystemName = String.format("%s-%s", FILESYSTEM_NAME_PREFIX, UUID.randomUUID()); - runner.setProperty(AbstractAzureDataLakeStorageProcessor.FILESYSTEM, fileSystemName); + runner.setProperty(AzureStorageUtils.FILESYSTEM, fileSystemName); DataLakeServiceClient storageClient = createStorageClient(); fileSystemClient = storageClient.createFileSystem(fileSystemName); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java index 0443e78ac5..fc30522ef6 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITDeleteAzureDataLakeStorage.java @@ -21,6 +21,7 @@ import com.azure.storage.file.datalake.DataLakeFileClient; import com.azure.storage.file.datalake.models.DataLakeStorageException; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; @@ -460,8 +461,8 @@ public class ITDeleteAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT private void setRunnerProperties(String fileSystem, String directory, String filename) { runner.setProperty(DeleteAzureDataLakeStorage.FILESYSTEM_OBJECT_TYPE, filename != null ? FS_TYPE_FILE : FS_TYPE_DIRECTORY); - runner.setProperty(DeleteAzureDataLakeStorage.FILESYSTEM, fileSystem); - runner.setProperty(DeleteAzureDataLakeStorage.DIRECTORY, directory); + runner.setProperty(AzureStorageUtils.FILESYSTEM, fileSystem); + runner.setProperty(AzureStorageUtils.DIRECTORY, directory); if (filename != null) { runner.setProperty(DeleteAzureDataLakeStorage.FILE, filename); } diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java index 54403b6183..767ad58d15 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITFetchAzureDataLakeStorage.java @@ -19,6 +19,7 @@ package org.apache.nifi.processors.azure.storage; import com.azure.storage.file.datalake.models.DataLakeStorageException; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.reporting.InitializationException; @@ -518,9 +519,9 @@ public class ITFetchAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT } private void setRunnerProperties(String fileSystem, String directory, String filename, String rangeStart, String rangeLength) { - runner.setProperty(FetchAzureDataLakeStorage.FILESYSTEM, fileSystem); - runner.setProperty(FetchAzureDataLakeStorage.DIRECTORY, directory); - runner.setProperty(FetchAzureDataLakeStorage.FILE, filename); + runner.setProperty(AzureStorageUtils.FILESYSTEM, fileSystem); + runner.setProperty(AzureStorageUtils.DIRECTORY, directory); + runner.setProperty(AzureStorageUtils.FILE, filename); if (rangeStart != null) { runner.setProperty(FetchAzureDataLakeStorage.RANGE_START, rangeStart); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java index f4259e5605..abfd5f5feb 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITListAzureDataLakeStorage.java @@ -18,6 +18,7 @@ package org.apache.nifi.processors.azure.storage; import org.apache.nifi.processor.Processor; import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.util.MockFlowFile; @@ -99,7 +100,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListRootRecursive() { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(AzureStorageUtils.DIRECTORY, ""); runProcessor(); @@ -108,7 +109,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListRootRecursiveWithTempFiles() throws Exception { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(AzureStorageUtils.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); runProcessor(); @@ -122,7 +123,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListRootRecursiveUsingProxyConfigurationService() throws Exception { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(AzureStorageUtils.DIRECTORY, ""); configureProxyService(); runProcessor(); @@ -132,7 +133,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListRootNonRecursive() { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(AzureStorageUtils.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false"); runProcessor(); @@ -142,7 +143,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListRootNonRecursiveWithTempFiles() throws Exception { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(AzureStorageUtils.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false"); runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); @@ -153,7 +154,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListSubdirectoryRecursive() { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1"); + runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1"); runProcessor(); @@ -162,7 +163,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListSubdirectoryRecursiveWithTempFiles() throws Exception { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1"); + runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1"); runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); runProcessor(); @@ -174,7 +175,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListSubdirectoryNonRecursive() { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1"); + runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1"); runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false"); runProcessor(); @@ -184,7 +185,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListSubdirectoryNonRecursiveWithTempFiles() throws Exception { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1"); + runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1"); runner.setProperty(ListAzureDataLakeStorage.RECURSE_SUBDIRECTORIES, "false"); runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); @@ -195,7 +196,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListWithFileFilter() { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(AzureStorageUtils.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file1.*$"); runProcessor(); @@ -205,7 +206,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListWithFileFilterWithTempFiles() throws Exception { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(AzureStorageUtils.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file1.*$"); runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); @@ -219,7 +220,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListWithFileFilterWithEL() { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(AzureStorageUtils.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file${suffix}$"); runner.setEnvironmentVariableValue("suffix", "1.*"); @@ -230,7 +231,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListWithFileFilterWithELWithTempFiles() throws Exception { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(AzureStorageUtils.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*file${suffix}$"); runner.setEnvironmentVariableValue("suffix", "1.*"); runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); @@ -245,7 +246,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListRootWithPathFilter() { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(AzureStorageUtils.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "^dir1.*$"); runProcessor(); @@ -255,7 +256,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListRootWithPathFilterWithTempFiles() throws Exception { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(AzureStorageUtils.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "^dir1.*$"); runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); @@ -268,7 +269,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListRootWithPathFilterWithEL() { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(AzureStorageUtils.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "${prefix}${suffix}"); runner.setEnvironmentVariableValue("prefix", "^dir"); runner.setEnvironmentVariableValue("suffix", "1.*$"); @@ -280,7 +281,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListRootWithPathFilterWithELWithTempFiles() throws Exception { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(AzureStorageUtils.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "${prefix}${suffix}"); runner.setEnvironmentVariableValue("prefix", "^dir"); runner.setEnvironmentVariableValue("suffix", "1.*$"); @@ -295,7 +296,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListSubdirectoryWithPathFilter() { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1"); + runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1"); runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*"); runProcessor(); @@ -305,7 +306,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListSubdirectoryWithPathFilterWithTempFiles() throws Exception { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1"); + runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1"); runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*"); runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); @@ -316,7 +317,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListRootWithFileAndPathFilter() { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(AzureStorageUtils.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*11"); runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*"); @@ -327,7 +328,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListRootWithFileAndPathFilterWithTempFiles() throws Exception { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(AzureStorageUtils.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.FILE_FILTER, ".*11"); runner.setProperty(ListAzureDataLakeStorage.PATH_FILTER, "dir1.*"); runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); @@ -340,7 +341,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListEmptyDirectory() { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir3"); + runner.setProperty(AzureStorageUtils.DIRECTORY, "dir3"); runProcessor(); @@ -349,7 +350,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListNonExistingDirectory() { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dummy"); + runner.setProperty(AzureStorageUtils.DIRECTORY, "dummy"); runProcessor(); @@ -358,8 +359,8 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListWithNonExistingFileSystem() { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.FILESYSTEM, "dummy"); - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(AzureStorageUtils.FILESYSTEM, "dummy"); + runner.setProperty(AzureStorageUtils.DIRECTORY, ""); runProcessor(); @@ -368,7 +369,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListWithRecords() throws InitializationException { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1"); + runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1"); MockRecordWriter recordWriter = new MockRecordWriter(null, false); runner.addControllerService("record-writer", recordWriter); @@ -384,7 +385,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListWithRecordsWithTempFiles() throws InitializationException { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, "dir1"); + runner.setProperty(AzureStorageUtils.DIRECTORY, "dir1"); MockRecordWriter recordWriter = new MockRecordWriter(null, false); runner.addControllerService("record-writer", recordWriter); @@ -402,7 +403,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListWithMinAge() { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(AzureStorageUtils.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.MIN_AGE, "1 hour"); runProcessor(); @@ -412,7 +413,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListWithMinAgeWithTempFiles() throws Exception { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(AzureStorageUtils.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.MIN_AGE, "1 hour"); runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); @@ -423,7 +424,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListWithMaxAge() { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(AzureStorageUtils.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.MAX_AGE, "1 hour"); runProcessor(); @@ -433,7 +434,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListWithMaxAgeWithTempFiles() throws Exception { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(AzureStorageUtils.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.MAX_AGE, "1 hour"); runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); @@ -448,7 +449,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListWithMinSize() { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(AzureStorageUtils.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.MIN_SIZE, "5 B"); runProcessor(); @@ -458,7 +459,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListWithMinSizeWithTempFiles() throws Exception { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(AzureStorageUtils.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.MIN_SIZE, "5 B"); runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); @@ -472,7 +473,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListWithMaxSize() { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(AzureStorageUtils.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.MAX_SIZE, "5 B"); runProcessor(); @@ -482,7 +483,7 @@ public class ITListAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testListWithMaxSizeWithTempFiles() throws Exception { - runner.setProperty(AbstractAzureDataLakeStorageProcessor.DIRECTORY, ""); + runner.setProperty(AzureStorageUtils.DIRECTORY, ""); runner.setProperty(ListAzureDataLakeStorage.MAX_SIZE, "5 B"); runner.setProperty(ListAzureDataLakeStorage.INCLUDE_TEMPORARY_FILES, "true"); diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java index 37faa6fd9b..751ff36f4c 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITMoveAzureDataLakeStorage.java @@ -20,6 +20,7 @@ import com.azure.storage.file.datalake.DataLakeDirectoryClient; import com.azure.storage.file.datalake.DataLakeFileClient; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.processor.Processor; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.util.MockFlowFile; @@ -66,7 +67,7 @@ public class ITMoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { runner.setProperty(MoveAzureDataLakeStorage.SOURCE_DIRECTORY, SOURCE_DIRECTORY); runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_FILESYSTEM, fileSystemName); runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_DIRECTORY, DESTINATION_DIRECTORY); - runner.setProperty(MoveAzureDataLakeStorage.FILE, FILE_NAME); + runner.setProperty(AzureStorageUtils.FILE, FILE_NAME); } @Test @@ -187,7 +188,7 @@ public class ITMoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { public void testMoveFileWithInvalidFileName() { createDirectoryAndUploadFile(SOURCE_DIRECTORY, FILE_NAME, FILE_DATA); - runner.setProperty(MoveAzureDataLakeStorage.FILE, "/file1"); + runner.setProperty(AzureStorageUtils.FILE, "/file1"); runProcessor(FILE_DATA); @@ -203,7 +204,7 @@ public class ITMoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { runner.setProperty(MoveAzureDataLakeStorage.SOURCE_DIRECTORY, sourceDirectory); runner.setProperty(MoveAzureDataLakeStorage.DESTINATION_DIRECTORY, destinationDirectory); - runner.setProperty(MoveAzureDataLakeStorage.FILE, fileName); + runner.setProperty(AzureStorageUtils.FILE, fileName); runProcessor(FILE_DATA); @@ -296,7 +297,7 @@ public class ITMoveAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { private void setELProperties() { runner.setProperty(MoveAzureDataLakeStorage.SOURCE_FILESYSTEM, String.format("${%s}", EL_FILESYSTEM)); runner.setProperty(MoveAzureDataLakeStorage.SOURCE_DIRECTORY, String.format("${%s}", EL_DIRECTORY)); - runner.setProperty(MoveAzureDataLakeStorage.FILE, String.format("${%s}", EL_FILE_NAME)); + runner.setProperty(AzureStorageUtils.FILE, String.format("${%s}", EL_FILE_NAME)); } private void runProcessor(byte[] fileData) { diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java index 9d22d2f88c..4a59a9b3a4 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java @@ -22,6 +22,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.nifi.fileresource.service.StandardFileResourceService; import org.apache.nifi.fileresource.service.api.FileResourceService; import org.apache.nifi.processor.Processor; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; import org.apache.nifi.processors.transfer.ResourceTransferProperties; import org.apache.nifi.processors.transfer.ResourceTransferSource; import org.apache.nifi.provenance.ProvenanceEventRecord; @@ -64,8 +65,8 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @BeforeEach public void setUp() { - runner.setProperty(PutAzureDataLakeStorage.DIRECTORY, DIRECTORY); - runner.setProperty(PutAzureDataLakeStorage.FILE, FILE_NAME); + runner.setProperty(AzureStorageUtils.DIRECTORY, DIRECTORY); + runner.setProperty(AzureStorageUtils.FILE, FILE_NAME); } @Test @@ -121,7 +122,7 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { String baseDirectory = "dir1/dir2"; String fullDirectory = baseDirectory + "/dir3/dir4"; fileSystemClient.createDirectory(baseDirectory); - runner.setProperty(PutAzureDataLakeStorage.DIRECTORY, fullDirectory); + runner.setProperty(AzureStorageUtils.DIRECTORY, fullDirectory); runProcessor(FILE_DATA); @@ -131,7 +132,7 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testPutFileToRootDirectory() throws Exception { String rootDirectory = ""; - runner.setProperty(PutAzureDataLakeStorage.DIRECTORY, rootDirectory); + runner.setProperty(AzureStorageUtils.DIRECTORY, rootDirectory); runProcessor(FILE_DATA); @@ -160,7 +161,7 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testPutFileWithNonExistingFileSystem() { - runner.setProperty(PutAzureDataLakeStorage.FILESYSTEM, "dummy"); + runner.setProperty(AzureStorageUtils.FILESYSTEM, "dummy"); runProcessor(FILE_DATA); @@ -169,7 +170,7 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { @Test public void testPutFileWithInvalidFileName() { - runner.setProperty(PutAzureDataLakeStorage.FILE, "/file1"); + runner.setProperty(AzureStorageUtils.FILE, "/file1"); runProcessor(FILE_DATA); @@ -180,8 +181,8 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { public void testPutFileWithSpacesInDirectoryAndFileName() throws Exception { String directory = "dir 1"; String fileName = "file 1"; - runner.setProperty(PutAzureDataLakeStorage.DIRECTORY, directory); - runner.setProperty(PutAzureDataLakeStorage.FILE, fileName); + runner.setProperty(AzureStorageUtils.DIRECTORY, directory); + runner.setProperty(AzureStorageUtils.FILE, fileName); runProcessor(FILE_DATA); @@ -290,9 +291,9 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { } private void setELProperties() { - runner.setProperty(PutAzureDataLakeStorage.FILESYSTEM, String.format("${%s}", EL_FILESYSTEM)); - runner.setProperty(PutAzureDataLakeStorage.DIRECTORY, String.format("${%s}", EL_DIRECTORY)); - runner.setProperty(PutAzureDataLakeStorage.FILE, String.format("${%s}", EL_FILE_NAME)); + runner.setProperty(AzureStorageUtils.FILESYSTEM, String.format("${%s}", EL_FILESYSTEM)); + runner.setProperty(AzureStorageUtils.DIRECTORY, String.format("${%s}", EL_DIRECTORY)); + runner.setProperty(AzureStorageUtils.FILE, String.format("${%s}", EL_FILE_NAME)); } private void runProcessor(byte[] fileData) { diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java index 4bf683fb20..68b1cf180f 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/TestAbstractAzureDataLakeStorage.java @@ -22,10 +22,10 @@ import org.apache.nifi.util.TestRunners; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.ADLS_CREDENTIALS_SERVICE; -import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.DIRECTORY; -import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILE; -import static org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor.FILESYSTEM; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.DIRECTORY; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILE; +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.FILESYSTEM; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/AzureBlobStorageFileResourceServiceTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/AzureBlobStorageFileResourceServiceTest.java new file mode 100644 index 0000000000..30193c3aa0 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/AzureBlobStorageFileResourceServiceTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.services.azure.storage; + +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.models.BlobProperties; +import com.azure.storage.blob.specialized.BlobInputStream; +import org.apache.nifi.fileresource.service.api.FileResource; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.services.azure.AzureCredentialsService; +import org.apache.nifi.services.azure.StandardAzureCredentialsControllerService; +import org.apache.nifi.util.NoOpProcessor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Map; + +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.BLOB_STORAGE_CREDENTIALS_SERVICE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class AzureBlobStorageFileResourceServiceTest { + private static final String CONTROLLER_SERVICE = "AzureCredentialsService"; + private static final String CONTAINER = "container-name"; + private static final String BLOB_NAME = "test-file"; + private static final long CONTENT_LENGTH = 10L; + + @Mock + private BlobServiceClient client; + + @Mock + private BlobContainerClient containerClient; + + @Mock + private BlobClient blobClient; + + @Mock + private BlobProperties blobProperties; + + @Mock + private BlobInputStream blobInputStream; + + @InjectMocks + private TestAzureBlobStorageFileResourceService service; + + private TestRunner runner; + + @BeforeEach + void setup() throws InitializationException { + runner = TestRunners.newTestRunner(NoOpProcessor.class); + runner.addControllerService("AzureBlobStorageFileResourceService", service); + } + + @Test + void testValidBlob() throws InitializationException { + setupService(); + setupMocking(CONTAINER, BLOB_NAME); + + final FileResource fileResource = service.getFileResource(Map.of()); + + assertFileResource(fileResource); + verifyMockInvocations(CONTAINER, BLOB_NAME); + } + + @Test + void testValidBlobWithEL() throws InitializationException { + String customContainer = "custom-container"; + String customBlobName = "custom-blob-name"; + String blobKey = "blob.name"; + String containerKey = "container.name"; + setupService(String.format("${%s}", blobKey), String.format("${%s}", containerKey)); + setupMocking(customContainer, customBlobName); + runner.setValidateExpressionUsage(false); + + final FileResource fileResource = service.getFileResource(Map.of( + blobKey, customBlobName, + containerKey, customContainer)); + + assertFileResource(fileResource); + verifyMockInvocations(customContainer, customBlobName); + } + + @Test + void testNonExistingBlob() throws InitializationException { + setupService(); + when(client.getBlobContainerClient(CONTAINER)).thenReturn(containerClient); + when(containerClient.getBlobClient(BLOB_NAME)).thenReturn(blobClient); + when(blobClient.exists()).thenReturn(false); + + assertThrows(ProcessException.class, + () -> service.getFileResource(Map.of()), + "Failed to fetch blob from Azure Blob Storage"); + } + + @Test + void testELWithMissingAttribute() throws InitializationException { + runner.setValidateExpressionUsage(false); + + setupService(String.format("${%s}", BLOB_NAME), String.format("${%s}", CONTAINER)); + + assertThrows(ProcessException.class, + () -> service.getFileResource(Map.of()), + "Container name and blob name cannot be empty"); + } + + private void setupService() throws InitializationException { + setupService(BLOB_NAME, CONTAINER); + } + + private void setupService(String blobName, String container) throws InitializationException { + final AzureCredentialsService credentialsService = new StandardAzureCredentialsControllerService(); + + runner.addControllerService(CONTROLLER_SERVICE, credentialsService); + runner.enableControllerService(credentialsService); + + runner.setProperty(service, BLOB_STORAGE_CREDENTIALS_SERVICE, CONTROLLER_SERVICE); + runner.setProperty(service, AzureBlobStorageFileResourceService.BLOB_NAME, blobName); + runner.setProperty(service, AzureBlobStorageFileResourceService.CONTAINER, container); + + runner.enableControllerService(service); + } + + private void setupMocking(String container, String blobName) { + when(client.getBlobContainerClient(container)).thenReturn(containerClient); + when(containerClient.getBlobClient(blobName)).thenReturn(blobClient); + when(blobClient.exists()).thenReturn(true); + when(blobClient.getProperties()).thenReturn(blobProperties); + when(blobProperties.getBlobSize()).thenReturn(CONTENT_LENGTH); + when(blobClient.openInputStream()).thenReturn(blobInputStream); + } + + private void assertFileResource(FileResource fileResource) { + assertNotNull(fileResource); + assertEquals(fileResource.getInputStream(), blobInputStream); + assertEquals(fileResource.getSize(), CONTENT_LENGTH); + } + + private void verifyMockInvocations(String customContainer, String customBlobName) { + verify(client).getBlobContainerClient(customContainer); + verify(containerClient).getBlobClient(customBlobName); + verify(blobClient).exists(); + verify(blobClient).getProperties(); + verify(blobProperties).getBlobSize(); + verify(blobClient).openInputStream(); + verifyNoMoreInteractions(containerClient, blobClient, blobProperties); + } + + private static class TestAzureBlobStorageFileResourceService extends AzureBlobStorageFileResourceService { + + private final BlobServiceClient client; + + public TestAzureBlobStorageFileResourceService(BlobServiceClient client) { + this.client = client; + } + + @Override + protected BlobServiceClient getStorageClient(Map<String, String> attributes) { + return client; + } + } + +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/AzureDataLakeStorageFileResourceServiceTest.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/AzureDataLakeStorageFileResourceServiceTest.java new file mode 100644 index 0000000000..b879893875 --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/services/azure/storage/AzureDataLakeStorageFileResourceServiceTest.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.azure.storage; + +import com.azure.storage.file.datalake.DataLakeDirectoryClient; +import com.azure.storage.file.datalake.DataLakeFileClient; +import com.azure.storage.file.datalake.DataLakeFileSystemClient; +import com.azure.storage.file.datalake.DataLakeServiceClient; +import com.azure.storage.file.datalake.models.DataLakeFileOpenInputStreamResult; +import com.azure.storage.file.datalake.models.PathProperties; +import org.apache.nifi.fileresource.service.api.FileResource; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.NoOpProcessor; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.InputStream; +import java.util.Map; + +import static org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils.ADLS_CREDENTIALS_SERVICE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class AzureDataLakeStorageFileResourceServiceTest { + private static final String CREDENTIALS_CONTROLLER_SERVICE = "ADLSCredentialsService"; + private static final String FILE_SYSTEM = "filesystem-name"; + private static final String DIRECTORY = "test-directory"; + private static final String FILE = "test-file"; + private static final long CONTENT_LENGTH = 10L; + public static final String MSG_EMPTY_FILE_NAME = "'File Name' property evaluated to blank string. 'File Name' must be specified as a non-blank string."; + public static final String MSG_EMPTY_FILE_SYSTEM_NAME = "'Filesystem Name' property evaluated to blank string. 'Filesystem Name' must be specified as a non-blank string."; + + @Mock + private DataLakeServiceClient client; + + @Mock + private DataLakeFileSystemClient fileSystemClient; + + @Mock + private DataLakeDirectoryClient directoryClient; + + @Mock + private DataLakeFileClient fileClient; + + @Mock + private PathProperties properties; + + @Mock + private InputStream inputStream; + + @InjectMocks + private TestAzureDataLakeStorageFileResourceService service; + + private TestRunner runner; + + @BeforeEach + void setup() throws InitializationException { + runner = TestRunners.newTestRunner(NoOpProcessor.class); + runner.addControllerService("AzureDataLakeStorageFileResourceService", service); + } + + @Test + void testHappyPath() throws InitializationException { + setupService(); + setupMocking(); + + FileResource fileResource = service.getFileResource(Map.of()); + + assertFileResource(fileResource); + verifyMockInvocations(); + } + + @Test + void testHappyPathWithValidEL() throws InitializationException { + String fileSystemKey = "filesystem.name"; + String directoryKey = "directory"; + String fileNameKey = "filename"; + setupService("${" + fileSystemKey + "}", "${" + directoryKey + "}", "${" + fileNameKey + "}"); + setupMocking(); + + FileResource fileResource = service.getFileResource(Map.of( + fileSystemKey, FILE_SYSTEM, + directoryKey, DIRECTORY, + fileNameKey, FILE)); + + assertFileResource(fileResource); + verifyMockInvocations(); + } + + @Test + void testFileIsDirectory() throws InitializationException { + setupService(); + when(client.getFileSystemClient(FILE_SYSTEM)).thenReturn(fileSystemClient); + when(fileSystemClient.getDirectoryClient(DIRECTORY)).thenReturn(directoryClient); + when(directoryClient.getFileClient(FILE)).thenReturn(fileClient); + when(fileClient.getProperties()).thenReturn(properties); + when(properties.isDirectory()).thenReturn(true); + + executeAndAssertProcessException(Map.of(), "File Name (" + FILE + ") points to a directory. Full path: " + fileClient.getFilePath()); + } + + @Test + void testNonExistentFile() throws InitializationException { + setupService(); + when(client.getFileSystemClient(FILE_SYSTEM)).thenReturn(fileSystemClient); + when(fileSystemClient.getDirectoryClient(DIRECTORY)).thenReturn(directoryClient); + when(directoryClient.getFileClient(FILE)).thenReturn(fileClient); + when(fileClient.getProperties()).thenReturn(properties); + when(properties.isDirectory()).thenReturn(false); + when(fileClient.exists()).thenReturn(false); + + executeAndAssertProcessException(Map.of(), "File " + DIRECTORY + "/" + FILE + " not found in file system: " + FILE_SYSTEM); + } + + @Test + void testInvalidDirectoryValueWithLeadingSlash() throws InitializationException { + String directoryKey = "directory.name"; + String directoryValue = "/invalid-directory"; + setupService(FILE_SYSTEM, "${" + directoryKey + "}", FILE); + + executeAndAssertProcessException(Map.of(directoryKey, directoryValue), "'Directory Name' starts with '/'. 'Directory Name' cannot contain a leading '/'."); + } + + @Test + void testValidELWithMissingFileValue() throws InitializationException { + setupService(FILE_SYSTEM, DIRECTORY, "${file.name}"); + + executeAndAssertProcessException(Map.of(), MSG_EMPTY_FILE_NAME); + } + + @Test + void testInvalidFileSystem() throws InitializationException { + String fileSystemKey = "fileSystem"; + String fileSystemValue = " "; + setupService("${" + fileSystemKey + "}", DIRECTORY, FILE); + + executeAndAssertProcessException(Map.of(fileSystemKey, fileSystemValue), MSG_EMPTY_FILE_SYSTEM_NAME); + } + + @Test + void testInvalidFileName() throws InitializationException { + String fileKey = "fileSystem"; + String fileValue = " "; + setupService(FILE_SYSTEM, DIRECTORY, "${" + fileKey + "}"); + + executeAndAssertProcessException(Map.of(fileKey, fileValue), + MSG_EMPTY_FILE_NAME); + } + + @Test + void testInvalidDirectoryValueWithWhiteSpaceOnly() throws InitializationException { + String directoryKey = "directory.name"; + String directoryValue = " "; + setupService(FILE_SYSTEM, "${" + directoryKey + "}", FILE); + + executeAndAssertProcessException(Map.of(directoryKey, directoryValue), "'Directory Name' contains whitespace characters only."); + } + + private void setupService() throws InitializationException { + setupService(FILE_SYSTEM, DIRECTORY, FILE); + } + + private void setupService(String fileSystem, String directory, String fileName) throws InitializationException { + final ADLSCredentialsService credentialsService = mock(ADLSCredentialsService.class); + when(credentialsService.getIdentifier()).thenReturn(CREDENTIALS_CONTROLLER_SERVICE); + runner.addControllerService(CREDENTIALS_CONTROLLER_SERVICE, credentialsService); + runner.enableControllerService(credentialsService); + + runner.setProperty(service, ADLS_CREDENTIALS_SERVICE, CREDENTIALS_CONTROLLER_SERVICE); + runner.setProperty(service, AzureStorageUtils.FILESYSTEM, fileSystem); + runner.setProperty(service, AzureStorageUtils.DIRECTORY, directory); + runner.setProperty(service, AzureStorageUtils.FILE, fileName); + + runner.enableControllerService(service); + } + + private void setupMocking() { + when(client.getFileSystemClient(FILE_SYSTEM)).thenReturn(fileSystemClient); + when(fileSystemClient.getDirectoryClient(DIRECTORY)).thenReturn(directoryClient); + when(directoryClient.getFileClient(FILE)).thenReturn(fileClient); + when(fileClient.getProperties()).thenReturn(properties); + when(properties.isDirectory()).thenReturn(false); + when(fileClient.exists()).thenReturn(true); + when(properties.getFileSize()).thenReturn(CONTENT_LENGTH); + DataLakeFileOpenInputStreamResult result = mock(DataLakeFileOpenInputStreamResult.class); + when(fileClient.openInputStream()).thenReturn(result); + when(result.getInputStream()).thenReturn(inputStream); + } + + private void executeAndAssertProcessException(Map<String, String> arguments, String expectedMessage) { + ProcessException exception = assertThrows(ProcessException.class, + () -> service.getFileResource(arguments)); + assertEquals(expectedMessage, exception.getMessage()); + } + + private void assertFileResource(FileResource fileResource) { + assertNotNull(fileResource); + assertEquals(fileResource.getInputStream(), inputStream); + assertEquals(fileResource.getSize(), CONTENT_LENGTH); + } + + private void verifyMockInvocations() { + verify(client).getFileSystemClient(FILE_SYSTEM); + verify(fileSystemClient).getDirectoryClient(DIRECTORY); + verify(directoryClient).getFileClient(FILE); + verify(properties).isDirectory(); + verify(fileClient).exists(); + verify(fileClient).openInputStream(); + verify(properties).getFileSize(); + } + + private static class TestAzureDataLakeStorageFileResourceService extends AzureDataLakeStorageFileResourceService { + private final DataLakeServiceClient client; + + private TestAzureDataLakeStorageFileResourceService(DataLakeServiceClient client) { + this.client = client; + } + + @Override + protected DataLakeServiceClient getStorageClient(Map<String, String> attributes) { + return client; + } + } +}