This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 534e6eafe7b8622e6d141802a40d7c6e01a09361
Author: Bence Simon <simonbence....@gmail.com>
AuthorDate: Thu Sep 30 15:57:41 2021 +0200

    NIFI-9260 Making the 'write and rename' behaviour optional for PutHDFS
    
    This closes #5423.
    
    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
---
 .../org/apache/nifi/processors/hadoop/PutHDFS.java | 35 ++++++++--
 .../apache/nifi/processors/hadoop/PutHDFSTest.java | 74 +++++++++++++++++-----
 2 files changed, 90 insertions(+), 19 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index 62b7996..d5d85fc 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -125,6 +125,9 @@ public class PutHDFS extends AbstractHadoopProcessor {
     protected static final String FAIL_RESOLUTION = "fail";
     protected static final String APPEND_RESOLUTION = "append";
 
+    protected static final String WRITE_AND_RENAME = "writeAndRename";
+    protected static final String SIMPLE_WRITE = "simpleWrite";
+
     protected static final AllowableValue REPLACE_RESOLUTION_AV = new AllowableValue(REPLACE_RESOLUTION,
             REPLACE_RESOLUTION, "Replaces the existing file if any.");
     protected static final AllowableValue IGNORE_RESOLUTION_AV = new AllowableValue(IGNORE_RESOLUTION, IGNORE_RESOLUTION,
@@ -134,6 +137,11 @@ public class PutHDFS extends AbstractHadoopProcessor {
     protected static final AllowableValue APPEND_RESOLUTION_AV = new AllowableValue(APPEND_RESOLUTION, APPEND_RESOLUTION,
             "Appends to the existing file if any, creates a new file otherwise.");
 
+    protected static final AllowableValue WRITE_AND_RENAME_AV = new AllowableValue(WRITE_AND_RENAME, "Write and rename",
+            "The processor writes FlowFile data into a temporary file and renames it after completion. This prevents other processes from reading partially written files.");
+    protected static final AllowableValue SIMPLE_WRITE_AV = new AllowableValue(SIMPLE_WRITE, "Simple write",
+            "The processor writes FlowFile data directly to the destination file. In some cases this might cause reading partially written files.");
+
     protected static final PropertyDescriptor CONFLICT_RESOLUTION = new PropertyDescriptor.Builder()
             .name("Conflict Resolution Strategy")
             .description("Indicates what should happen when a file with the same name already exists in the output directory")
@@ -142,6 +150,15 @@ public class PutHDFS extends AbstractHadoopProcessor {
             .allowableValues(REPLACE_RESOLUTION_AV, IGNORE_RESOLUTION_AV, FAIL_RESOLUTION_AV, APPEND_RESOLUTION_AV)
             .build();
 
+    protected static final PropertyDescriptor WRITING_STRATEGY = new PropertyDescriptor.Builder()
+            .name("writing-strategy")
+            .displayName("Writing Strategy")
+            .description("Defines the approach for writing the FlowFile data.")
+            .required(true)
+            .defaultValue(WRITE_AND_RENAME_AV.getValue())
+            .allowableValues(WRITE_AND_RENAME_AV, SIMPLE_WRITE_AV)
+            .build();
+
     public static final PropertyDescriptor BLOCK_SIZE = new PropertyDescriptor.Builder()
             .name("Block Size")
             .description("Size of each block as written to HDFS. This 
overrides the Hadoop Configuration")
@@ -219,6 +236,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
                 .description("The parent HDFS directory to which files should 
be written. The directory will be created if it doesn't exist.")
                 .build());
         props.add(CONFLICT_RESOLUTION);
+        props.add(WRITING_STRATEGY);
         props.add(BLOCK_SIZE);
         props.add(BUFFER_SIZE);
         props.add(REPLICATION_FACTOR);
@@ -280,6 +298,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
                 Path tempDotCopyFile = null;
                 FlowFile putFlowFile = flowFile;
                 try {
+                    final String writingStrategy = context.getProperty(WRITING_STRATEGY).getValue();
                     final Path dirPath = getNormalizedPath(context, DIRECTORY, putFlowFile);
                     final String conflictResponse = context.getProperty(CONFLICT_RESOLUTION).getValue();
                     final long blockSize = getBlockSize(context, session, putFlowFile, dirPath);
@@ -295,6 +314,11 @@ public class PutHDFS extends AbstractHadoopProcessor {
                     final Path tempCopyFile = new Path(dirPath, "." + filename);
                     final Path copyFile = new Path(dirPath, filename);
 
+                    // Depending on the writing strategy, we might need a temporary file
+                    final Path actualCopyFile = (writingStrategy.equals(WRITE_AND_RENAME))
+                            ? tempCopyFile
+                            : copyFile;
+
                     // Create destination directory if it does not exist
                     boolean targetDirCreated = false;
                     try {
@@ -361,7 +385,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
                                         cflags.add(CreateFlag.IGNORE_CLIENT_LOCALITY);
                                     }
 
-                                    fos = hdfs.create(tempCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
+                                    fos = hdfs.create(actualCopyFile, FsCreateModes.applyUMask(FsPermission.getFileDefault(),
                                             FsPermission.getUMask(hdfs.getConf())), cflags, bufferSize, replication, blockSize,
                                             null, null);
                                 }
@@ -369,7 +393,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
                                 if (codec != null) {
                                     fos = codec.createOutputStream(fos);
                                 }
-                                createdFile = tempCopyFile;
+                                createdFile = actualCopyFile;
                                 BufferedInputStream bis = new BufferedInputStream(in);
                                 StreamUtils.copy(bis, fos);
                                 bis = null;
@@ -399,9 +423,12 @@ public class PutHDFS extends AbstractHadoopProcessor {
                     final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
                     tempDotCopyFile = tempCopyFile;
 
-                    if (!conflictResponse.equals(APPEND_RESOLUTION)
-                            || (conflictResponse.equals(APPEND_RESOLUTION) && !destinationExists)) {
+                    if  (
+                        writingStrategy.equals(WRITE_AND_RENAME)
+                        && (!conflictResponse.equals(APPEND_RESOLUTION) || (conflictResponse.equals(APPEND_RESOLUTION) && !destinationExists))
+                    ) {
                         boolean renamed = false;
+
                         for (int i = 0; i < 10; i++) { // try to rename multiple times.
                             if (hdfs.rename(tempCopyFile, copyFile)) {
                                 renamed = true;
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
index c41114b..22b3ec2 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
@@ -71,6 +71,9 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 public class PutHDFSTest {
+    private final static String TARGET_DIRECTORY = "target/test-classes";
+    private final static String SOURCE_DIRECTORY = "src/test/resources/testdata";
+    private final static String FILE_NAME = "randombytes-1";
 
     private KerberosProperties kerberosProperties;
     private FileSystem mockFileSystem;
@@ -197,27 +200,32 @@ public class PutHDFSTest {
 
     @Test
     public void testPutFile() throws IOException {
-        PutHDFS proc = new TestablePutHDFS(kerberosProperties, mockFileSystem);
-        TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes");
-        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
-        try (FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1")) {
-            Map<String, String> attributes = new HashMap<>();
-            attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
+        // given
+        final FileSystem spyFileSystem = Mockito.spy(mockFileSystem);
+        final PutHDFS proc = new TestablePutHDFS(kerberosProperties, spyFileSystem);
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutHDFS.DIRECTORY, TARGET_DIRECTORY);
+        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, PutHDFS.REPLACE_RESOLUTION);
+
+        // when
+        try (final FileInputStream fis = new FileInputStream(SOURCE_DIRECTORY + "/" + FILE_NAME)) {
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put(CoreAttributes.FILENAME.key(), FILE_NAME);
             runner.enqueue(fis, attributes);
             runner.run();
         }
 
-        List<MockFlowFile> failedFlowFiles = runner
-                .getFlowFilesForRelationship(new Relationship.Builder().name("failure").build());
+        // then
+        final List<MockFlowFile> failedFlowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_FAILURE);
         assertTrue(failedFlowFiles.isEmpty());
 
-        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
         assertEquals(1, flowFiles.size());
-        MockFlowFile flowFile = flowFiles.get(0);
-        assertTrue(mockFileSystem.exists(new Path("target/test-classes/randombytes-1")));
-        assertEquals("randombytes-1", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
-        assertEquals("target/test-classes", flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
+
+        final MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(spyFileSystem.exists(new Path(TARGET_DIRECTORY + "/" + FILE_NAME)));
+        assertEquals(FILE_NAME, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+        assertEquals(TARGET_DIRECTORY, flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
         assertEquals("true", flowFile.getAttribute(PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE));
 
         final List<ProvenanceEventRecord> provenanceEvents = runner.getProvenanceEvents();
@@ -225,7 +233,43 @@ public class PutHDFSTest {
         final ProvenanceEventRecord sendEvent = provenanceEvents.get(0);
         assertEquals(ProvenanceEventType.SEND, sendEvent.getEventType());
         // If it runs with a real HDFS, the protocol will be "hdfs://", but with a local filesystem, just assert the filename.
-        assertTrue(sendEvent.getTransitUri().endsWith("target/test-classes/randombytes-1"));
+        assertTrue(sendEvent.getTransitUri().endsWith(TARGET_DIRECTORY + "/" + FILE_NAME));
+
+        Mockito.verify(spyFileSystem, Mockito.times(1)).rename(Mockito.any(Path.class), Mockito.any(Path.class));
+    }
+
+    @Test
+    public void testPutFileWithSimpleWrite() throws IOException {
+        // given
+        final FileSystem spyFileSystem = Mockito.spy(mockFileSystem);
+        final PutHDFS proc = new TestablePutHDFS(kerberosProperties, spyFileSystem);
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutHDFS.DIRECTORY, TARGET_DIRECTORY);
+        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, PutHDFS.REPLACE_RESOLUTION);
+        runner.setProperty(PutHDFS.WRITING_STRATEGY, PutHDFS.SIMPLE_WRITE);
+
+        // when
+        try (final FileInputStream fis = new FileInputStream(SOURCE_DIRECTORY + "/" + FILE_NAME)) {
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put(CoreAttributes.FILENAME.key(), FILE_NAME);
+            runner.enqueue(fis, attributes);
+            runner.run();
+        }
+
+        // then
+        final List<MockFlowFile> failedFlowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_FAILURE);
+        assertTrue(failedFlowFiles.isEmpty());
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutHDFS.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        final MockFlowFile flowFile = flowFiles.get(0);
+        assertTrue(spyFileSystem.exists(new Path(TARGET_DIRECTORY + "/" + FILE_NAME)));
+        assertEquals(FILE_NAME, flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+        assertEquals(TARGET_DIRECTORY, flowFile.getAttribute(PutHDFS.ABSOLUTE_HDFS_PATH_ATTRIBUTE));
+        assertEquals("true", flowFile.getAttribute(PutHDFS.TARGET_HDFS_DIR_CREATED_ATTRIBUTE));
+
+        Mockito.verify(spyFileSystem, Mockito.never()).rename(Mockito.any(Path.class), Mockito.any(Path.class));
     }
 
     @Test

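For readers skimming the patch: the "Write and rename" strategy that this commit makes optional is the classic temp-file-plus-rename pattern, where data lands in a dot-prefixed temporary file and only becomes visible under its final name once the copy has finished. Below is a minimal standalone sketch of that pattern against the Hadoop FileSystem API; the class and method names are invented for illustration and are not code from this commit.

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;

    // Illustrative sketch only; not part of the commit above.
    public class WriteAndRenameSketch {

        // Write to a hidden "." temp file first, then rename it into place,
        // so readers never observe a partially written destination file.
        static void writeAndRename(final FileSystem fs, final Path directory,
                final String filename, final InputStream content) throws IOException {
            final Path tempFile = new Path(directory, "." + filename);
            final Path finalFile = new Path(directory, filename);

            try (final FSDataOutputStream out = fs.create(tempFile, true)) {
                IOUtils.copyBytes(content, out, 4096);
            }

            // In HDFS a rename is a metadata-only operation, so the file
            // appears under its final name atomically.
            if (!fs.rename(tempFile, finalFile)) {
                fs.delete(tempFile, false);
                throw new IOException("Could not rename " + tempFile + " to " + finalFile);
            }
        }
    }

With the new Writing Strategy property, choosing the "simpleWrite" value makes PutHDFS write straight to the destination path and skip the rename entirely, which is what testPutFileWithSimpleWrite verifies above with Mockito.never().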