This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 87196e23d6 NIFI-12928 Added Simple Write strategy in 
PutAzureDataLakeStorage
87196e23d6 is described below

commit 87196e23d651fa6edeae364c2189a7779e39060e
Author: Peter Turcsanyi <turcsa...@apache.org>
AuthorDate: Mon Mar 25 17:34:11 2024 +0100

    NIFI-12928 Added Simple Write strategy in PutAzureDataLakeStorage
    
    Signed-off-by: Pierre Villard <pierre.villard...@gmail.com>
    
    This closes #8559.
---
 .../azure/storage/PutAzureDataLakeStorage.java     | 148 ++++++++++++++------
 .../azure/storage/utils/WritingStrategy.java       |  49 +++++++
 .../additionalDetails.html                         |  44 ++++--
 .../azure/storage/ITPutAzureDataLakeStorage.java   | 155 +++++++++++++++------
 4 files changed, 296 insertions(+), 100 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
index cfd660c289..c15c737f58 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/PutAzureDataLakeStorage.java
@@ -39,6 +39,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processors.azure.AbstractAzureDataLakeStorageProcessor;
 import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
+import org.apache.nifi.processors.azure.storage.utils.WritingStrategy;
 import org.apache.nifi.processors.transfer.ResourceTransferSource;
 import org.apache.nifi.util.StringUtils;
 
@@ -93,6 +94,15 @@ public class PutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageProcess
             .allowableValues(FAIL_RESOLUTION, REPLACE_RESOLUTION, 
IGNORE_RESOLUTION)
             .build();
 
+    protected static final PropertyDescriptor WRITING_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("writing-strategy")
+            .displayName("Writing Strategy")
+            .description("Defines the approach for writing the Azure file.")
+            .required(true)
+            .allowableValues(WritingStrategy.class)
+            .defaultValue(WritingStrategy.WRITE_AND_RENAME.getValue())
+            .build();
+
     public static final PropertyDescriptor BASE_TEMPORARY_PATH = new 
PropertyDescriptor.Builder()
             .name("base-temporary-path")
             .displayName("Base Temporary Path")
@@ -102,6 +112,7 @@ public class PutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageProcess
             .defaultValue("")
             
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
             .addValidator(new DirectoryValidator("Base Temporary Path"))
+            .dependsOn(WRITING_STRATEGY, WritingStrategy.WRITE_AND_RENAME)
             .build();
 
     private static final List<PropertyDescriptor> PROPERTIES = 
