This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new c7a2885  NIFI-6679 - MoveHDFS removes original file when destination exists - Include testPutWhenAlreadyExisting test - Resolve dependencies
c7a2885 is described below

commit c7a2885c0c5c9241976e10bef21d182d1f63d5aa
Author: eduardofontes <[email protected]>
AuthorDate: Wed Sep 18 11:27:24 2019 -0300

    NIFI-6679 - MoveHDFS removes original file when destination exists
    - Include testPutWhenAlreadyExisting test
    - Resolve dependencies
    
    This closes #3746.
    
    Signed-off-by: Bryan Bende <[email protected]>
---
 .../apache/nifi/processors/hadoop/MoveHDFS.java    |  7 +--
 .../nifi/processors/hadoop/MoveHDFSTest.java       | 50 +++++++++++++++++++++-
 2 files changed, 52 insertions(+), 5 deletions(-)

diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
index 95bcdeb..ee023e6 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
@@ -357,9 +357,10 @@ public class MoveHDFS extends AbstractHadoopProcessor {
                         if (destinationExists) {
                             switch (processorConfig.getConflictResolution()) {
                                 case REPLACE_RESOLUTION:
-                                    if (hdfs.delete(file, false)) {
+                                    // Remove destination file (newFile) to replace
+                                    if (hdfs.delete(newFile, false)) {
                                         getLogger().info("deleted {} in order to replace with the contents of {}",
-                                                new Object[]{file, flowFile});
+                                                new Object[]{newFile, flowFile});
                                     }
                                     break;
                                 case IGNORE_RESOLUTION:
@@ -547,4 +548,4 @@ public class MoveHDFS extends AbstractHadoopProcessor {
             };
         }
     }
-}
\ No newline at end of file
+}
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
index c55a2b1..7d14b5d 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
@@ -20,6 +20,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.NiFiProperties;
@@ -38,6 +39,7 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -233,6 +235,52 @@ public class MoveHDFSTest {
         Assert.assertEquals(0, flowFiles.size());
     }
 
+    @Test
+    public void testPutWhenAlreadyExistingShouldFailWhenFAIL_RESOLUTION() throws IOException {
+        testPutWhenAlreadyExisting(MoveHDFS.FAIL_RESOLUTION, MoveHDFS.REL_FAILURE, "randombytes-1");
+    }
+
+    @Test
+    public void testPutWhenAlreadyExistingShouldIgnoreWhenIGNORE_RESOLUTION() throws IOException {
+        testPutWhenAlreadyExisting(MoveHDFS.IGNORE_RESOLUTION, MoveHDFS.REL_SUCCESS, "randombytes-1");
+    }
+
+    @Test
+    public void testPutWhenAlreadyExistingShouldReplaceWhenREPLACE_RESOLUTION() throws IOException {
+        testPutWhenAlreadyExisting(MoveHDFS.REPLACE_RESOLUTION, MoveHDFS.REL_SUCCESS, "randombytes-2");
+    }
+
+    private void testPutWhenAlreadyExisting(String conflictResolution, Relationship expectedDestination, String expectedContent) throws IOException {
+      // GIVEN
+      Files.createDirectories(Paths.get(INPUT_DIRECTORY));
+      Files.createDirectories(Paths.get(OUTPUT_DIRECTORY));
+      Files.copy(Paths.get(TEST_DATA_DIRECTORY, "randombytes-2"), Paths.get(INPUT_DIRECTORY, "randombytes-1"));
+      Files.copy(Paths.get(TEST_DATA_DIRECTORY, "randombytes-1"), Paths.get(OUTPUT_DIRECTORY, "randombytes-1"));
+
+      MoveHDFS processor = new MoveHDFS();
+
+      TestRunner runner = TestRunners.newTestRunner(processor);
+      runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
+      runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+      runner.setProperty(MoveHDFS.CONFLICT_RESOLUTION, conflictResolution);
+
+      byte[] expected = Files.readAllBytes(Paths.get(TEST_DATA_DIRECTORY, expectedContent));
+
+      // WHEN
+      runner.enqueue(new byte[0]);
+      runner.run();
+
+      // THEN
+      runner.assertAllFlowFilesTransferred(expectedDestination);
+
+      List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(expectedDestination);
+      Assert.assertEquals(1, flowFiles.size());
+
+      byte[] actual = Files.readAllBytes(Paths.get(OUTPUT_DIRECTORY, "randombytes-1"));
+
+      assertArrayEquals(expected, actual);
+    }
+
     private static class TestableMoveHDFS extends MoveHDFS {
 
         private KerberosProperties testKerberosProperties;
@@ -245,7 +293,5 @@ public class MoveHDFSTest {
         protected KerberosProperties getKerberosProperties(File kerberosConfigFile) {
             return testKerberosProperties;
         }
-
     }
-
 }

Reply via email to