This is an automated email from the ASF dual-hosted git repository. pvillard pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push: new 87196e23d6 NIFI-12928 Added Simple Write strategy in PutAzureDataLakeStorage 87196e23d6 is described below commit 87196e23d651fa6edeae364c2189a7779e39060e Author: Peter Turcsanyi <turcsa...@apache.org> AuthorDate: Mon Mar 25 17:34:11 2024 +0100 NIFI-12928 Added Simple Write strategy in PutAzureDataLakeStorage Signed-off-by: Pierre Villard <pierre.villard...@gmail.com> This closes #8559. --- .../azure/storage/PutAzureDataLakeStorage.java | 148 ++++++++++++++------ .../azure/storage/utils/WritingStrategy.java | 49 +++++++ .../additionalDetails.html | 44 ++++-- .../azure/storage/ITPutAzureDataLakeStorage.java | 155 +++++++++++++++------ 4 files changed, 296 insertions(+), 100 deletions(-) diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java index cfd660c289..c15c737f58 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java @@ -39,6 +39,7 @@ import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor; import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils; +import org.apache.nifi.processors.azure.storage.utils.WritingStrategy; import org.apache.nifi.processors.transfer.ResourceTransferSource; import org.apache.nifi.util.StringUtils; @@ -93,6 +94,15 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess .allowableValues(FAIL_RESOLUTION, REPLACE_RESOLUTION, IGNORE_RESOLUTION) .build(); + protected static final PropertyDescriptor WRITING_STRATEGY = new PropertyDescriptor.Builder() + .name("writing-strategy") + .displayName("Writing Strategy") + .description("Defines the approach for writing the Azure file.") + .required(true) + .allowableValues(WritingStrategy.class) + .defaultValue(WritingStrategy.WRITE_AND_RENAME.getValue()) + .build(); + public static final PropertyDescriptor BASE_TEMPORARY_PATH = new PropertyDescriptor.Builder() .name("base-temporary-path") .displayName("Base Temporary Path") @@ -102,6 +112,7 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess .defaultValue("") .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(new DirectoryValidator("Base Temporary Path")) + .dependsOn(WRITING_STRATEGY, WritingStrategy.WRITE_AND_RENAME) .build(); private static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList( @@ -109,6 +120,7 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess FILESYSTEM, DIRECTORY, FILE, + WRITING_STRATEGY, BASE_TEMPORARY_PATH, CONFLICT_RESOLUTION, RESOURCE_TRANSFER_SOURCE, @@ -131,41 +143,44 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess final long startNanos = System.nanoTime(); try { final String fileSystem = evaluateFileSystemProperty(context, flowFile); - final String originalDirectory = evaluateDirectoryProperty(context, flowFile); - final String tempPath = evaluateDirectoryProperty(context, flowFile, BASE_TEMPORARY_PATH); - final String tempDirectory = createPath(tempPath, TEMP_FILE_DIRECTORY); + final String directory = evaluateDirectoryProperty(context, flowFile); final String fileName = evaluateFileNameProperty(context, flowFile); final DataLakeFileSystemClient fileSystemClient = getFileSystemClient(context, flowFile, fileSystem); - final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(originalDirectory); + final DataLakeDirectoryClient directoryClient = fileSystemClient.getDirectoryClient(directory); - final String tempFilePrefix = UUID.randomUUID().toString(); - final DataLakeDirectoryClient tempDirectoryClient = fileSystemClient.getDirectoryClient(tempDirectory); + final WritingStrategy writingStrategy = WritingStrategy.valueOf(context.getProperty(WRITING_STRATEGY).getValue()); final String conflictResolution = context.getProperty(CONFLICT_RESOLUTION).getValue(); final ResourceTransferSource resourceTransferSource = ResourceTransferSource.valueOf(context.getProperty(RESOURCE_TRANSFER_SOURCE).getValue()); final Optional<FileResource> fileResourceFound = getFileResource(resourceTransferSource, context, flowFile.getAttributes()); final long transferSize = fileResourceFound.map(FileResource::getSize).orElse(flowFile.getSize()); - final DataLakeFileClient tempFileClient = tempDirectoryClient.createFile(tempFilePrefix + fileName, true); - if (transferSize > 0) { - final FlowFile sourceFlowFile = flowFile; - try ( - final InputStream inputStream = new BufferedInputStream( - fileResourceFound.map(FileResource::getInputStream) - .orElseGet(() -> session.read(sourceFlowFile)) - ) - ) { - uploadContent(tempFileClient, inputStream, transferSize); - } catch (final Exception e) { - removeTempFile(tempFileClient); - throw e; + final DataLakeFileClient fileClient; + + if (writingStrategy == WritingStrategy.WRITE_AND_RENAME) { + final String tempPath = evaluateDirectoryProperty(context, flowFile, BASE_TEMPORARY_PATH); + final String tempDirectory = createPath(tempPath, TEMP_FILE_DIRECTORY); + final String tempFilePrefix = UUID.randomUUID().toString(); + + final DataLakeDirectoryClient tempDirectoryClient = fileSystemClient.getDirectoryClient(tempDirectory); + final DataLakeFileClient tempFileClient = tempDirectoryClient.createFile(tempFilePrefix + fileName, true); + + uploadFile(session, flowFile, fileResourceFound, transferSize, tempFileClient); + + createDirectoryIfNotExists(directoryClient); + + fileClient = renameFile(tempFileClient, directoryClient.getDirectoryPath(), fileName, conflictResolution); + } else { + fileClient = createFile(directoryClient, fileName, conflictResolution); + + if (fileClient != null) { + uploadFile(session, flowFile, fileResourceFound, transferSize, fileClient); } } - createDirectoryIfNotExists(directoryClient); - final String fileUrl = renameFile(tempFileClient, directoryClient.getDirectoryPath(), fileName, conflictResolution); - if (fileUrl != null) { - final Map<String, String> attributes = createAttributeMap(fileSystem, originalDirectory, fileName, fileUrl, transferSize); + if (fileClient != null) { + final String fileUrl = fileClient.getFileUrl(); + final Map<String, String> attributes = createAttributeMap(fileSystem, directory, fileName, fileUrl, transferSize); flowFile = session.putAllAttributes(flowFile, attributes); final long transferMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); @@ -180,12 +195,12 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess } } - private DataLakeFileSystemClient getFileSystemClient(ProcessContext context, FlowFile flowFile, String fileSystem) { + private DataLakeFileSystemClient getFileSystemClient(final ProcessContext context, final FlowFile flowFile, final String fileSystem) { final DataLakeServiceClient storageClient = getStorageClient(context, flowFile); return storageClient.getFileSystemClient(fileSystem); } - private Map<String, String> createAttributeMap(String fileSystem, String originalDirectory, String fileName, String fileUrl, long length) { + private Map<String, String> createAttributeMap(final String fileSystem, final String originalDirectory, final String fileName, final String fileUrl, final long length) { final Map<String, String> attributes = new HashMap<>(); attributes.put(ATTR_NAME_FILESYSTEM, fileSystem); attributes.put(ATTR_NAME_DIRECTORY, originalDirectory); @@ -195,14 +210,29 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess return attributes; } - private void createDirectoryIfNotExists(DataLakeDirectoryClient directoryClient) { + private void createDirectoryIfNotExists(final DataLakeDirectoryClient directoryClient) { if (!directoryClient.getDirectoryPath().isEmpty() && !directoryClient.exists()) { directoryClient.create(); } } + private void uploadFile(final ProcessSession session, final FlowFile flowFile, final Optional<FileResource> fileResourceFound, + final long transferSize, final DataLakeFileClient fileClient) throws Exception { + if (transferSize > 0) { + try (final InputStream inputStream = new BufferedInputStream( + fileResourceFound.map(FileResource::getInputStream) + .orElseGet(() -> session.read(flowFile))) + ) { + uploadContent(fileClient, inputStream, transferSize); + } catch (final Exception e) { + removeFile(fileClient); + throw e; + } + } + } + //Visible for testing - static void uploadContent(DataLakeFileClient fileClient, InputStream in, long length) { + static void uploadContent(final DataLakeFileClient fileClient, final InputStream in, final long length) { long chunkStart = 0; long chunkSize; @@ -223,19 +253,41 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess } /** - * This method serves as a "commit" for the upload process. Upon upload, a 0-byte file is created, then the payload is appended to it. - * Because of that, a work-in-progress file is available for readers before the upload is complete. It is not an efficient approach in - * case of conflicts because FlowFiles are uploaded unnecessarily, but it is a calculated risk because consistency is more important. + * Creates the file on Azure for 'Simple Write' strategy. Upon upload, a 0-byte file is created, then the payload is appended to it. + * Because of that, a work-in-progress file is available for readers before the upload is complete. * + * @param directoryClient directory client of the uploaded file's parent directory + * @param fileName name of the uploaded file + * @param conflictResolution conflict resolution strategy + * @return the file client of the uploaded file or {@code null} if the file already exists and conflict resolution strategy is 'ignore' + * @throws ProcessException if the file already exists and the conflict resolution strategy is 'fail'; also in case of other errors + */ + DataLakeFileClient createFile(DataLakeDirectoryClient directoryClient, final String fileName, final String conflictResolution) { + final String destinationPath = createPath(directoryClient.getDirectoryPath(), fileName); + + try { + final boolean overwrite = conflictResolution.equals(REPLACE_RESOLUTION); + return directoryClient.createFile(fileName, overwrite); + } catch (DataLakeStorageException dataLakeStorageException) { + return handleDataLakeStorageException(dataLakeStorageException, destinationPath, conflictResolution); + } + } + + /** + * This method serves as a "commit" for the upload process in case of 'Write and Rename' strategy. In order to prevent work-in-progress files from being available for readers, + * a temporary file is written first, and then renamed/moved to its final destination. It is not an efficient approach in case of conflicts because FlowFiles are uploaded unnecessarily, + * but it is a calculated risk because consistency is more important for 'Write and Rename' strategy. + * <p> * Visible for testing * - * @param sourceFileClient client of the temporary file + * @param sourceFileClient file client of the temporary file * @param destinationDirectory final location of the uploaded file * @param destinationFileName final name of the uploaded file * @param conflictResolution conflict resolution strategy - * @return URL of the uploaded file + * @return the file client of the uploaded file or {@code null} if the file already exists and conflict resolution strategy is 'ignore' + * @throws ProcessException if the file already exists and the conflict resolution strategy is 'fail'; also in case of other errors */ - String renameFile(final DataLakeFileClient sourceFileClient, final String destinationDirectory, final String destinationFileName, final String conflictResolution) { + DataLakeFileClient renameFile(final DataLakeFileClient sourceFileClient, final String destinationDirectory, final String destinationFileName, final String conflictResolution) { final String destinationPath = createPath(destinationDirectory, destinationFileName); try { @@ -243,18 +295,24 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess if (!conflictResolution.equals(REPLACE_RESOLUTION)) { destinationCondition.setIfNoneMatch("*"); } - return sourceFileClient.renameWithResponse(null, destinationPath, null, destinationCondition, null, null).getValue().getFileUrl(); + return sourceFileClient.renameWithResponse(null, destinationPath, null, destinationCondition, null, null).getValue(); } catch (DataLakeStorageException dataLakeStorageException) { - removeTempFile(sourceFileClient); - if (dataLakeStorageException.getStatusCode() == 409 && conflictResolution.equals(IGNORE_RESOLUTION)) { - getLogger().info("File [{}] already exists. Remote file not modified due to {} being set to '{}'.", - destinationPath, CONFLICT_RESOLUTION.getDisplayName(), conflictResolution); - return null; - } else if (dataLakeStorageException.getStatusCode() == 409 && conflictResolution.equals(FAIL_RESOLUTION)) { - throw new ProcessException(String.format("File [%s] already exists.", destinationPath), dataLakeStorageException); - } else { - throw new ProcessException(String.format("Renaming File [%s] failed", destinationPath), dataLakeStorageException); - } + removeFile(sourceFileClient); + + return handleDataLakeStorageException(dataLakeStorageException, destinationPath, conflictResolution); + } + } + + private DataLakeFileClient handleDataLakeStorageException(final DataLakeStorageException dataLakeStorageException, final String destinationPath, final String conflictResolution) { + final boolean fileAlreadyExists = dataLakeStorageException.getStatusCode() == 409; + if (fileAlreadyExists && conflictResolution.equals(IGNORE_RESOLUTION)) { + getLogger().info("File [{}] already exists. Remote file not modified due to {} being set to '{}'.", + destinationPath, CONFLICT_RESOLUTION.getDisplayName(), conflictResolution); + return null; + } else if (fileAlreadyExists && conflictResolution.equals(FAIL_RESOLUTION)) { + throw new ProcessException(String.format("File [%s] already exists.", destinationPath), dataLakeStorageException); + } else { + throw new ProcessException(String.format("File operation failed [%s]", destinationPath), dataLakeStorageException); } } @@ -264,7 +322,7 @@ public class PutAzureDataLakeStorage extends AbstractAzureDataLakeStorageProcess : path; } - private void removeTempFile(final DataLakeFileClient fileClient) { + private void removeFile(final DataLakeFileClient fileClient) { try { fileClient.delete(); } catch (Exception e) { diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/WritingStrategy.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/WritingStrategy.java new file mode 100644 index 0000000000..6fda36623d --- /dev/null +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/WritingStrategy.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.azure.storage.utils; + +import org.apache.nifi.components.DescribedValue; + +public enum WritingStrategy implements DescribedValue { + + WRITE_AND_RENAME("Write and Rename", "The processor writes the Azure file into a temporary directory and then renames/moves it to the final destination." + + " This prevents other processes from reading partially written files."), + SIMPLE_WRITE("Simple Write", "The processor writes the Azure file directly to the destination. This might result in the reading of partially written files."); + + private final String displayName; + private final String description; + + WritingStrategy(String displayName, String description) { + this.displayName = displayName; + this.description = description; + } + + @Override + public String getValue() { + return name(); + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage/additionalDetails.html b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage/additionalDetails.html index 2469ceafaa..4758d4c19e 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage/additionalDetails.html +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage/additionalDetails.html @@ -26,31 +26,57 @@ This processor is responsible for uploading files to Azure Data Lake Storage Gen2. </p> -<h3>File uploading and cleanup process</h3> +<h3>File uploading and cleanup process in case of "Write and Rename" strategy</h3> <h4>New file upload</h4> <ol> <li>A temporary file is created with random prefix under the given path in '_nifitempdirectory'.</li> <li>Content is appended to temp file.</li> - <li>Temp file is renamed to its original name, the original file is overwritten.</li> - <li>In case of appending or renaming failure the temp file is deleted, the original file remains intact.</li> - <li>In case of temporary file deletion failure both temp file and original file remain on the server.</li> + <li>Temp file is moved to the final destination directory and renamed to its original name.</li> + <li>In case of appending or renaming failure, the temp file is deleted.</li> + <li>In case of temporary file deletion failure, the temp file remains on the server.</li> </ol> <h4>Existing file upload</h4> <ul> - <li>Processors with "fail" conflict resolution strategy will be directed to "Failure" relationship.</li> - <li>Processors with "ignore" conflict resolution strategy will be directed to "Success" relationship.</li> + <li>Processors with "fail" conflict resolution strategy will direct the FlowFile to "Failure" relationship.</li> + <li>Processors with "ignore" conflict resolution strategy will direct the FlowFile to "Success" relationship.</li> <li>Processors with "replace" conflict resolution strategy:</li> <ol> <li>A temporary file is created with random prefix under the given path in '_nifitempdirectory'.</li> <li>Content is appended to temp file.</li> - <li>Temp file is renamed to its original name, the original file is overwritten.</li> - <li>In case of appending or renaming failure the temp file is deleted, the original file remains intact.</li> - <li>In case of temporary file deletion failure both temp file and original file remain on the server.</li> + <li>Temp file is moved to the final destination directory and renamed to its original name, the original file is overwritten.</li> + <li>In case of appending or renaming failure, the temp file is deleted and the original file remains intact.</li> + <li>In case of temporary file deletion failure, both temp file and original file remain on the server.</li> + </ol> +</ul> + +<h3>File uploading and cleanup process in case of "Simple Write" strategy</h3> + +<h4>New file upload</h4> + +<ol> + <li>An empty file is created at its final destination.</li> + <li>Content is appended to the file.</li> + <li>In case of appending failure, the file is deleted.</li> + <li>In case of file deletion failure, the file remains on the server.</li> +</ol> + +<h4>Existing file upload</h4> + +<ul> + <li>Processors with "fail" conflict resolution strategy will direct the FlowFile to "Failure" relationship.</li> + <li>Processors with "ignore" conflict resolution strategy will direct the FlowFile to "Success" relationship.</li> + <li>Processors with "replace" conflict resolution strategy:</li> + + <ol> + <li>An empty file is created at its final destination, the original file is overwritten.</li> + <li>Content is appended to the file.</li> + <li>In case of appending failure, the file is deleted and the original file is not restored.</li> + <li>In case of file deletion failure, the file remains on the server.</li> </ol> </ul> diff --git a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java index 171e3a5e75..bb5498b425 100644 --- a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java +++ b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java @@ -22,13 +22,15 @@ import org.apache.commons.lang3.StringUtils; import org.apache.nifi.fileresource.service.StandardFileResourceService; import org.apache.nifi.fileresource.service.api.FileResourceService; import org.apache.nifi.processor.Processor; +import org.apache.nifi.processors.azure.storage.utils.WritingStrategy; import org.apache.nifi.processors.transfer.ResourceTransferProperties; import org.apache.nifi.processors.transfer.ResourceTransferSource; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventType; import org.apache.nifi.util.MockFlowFile; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -68,27 +70,23 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { runner.setProperty(PutAzureDataLakeStorage.FILE, FILE_NAME); } - @Test - public void testPutFileToExistingDirectory() throws Exception { - fileSystemClient.createDirectory(DIRECTORY); - - runProcessor(FILE_DATA); + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileToExistingDirectory(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); - assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA); - } - - @Test - public void testPutFileToExistingDirectoryUsingProxyConfigurationService() throws Exception { fileSystemClient.createDirectory(DIRECTORY); - configureProxyService(); runProcessor(FILE_DATA); assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA); } - @Test - public void testPutFileToExistingDirectoryWithReplaceResolution() throws Exception { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileToExistingDirectoryWithReplaceResolution(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); + fileSystemClient.createDirectory(DIRECTORY); runner.setProperty(PutAzureDataLakeStorage.CONFLICT_RESOLUTION, PutAzureDataLakeStorage.REPLACE_RESOLUTION); @@ -98,8 +96,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA); } - @Test - public void testPutFileToExistingDirectoryWithIgnoreResolution() throws Exception { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileToExistingDirectoryWithIgnoreResolution(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); + fileSystemClient.createDirectory(DIRECTORY); runner.setProperty(PutAzureDataLakeStorage.CONFLICT_RESOLUTION, PutAzureDataLakeStorage.IGNORE_RESOLUTION); @@ -109,15 +110,21 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA); } - @Test - public void testPutFileToNonExistingDirectory() throws Exception { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileToNonExistingDirectory(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); + runProcessor(FILE_DATA); assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA); } - @Test - public void testPutFileToDeepDirectory() throws Exception { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileToDeepDirectory(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); + String baseDirectory = "dir1/dir2"; String fullDirectory = baseDirectory + "/dir3/dir4"; fileSystemClient.createDirectory(baseDirectory); @@ -128,8 +135,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess(fullDirectory, FILE_NAME, FILE_DATA); } - @Test - public void testPutFileToRootDirectory() throws Exception { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileToRootDirectory(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); + String rootDirectory = ""; runner.setProperty(PutAzureDataLakeStorage.DIRECTORY, rootDirectory); @@ -138,8 +148,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess(rootDirectory, FILE_NAME, FILE_DATA); } - @Test - public void testPutEmptyFile() throws Exception { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutEmptyFile(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); + byte[] fileData = new byte[0]; runProcessor(fileData); @@ -147,8 +160,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess(DIRECTORY, FILE_NAME, fileData); } - @Test - public void testPutBigFile() throws Exception { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutBigFile(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); + Random random = new Random(); byte[] fileData = new byte[120_000_000]; random.nextBytes(fileData); @@ -158,8 +174,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess(DIRECTORY, FILE_NAME, fileData); } - @Test - public void testPutFileWithNonExistingFileSystem() { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileWithNonExistingFileSystem(WritingStrategy writingStrategy) { + setWritingStrategy(writingStrategy); + runner.setProperty(PutAzureDataLakeStorage.FILESYSTEM, "dummy"); runProcessor(FILE_DATA); @@ -167,8 +186,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertFailure(); } - @Test - public void testPutFileWithInvalidFileName() { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileWithInvalidFileName(WritingStrategy writingStrategy) { + setWritingStrategy(writingStrategy); + runner.setProperty(PutAzureDataLakeStorage.FILE, "/file1"); runProcessor(FILE_DATA); @@ -176,8 +198,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertFailure(); } - @Test - public void testPutFileWithSpacesInDirectoryAndFileName() throws Exception { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileWithSpacesInDirectoryAndFileName(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); + String directory = "dir 1"; String fileName = "file 1"; runner.setProperty(PutAzureDataLakeStorage.DIRECTORY, directory); @@ -188,8 +213,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess(directory, fileName, FILE_DATA); } - @Test - public void testPutFileToExistingFileWithFailResolution() { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileToExistingFileWithFailResolution(WritingStrategy writingStrategy) { + setWritingStrategy(writingStrategy); + fileSystemClient.createFile(String.format("%s/%s", DIRECTORY, FILE_NAME)); runProcessor(FILE_DATA); @@ -197,8 +225,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertFailure(); } - @Test - public void testPutFileToExistingFileWithReplaceResolution() throws Exception { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileToExistingFileWithReplaceResolution(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); + fileSystemClient.createFile(String.format("%s/%s", DIRECTORY, FILE_NAME)); runner.setProperty(PutAzureDataLakeStorage.CONFLICT_RESOLUTION, PutAzureDataLakeStorage.REPLACE_RESOLUTION); @@ -208,8 +239,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA); } - @Test - public void testPutFileToExistingFileWithIgnoreResolution() throws Exception { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileToExistingFileWithIgnoreResolution(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); + String azureFileContent = "AzureFileContent"; createDirectoryAndUploadFile(DIRECTORY, FILE_NAME, azureFileContent); @@ -220,8 +254,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccessWithIgnoreResolution(DIRECTORY, FILE_NAME, FILE_DATA, azureFileContent.getBytes(StandardCharsets.UTF_8)); } - @Test - public void testPutFileWithEL() throws Exception { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileWithEL(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); + Map<String, String> attributes = createAttributesMap(); setELProperties(); @@ -230,8 +267,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA); } - @Test - public void testPutFileWithELButFilesystemIsNotSpecified() { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileWithELButFilesystemIsNotSpecified(WritingStrategy writingStrategy) { + setWritingStrategy(writingStrategy); + Map<String, String> attributes = createAttributesMap(); attributes.remove(EL_FILESYSTEM); setELProperties(); @@ -241,8 +281,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertFailure(); } - @Test - public void testPutFileWithELButFileNameIsNotSpecified() { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileWithELButFileNameIsNotSpecified(WritingStrategy writingStrategy) { + setWritingStrategy(writingStrategy); + Map<String, String> attributes = createAttributesMap(); attributes.remove(EL_FILE_NAME); setELProperties(); @@ -252,8 +295,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertFailure(); } - @Test - public void testPutFileFromLocalFile() throws Exception { + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileFromLocalFile(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); + String attributeName = "file.path"; String serviceId = FileResourceService.class.getSimpleName(); @@ -279,6 +325,19 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { assertProvenanceEvents(); } + @ParameterizedTest + @EnumSource(WritingStrategy.class) + public void testPutFileUsingProxy(WritingStrategy writingStrategy) throws Exception { + setWritingStrategy(writingStrategy); + + fileSystemClient.createDirectory(DIRECTORY); + configureProxyService(); + + runProcessor(FILE_DATA); + + assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA); + } + private Map<String, String> createAttributesMap() { Map<String, String> attributes = new HashMap<>(); @@ -289,6 +348,10 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT { return attributes; } + private void setWritingStrategy(WritingStrategy writingStrategy) { + runner.setProperty(PutAzureDataLakeStorage.WRITING_STRATEGY, writingStrategy.getValue()); + } + private void setELProperties() { runner.setProperty(PutAzureDataLakeStorage.FILESYSTEM, String.format("${%s}", EL_FILESYSTEM)); runner.setProperty(PutAzureDataLakeStorage.DIRECTORY, String.format("${%s}", EL_DIRECTORY));