Collections.unmodifiableList(Arrays.asList(
@@ -109,6 +120,7 @@ public class PutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageProcess
             FILESYSTEM,
             DIRECTORY,
             FILE,
+            WRITING_STRATEGY,
             BASE_TEMPORARY_PATH,
             CONFLICT_RESOLUTION,
             RESOURCE_TRANSFER_SOURCE,
@@ -131,41 +143,44 @@ public class PutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageProcess
         final long startNanos = System.nanoTime();
         try {
             final String fileSystem = evaluateFileSystemProperty(context, 
flowFile);
-            final String originalDirectory = 
evaluateDirectoryProperty(context, flowFile);
-            final String tempPath = evaluateDirectoryProperty(context, 
flowFile, BASE_TEMPORARY_PATH);
-            final String tempDirectory = createPath(tempPath, 
TEMP_FILE_DIRECTORY);
+            final String directory = evaluateDirectoryProperty(context, 
flowFile);
             final String fileName = evaluateFileNameProperty(context, 
flowFile);
 
             final DataLakeFileSystemClient fileSystemClient = 
getFileSystemClient(context, flowFile, fileSystem);
-            final DataLakeDirectoryClient directoryClient = 
fileSystemClient.getDirectoryClient(originalDirectory);
+            final DataLakeDirectoryClient directoryClient = 
fileSystemClient.getDirectoryClient(directory);
 
-            final String tempFilePrefix = UUID.randomUUID().toString();
-            final DataLakeDirectoryClient tempDirectoryClient = 
fileSystemClient.getDirectoryClient(tempDirectory);
+            final WritingStrategy writingStrategy = 
WritingStrategy.valueOf(context.getProperty(WRITING_STRATEGY).getValue());
             final String conflictResolution = 
context.getProperty(CONFLICT_RESOLUTION).getValue();
             final ResourceTransferSource resourceTransferSource = 
ResourceTransferSource.valueOf(context.getProperty(RESOURCE_TRANSFER_SOURCE).getValue());
             final Optional<FileResource> fileResourceFound = 
getFileResource(resourceTransferSource, context, flowFile.getAttributes());
             final long transferSize = 
fileResourceFound.map(FileResource::getSize).orElse(flowFile.getSize());
 
-            final DataLakeFileClient tempFileClient = 
tempDirectoryClient.createFile(tempFilePrefix + fileName, true);
-            if (transferSize > 0) {
-                final FlowFile sourceFlowFile = flowFile;
-                try (
-                        final InputStream inputStream = new 
BufferedInputStream(
-                                
fileResourceFound.map(FileResource::getInputStream)
-                                        .orElseGet(() -> 
session.read(sourceFlowFile))
-                        )
-                ) {
-                    uploadContent(tempFileClient, inputStream, transferSize);
-                } catch (final Exception e) {
-                    removeTempFile(tempFileClient);
-                    throw e;
+            final DataLakeFileClient fileClient;
+
+            if (writingStrategy == WritingStrategy.WRITE_AND_RENAME) {
+                final String tempPath = evaluateDirectoryProperty(context, 
flowFile, BASE_TEMPORARY_PATH);
+                final String tempDirectory = createPath(tempPath, 
TEMP_FILE_DIRECTORY);
+                final String tempFilePrefix = UUID.randomUUID().toString();
+
+                final DataLakeDirectoryClient tempDirectoryClient = 
fileSystemClient.getDirectoryClient(tempDirectory);
+                final DataLakeFileClient tempFileClient = 
tempDirectoryClient.createFile(tempFilePrefix + fileName, true);
+
+                uploadFile(session, flowFile, fileResourceFound, transferSize, 
tempFileClient);
+
+                createDirectoryIfNotExists(directoryClient);
+
+                fileClient = renameFile(tempFileClient, 
directoryClient.getDirectoryPath(), fileName, conflictResolution);
+            } else {
+                fileClient = createFile(directoryClient, fileName, 
conflictResolution);
+
+                if (fileClient != null) {
+                    uploadFile(session, flowFile, fileResourceFound, 
transferSize, fileClient);
                 }
             }
-            createDirectoryIfNotExists(directoryClient);
 
-            final String fileUrl = renameFile(tempFileClient, 
directoryClient.getDirectoryPath(), fileName, conflictResolution);
-            if (fileUrl != null) {
-                final Map<String, String> attributes = 
createAttributeMap(fileSystem, originalDirectory, fileName, fileUrl, 
transferSize);
+            if (fileClient != null) {
+                final String fileUrl = fileClient.getFileUrl();
+                final Map<String, String> attributes = 
createAttributeMap(fileSystem, directory, fileName, fileUrl, transferSize);
                 flowFile = session.putAllAttributes(flowFile, attributes);
 
                 final long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
@@ -180,12 +195,12 @@ public class PutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageProcess
         }
     }
 
-    private DataLakeFileSystemClient getFileSystemClient(ProcessContext 
context, FlowFile flowFile, String fileSystem) {
+    private DataLakeFileSystemClient getFileSystemClient(final ProcessContext 
context, final FlowFile flowFile, final String fileSystem) {
         final DataLakeServiceClient storageClient = getStorageClient(context, 
flowFile);
         return storageClient.getFileSystemClient(fileSystem);
     }
 
-    private Map<String, String> createAttributeMap(String fileSystem, String 
originalDirectory, String fileName, String fileUrl, long length) {
+    private Map<String, String> createAttributeMap(final String fileSystem, 
final String originalDirectory, final String fileName, final String fileUrl, 
final long length) {
         final Map<String, String> attributes = new HashMap<>();
         attributes.put(ATTR_NAME_FILESYSTEM, fileSystem);
         attributes.put(ATTR_NAME_DIRECTORY, originalDirectory);
@@ -195,14 +210,29 @@ public class PutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageProcess
         return attributes;
     }
 
-    private void createDirectoryIfNotExists(DataLakeDirectoryClient 
directoryClient) {
+    private void createDirectoryIfNotExists(final DataLakeDirectoryClient 
directoryClient) {
         if (!directoryClient.getDirectoryPath().isEmpty() && 
!directoryClient.exists()) {
             directoryClient.create();
         }
     }
 
+    private void uploadFile(final ProcessSession session, final FlowFile 
flowFile, final Optional<FileResource> fileResourceFound,
+                            final long transferSize, final DataLakeFileClient 
fileClient) throws Exception {
+        if (transferSize > 0) {
+            try (final InputStream inputStream = new BufferedInputStream(
+                    fileResourceFound.map(FileResource::getInputStream)
+                            .orElseGet(() -> session.read(flowFile)))
+            ) {
+                uploadContent(fileClient, inputStream, transferSize);
+            } catch (final Exception e) {
+                removeFile(fileClient);
+                throw e;
+            }
+        }
+    }
+
     //Visible for testing
-    static void uploadContent(DataLakeFileClient fileClient, InputStream in, 
long length) {
+    static void uploadContent(final DataLakeFileClient fileClient, final 
InputStream in, final long length) {
         long chunkStart = 0;
         long chunkSize;
 
@@ -223,19 +253,41 @@ public class PutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageProcess
     }
 
     /**
-     * This method serves as a "commit" for the upload process. Upon upload, a 
0-byte file is created, then the payload is appended to it.
-     * Because of that, a work-in-progress file is available for readers 
before the upload is complete. It is not an efficient approach in
-     * case of conflicts because FlowFiles are uploaded unnecessarily, but it 
is a calculated risk because consistency is more important.
+     * Creates the file on Azure for 'Simple Write' strategy. Upon upload, a 
0-byte file is created, then the payload is appended to it.
+     * Because of that, a work-in-progress file is available for readers 
before the upload is complete.
      *
+     * @param directoryClient directory client of the uploaded file's parent 
directory
+     * @param fileName name of the uploaded file
+     * @param conflictResolution conflict resolution strategy
+     * @return the file client of the uploaded file or {@code null} if the 
file already exists and conflict resolution strategy is 'ignore'
+     * @throws ProcessException if the file already exists and the conflict 
resolution strategy is 'fail'; also in case of other errors
+     */
+    DataLakeFileClient createFile(DataLakeDirectoryClient directoryClient, 
final String fileName, final String conflictResolution) {
+        final String destinationPath = 
createPath(directoryClient.getDirectoryPath(), fileName);
+
+        try {
+            final boolean overwrite = 
conflictResolution.equals(REPLACE_RESOLUTION);
+            return directoryClient.createFile(fileName, overwrite);
+        } catch (DataLakeStorageException dataLakeStorageException) {
+            return handleDataLakeStorageException(dataLakeStorageException, 
destinationPath, conflictResolution);
+        }
+    }
+
+    /**
+     * This method serves as a "commit" for the upload process in case of 
'Write and Rename' strategy. In order to prevent work-in-progress files from 
being available for readers,
+     * a temporary file is written first, and then renamed/moved to its final 
destination. It is not an efficient approach in case of conflicts because 
FlowFiles are uploaded unnecessarily,
+     * but it is a calculated risk because consistency is more important for 
'Write and Rename' strategy.
+     * <p>
      * Visible for testing
      *
-     * @param sourceFileClient client of the temporary file
+     * @param sourceFileClient file client of the temporary file
      * @param destinationDirectory final location of the uploaded file
      * @param destinationFileName final name of the uploaded file
      * @param conflictResolution conflict resolution strategy
-     * @return URL of the uploaded file
+     * @return the file client of the uploaded file or {@code null} if the 
file already exists and conflict resolution strategy is 'ignore'
+     * @throws ProcessException if the file already exists and the conflict 
resolution strategy is 'fail'; also in case of other errors
      */
-    String renameFile(final DataLakeFileClient sourceFileClient, final String 
destinationDirectory, final String destinationFileName, final String 
conflictResolution) {
+    DataLakeFileClient renameFile(final DataLakeFileClient sourceFileClient, 
final String destinationDirectory, final String destinationFileName, final 
String conflictResolution) {
         final String destinationPath = createPath(destinationDirectory, 
destinationFileName);
 
         try {
@@ -243,18 +295,24 @@ public class PutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageProcess
             if (!conflictResolution.equals(REPLACE_RESOLUTION)) {
                 destinationCondition.setIfNoneMatch("*");
             }
-            return sourceFileClient.renameWithResponse(null, destinationPath, 
null, destinationCondition, null, null).getValue().getFileUrl();
+            return sourceFileClient.renameWithResponse(null, destinationPath, 
null, destinationCondition, null, null).getValue();
         } catch (DataLakeStorageException dataLakeStorageException) {
-            removeTempFile(sourceFileClient);
-            if (dataLakeStorageException.getStatusCode() == 409 && 
conflictResolution.equals(IGNORE_RESOLUTION)) {
-                getLogger().info("File [{}] already exists. Remote file not 
modified due to {} being set to '{}'.",
-                        destinationPath, CONFLICT_RESOLUTION.getDisplayName(), 
conflictResolution);
-                return null;
-            } else if (dataLakeStorageException.getStatusCode() == 409 && 
conflictResolution.equals(FAIL_RESOLUTION)) {
-                throw new ProcessException(String.format("File [%s] already 
exists.", destinationPath), dataLakeStorageException);
-            } else {
-                throw new ProcessException(String.format("Renaming File [%s] 
failed", destinationPath), dataLakeStorageException);
-            }
+            removeFile(sourceFileClient);
+
+            return handleDataLakeStorageException(dataLakeStorageException, 
destinationPath, conflictResolution);
+        }
+    }
+
+    private DataLakeFileClient handleDataLakeStorageException(final 
DataLakeStorageException dataLakeStorageException, final String 
destinationPath, final String conflictResolution) {
+        final boolean fileAlreadyExists = 
dataLakeStorageException.getStatusCode() == 409;
+        if (fileAlreadyExists && conflictResolution.equals(IGNORE_RESOLUTION)) 
{
+            getLogger().info("File [{}] already exists. Remote file not 
modified due to {} being set to '{}'.",
+                    destinationPath, CONFLICT_RESOLUTION.getDisplayName(), 
conflictResolution);
+            return null;
+        } else if (fileAlreadyExists && 
conflictResolution.equals(FAIL_RESOLUTION)) {
+            throw new ProcessException(String.format("File [%s] already 
exists.", destinationPath), dataLakeStorageException);
+        } else {
+            throw new ProcessException(String.format("File operation failed 
[%s]", destinationPath), dataLakeStorageException);
         }
     }
 
@@ -264,7 +322,7 @@ public class PutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageProcess
                 : path;
     }
 
-    private void removeTempFile(final DataLakeFileClient fileClient) {
+    private void removeFile(final DataLakeFileClient fileClient) {
         try {
             fileClient.delete();
         } catch (Exception e) {
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/WritingStrategy.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/WritingStrategy.java
new file mode 100644
index 0000000000..6fda36623d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/java/org/apache/nifi/processors/azure/storage/utils/WritingStrategy.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.azure.storage.utils;
+
+import org.apache.nifi.components.DescribedValue;
+
+public enum WritingStrategy implements DescribedValue {
+
+    WRITE_AND_RENAME("Write and Rename", "The processor writes the Azure file 
into a temporary directory and then renames/moves it to the final destination." 
+
+            " This prevents other processes from reading partially written 
files."),
+    SIMPLE_WRITE("Simple Write", "The processor writes the Azure file directly 
to the destination. This might result in the reading of partially written 
files.");
+
+    private final String displayName;
+    private final String description;
+
+    WritingStrategy(String displayName, String description) {
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    @Override
+    public String getValue() {
+        return name();
+    }
+
+    @Override
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage/additionalDetails.html
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage/additionalDetails.html
index 2469ceafaa..4758d4c19e 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage/additionalDetails.html
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/main/resources/docs/org.apache.nifi.processors.azure.storage.PutAzureDataLakeStorage/additionalDetails.html
@@ -26,31 +26,57 @@
     This processor is responsible for uploading files to Azure Data Lake 
Storage Gen2.
 </p>
 
-<h3>File uploading and cleanup process</h3>
+<h3>File uploading and cleanup process in case of "Write and Rename" 
strategy</h3>
 
 <h4>New file upload</h4>
 
 <ol>
     <li>A temporary file is created with random prefix under the given path in 
'_nifitempdirectory'.</li>
     <li>Content is appended to temp file.</li>
-    <li>Temp file is renamed to its original name, the original file is 
overwritten.</li>
-    <li>In case of appending or renaming failure the temp file is deleted, the 
original file remains intact.</li>
-    <li>In case of temporary file deletion failure both temp file and original 
file remain on the server.</li>
+    <li>Temp file is moved to the final destination directory and renamed to 
its original name.</li>
+    <li>In case of appending or renaming failure, the temp file is 
deleted.</li>
+    <li>In case of temporary file deletion failure, the temp file remains on 
the server.</li>
 </ol>
 
 <h4>Existing file upload</h4>
 
 <ul>
-    <li>Processors with "fail" conflict resolution strategy will be directed 
to "Failure" relationship.</li>
-    <li>Processors with "ignore" conflict resolution strategy will be directed 
to "Success" relationship.</li>
+    <li>Processors with "fail" conflict resolution strategy will direct the 
FlowFile to "Failure" relationship.</li>
+    <li>Processors with "ignore" conflict resolution strategy will direct the 
FlowFile to "Success" relationship.</li>
     <li>Processors with "replace" conflict resolution strategy:</li>
 
     <ol>
         <li>A temporary file is created with random prefix under the given 
path in '_nifitempdirectory'.</li>
         <li>Content is appended to temp file.</li>
-        <li>Temp file is renamed to its original name, the original file is 
overwritten.</li>
-        <li>In case of appending or renaming failure the temp file is deleted, 
the original file remains intact.</li>
-        <li>In case of temporary file deletion failure both temp file and 
original file remain on the server.</li>
+        <li>Temp file is moved to the final destination directory and renamed 
to its original name, the original file is overwritten.</li>
+        <li>In case of appending or renaming failure, the temp file is deleted 
and the original file remains intact.</li>
+        <li>In case of temporary file deletion failure, both temp file and 
original file remain on the server.</li>
+    </ol>
+</ul>
+
+<h3>File uploading and cleanup process in case of "Simple Write" strategy</h3>
+
+<h4>New file upload</h4>
+
+<ol>
+    <li>An empty file is created at its final destination.</li>
+    <li>Content is appended to the file.</li>
+    <li>In case of appending failure, the file is deleted.</li>
+    <li>In case of file deletion failure, the file remains on the server.</li>
+</ol>
+
+<h4>Existing file upload</h4>
+
+<ul>
+    <li>Processors with "fail" conflict resolution strategy will direct the 
FlowFile to "Failure" relationship.</li>
+    <li>Processors with "ignore" conflict resolution strategy will direct the 
FlowFile to "Success" relationship.</li>
+    <li>Processors with "replace" conflict resolution strategy:</li>
+
+    <ol>
+        <li>An empty file is created at its final destination, the original 
file is overwritten.</li>
+        <li>Content is appended to the file.</li>
+        <li>In case of appending failure, the file is deleted and the original 
file is not restored.</li>
+        <li>In case of file deletion failure, the file remains on the 
server.</li>
     </ol>
 </ul>
 
diff --git 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
index 171e3a5e75..bb5498b425 100644
--- 
a/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
+++ 
b/nifi-nar-bundles/nifi-azure-bundle/nifi-azure-processors/src/test/java/org/apache/nifi/processors/azure/storage/ITPutAzureDataLakeStorage.java
@@ -22,13 +22,15 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.fileresource.service.StandardFileResourceService;
 import org.apache.nifi.fileresource.service.api.FileResourceService;
 import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processors.azure.storage.utils.WritingStrategy;
 import org.apache.nifi.processors.transfer.ResourceTransferProperties;
 import org.apache.nifi.processors.transfer.ResourceTransferSource;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.util.MockFlowFile;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
@@ -68,27 +70,23 @@ public class ITPutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         runner.setProperty(PutAzureDataLakeStorage.FILE, FILE_NAME);
     }
 
-    @Test
-    public void testPutFileToExistingDirectory() throws Exception {
-        fileSystemClient.createDirectory(DIRECTORY);
-
-        runProcessor(FILE_DATA);
+    @ParameterizedTest
+    @EnumSource(WritingStrategy.class)
+    public void testPutFileToExistingDirectory(WritingStrategy 
writingStrategy) throws Exception {
+        setWritingStrategy(writingStrategy);
 
-        assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA);
-    }
-
-    @Test
-    public void testPutFileToExistingDirectoryUsingProxyConfigurationService() 
throws Exception {
         fileSystemClient.createDirectory(DIRECTORY);
-        configureProxyService();
 
         runProcessor(FILE_DATA);
 
         assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA);
     }
 
-    @Test
-    public void testPutFileToExistingDirectoryWithReplaceResolution() throws 
Exception {
+    @ParameterizedTest
+    @EnumSource(WritingStrategy.class)
+    public void 
testPutFileToExistingDirectoryWithReplaceResolution(WritingStrategy 
writingStrategy) throws Exception {
+        setWritingStrategy(writingStrategy);
+
         fileSystemClient.createDirectory(DIRECTORY);
 
         runner.setProperty(PutAzureDataLakeStorage.CONFLICT_RESOLUTION, 
PutAzureDataLakeStorage.REPLACE_RESOLUTION);
@@ -98,8 +96,11 @@ public class ITPutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA);
     }
 
-    @Test
-    public void testPutFileToExistingDirectoryWithIgnoreResolution() throws 
Exception {
+    @ParameterizedTest
+    @EnumSource(WritingStrategy.class)
+    public void 
testPutFileToExistingDirectoryWithIgnoreResolution(WritingStrategy 
writingStrategy) throws Exception {
+        setWritingStrategy(writingStrategy);
+
         fileSystemClient.createDirectory(DIRECTORY);
 
         runner.setProperty(PutAzureDataLakeStorage.CONFLICT_RESOLUTION, 
PutAzureDataLakeStorage.IGNORE_RESOLUTION);
@@ -109,15 +110,21 @@ public class ITPutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA);
     }
 
-    @Test
-    public void testPutFileToNonExistingDirectory() throws Exception {
+    @ParameterizedTest
+    @EnumSource(WritingStrategy.class)
+    public void testPutFileToNonExistingDirectory(WritingStrategy 
writingStrategy) throws Exception {
+        setWritingStrategy(writingStrategy);
+
         runProcessor(FILE_DATA);
 
         assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA);
     }
 
-    @Test
-    public void testPutFileToDeepDirectory() throws Exception {
+    @ParameterizedTest
+    @EnumSource(WritingStrategy.class)
+    public void testPutFileToDeepDirectory(WritingStrategy writingStrategy) 
throws Exception {
+        setWritingStrategy(writingStrategy);
+
         String baseDirectory = "dir1/dir2";
         String fullDirectory = baseDirectory + "/dir3/dir4";
         fileSystemClient.createDirectory(baseDirectory);
@@ -128,8 +135,11 @@ public class ITPutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         assertSuccess(fullDirectory, FILE_NAME, FILE_DATA);
     }
 
-    @Test
-    public void testPutFileToRootDirectory() throws Exception {
+    @ParameterizedTest
+    @EnumSource(WritingStrategy.class)
+    public void testPutFileToRootDirectory(WritingStrategy writingStrategy) 
throws Exception {
+        setWritingStrategy(writingStrategy);
+
         String rootDirectory = "";
         runner.setProperty(PutAzureDataLakeStorage.DIRECTORY, rootDirectory);
 
@@ -138,8 +148,11 @@ public class ITPutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         assertSuccess(rootDirectory, FILE_NAME, FILE_DATA);
     }
 
-    @Test
-    public void testPutEmptyFile() throws Exception {
+    @ParameterizedTest
+    @EnumSource(WritingStrategy.class)
+    public void testPutEmptyFile(WritingStrategy writingStrategy) throws 
Exception {
+        setWritingStrategy(writingStrategy);
+
         byte[] fileData = new byte[0];
 
         runProcessor(fileData);
@@ -147,8 +160,11 @@ public class ITPutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         assertSuccess(DIRECTORY, FILE_NAME, fileData);
     }
 
-    @Test
-    public void testPutBigFile() throws Exception {
+    @ParameterizedTest
+    @EnumSource(WritingStrategy.class)
+    public void testPutBigFile(WritingStrategy writingStrategy) throws 
Exception {
+        setWritingStrategy(writingStrategy);
+
         Random random = new Random();
         byte[] fileData = new byte[120_000_000];
         random.nextBytes(fileData);
@@ -158,8 +174,11 @@ public class ITPutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         assertSuccess(DIRECTORY, FILE_NAME, fileData);
     }
 
-    @Test
-    public void testPutFileWithNonExistingFileSystem() {
+    @ParameterizedTest
+    @EnumSource(WritingStrategy.class)
+    public void testPutFileWithNonExistingFileSystem(WritingStrategy 
writingStrategy) {
+        setWritingStrategy(writingStrategy);
+
         runner.setProperty(PutAzureDataLakeStorage.FILESYSTEM, "dummy");
 
         runProcessor(FILE_DATA);
@@ -167,8 +186,11 @@ public class ITPutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         assertFailure();
     }
 
-    @Test
-    public void testPutFileWithInvalidFileName() {
+    @ParameterizedTest
+    @EnumSource(WritingStrategy.class)
+    public void testPutFileWithInvalidFileName(WritingStrategy 
writingStrategy) {
+        setWritingStrategy(writingStrategy);
+
         runner.setProperty(PutAzureDataLakeStorage.FILE, "/file1");
 
         runProcessor(FILE_DATA);
@@ -176,8 +198,11 @@ public class ITPutAzureDataLakeStorage extends 
AbstractAzureDataLakeStorageIT {
         assertFailure();
     }
 
-    @Test
-    public void testPutFileWithSpacesInDirectoryAndFileName() throws Exception 
{
+    @ParameterizedTest
+    @EnumSource(WritingStrategy.class)
+    public void testPutFileWithSpacesInDirectoryAndFileName(WritingStrategy 
writingStrategy) throws Exception {
+        setWritingStrategy(writingStrategy);
+
         String directory = "dir 1";
         String fileName = "file 1";
         runner.setProperty(PutAzureDataLakeStorage.DIRECTORY, directory);
@@ -188,8 +213,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
         assertSuccess(directory, fileName, FILE_DATA);
     }
 
-    @Test
-    public void testPutFileToExistingFileWithFailResolution() {
+    @ParameterizedTest
+    @EnumSource(WritingStrategy.class)
+    public void testPutFileToExistingFileWithFailResolution(WritingStrategy writingStrategy) {
+        setWritingStrategy(writingStrategy);
+
         fileSystemClient.createFile(String.format("%s/%s", DIRECTORY, FILE_NAME));
 
         runProcessor(FILE_DATA);
@@ -197,8 +225,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
         assertFailure();
     }
 
-    @Test
-    public void testPutFileToExistingFileWithReplaceResolution() throws Exception {
+    @ParameterizedTest
+    @EnumSource(WritingStrategy.class)
+    public void testPutFileToExistingFileWithReplaceResolution(WritingStrategy writingStrategy) throws Exception {
+        setWritingStrategy(writingStrategy);
+
         fileSystemClient.createFile(String.format("%s/%s", DIRECTORY, FILE_NAME));
 
         runner.setProperty(PutAzureDataLakeStorage.CONFLICT_RESOLUTION, PutAzureDataLakeStorage.REPLACE_RESOLUTION);
@@ -208,8 +239,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
         assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA);
     }
 
-    @Test
-    public void testPutFileToExistingFileWithIgnoreResolution() throws Exception {
+    @ParameterizedTest
+    @EnumSource(WritingStrategy.class)
+    public void testPutFileToExistingFileWithIgnoreResolution(WritingStrategy writingStrategy) throws Exception {
+        setWritingStrategy(writingStrategy);
+
         String azureFileContent = "AzureFileContent";
         createDirectoryAndUploadFile(DIRECTORY, FILE_NAME, azureFileContent);
 
@@ -220,8 +254,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
         assertSuccessWithIgnoreResolution(DIRECTORY, FILE_NAME, FILE_DATA, azureFileContent.getBytes(StandardCharsets.UTF_8));
     }
 
-    @Test
-    public void testPutFileWithEL() throws Exception {
+    @ParameterizedTest
+    @EnumSource(WritingStrategy.class)
+    public void testPutFileWithEL(WritingStrategy writingStrategy) throws Exception {
+        setWritingStrategy(writingStrategy);
+
         Map<String, String> attributes = createAttributesMap();
         setELProperties();
 
@@ -230,8 +267,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
         assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA);
     }
 
-    @Test
-    public void testPutFileWithELButFilesystemIsNotSpecified() {
+    @ParameterizedTest
+    @EnumSource(WritingStrategy.class)
+    public void testPutFileWithELButFilesystemIsNotSpecified(WritingStrategy writingStrategy) {
+        setWritingStrategy(writingStrategy);
+
         Map<String, String> attributes = createAttributesMap();
         attributes.remove(EL_FILESYSTEM);
         setELProperties();
@@ -241,8 +281,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
         assertFailure();
     }
 
-    @Test
-    public void testPutFileWithELButFileNameIsNotSpecified() {
+    @ParameterizedTest
+    @EnumSource(WritingStrategy.class)
+    public void testPutFileWithELButFileNameIsNotSpecified(WritingStrategy writingStrategy) {
+        setWritingStrategy(writingStrategy);
+
         Map<String, String> attributes = createAttributesMap();
         attributes.remove(EL_FILE_NAME);
         setELProperties();
@@ -252,8 +295,11 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
         assertFailure();
     }
 
-    @Test
-    public void testPutFileFromLocalFile() throws Exception {
+    @ParameterizedTest
+    @EnumSource(WritingStrategy.class)
+    public void testPutFileFromLocalFile(WritingStrategy writingStrategy) throws Exception {
+        setWritingStrategy(writingStrategy);
+
         String attributeName = "file.path";
 
         String serviceId = FileResourceService.class.getSimpleName();
@@ -279,6 +325,19 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
         assertProvenanceEvents();
     }
 
+    @ParameterizedTest
+    @EnumSource(WritingStrategy.class)
+    public void testPutFileUsingProxy(WritingStrategy writingStrategy) throws Exception {
+        setWritingStrategy(writingStrategy);
+
+        fileSystemClient.createDirectory(DIRECTORY);
+        configureProxyService();
+
+        runProcessor(FILE_DATA);
+
+        assertSuccess(DIRECTORY, FILE_NAME, FILE_DATA);
+    }
+
     private Map<String, String> createAttributesMap() {
         Map<String, String> attributes = new HashMap<>();
 
@@ -289,6 +348,10 @@ public class ITPutAzureDataLakeStorage extends AbstractAzureDataLakeStorageIT {
         return attributes;
     }
 
+    private void setWritingStrategy(WritingStrategy writingStrategy) {
+        runner.setProperty(PutAzureDataLakeStorage.WRITING_STRATEGY, writingStrategy.getValue());
+    }
+
     private void setELProperties() {
         runner.setProperty(PutAzureDataLakeStorage.FILESYSTEM, String.format("${%s}", EL_FILESYSTEM));
         runner.setProperty(PutAzureDataLakeStorage.DIRECTORY, String.format("${%s}", EL_DIRECTORY));


Reply via email to