This is an automated email from the ASF dual-hosted git repository.
bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new c7a2885 NIFI-6679 - MoveHDFS removes original file when destination
exists - Include testPutWhenAlreadyExisting test - Resolve dependencies
c7a2885 is described below
commit c7a2885c0c5c9241976e10bef21d182d1f63d5aa
Author: eduardofontes <[email protected]>
AuthorDate: Wed Sep 18 11:27:24 2019 -0300
NIFI-6679 - MoveHDFS removes original file when destination exists
- Include testPutWhenAlreadyExisting test
- Resolve dependencies
This closes #3746.
Signed-off-by: Bryan Bende <[email protected]>
---
.../apache/nifi/processors/hadoop/MoveHDFS.java | 7 +--
.../nifi/processors/hadoop/MoveHDFSTest.java | 50 +++++++++++++++++++++-
2 files changed, 52 insertions(+), 5 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
index 95bcdeb..ee023e6 100644
---
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
+++
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/MoveHDFS.java
@@ -357,9 +357,10 @@ public class MoveHDFS extends AbstractHadoopProcessor {
if (destinationExists) {
switch (processorConfig.getConflictResolution()) {
case REPLACE_RESOLUTION:
- if (hdfs.delete(file, false)) {
+ // Remove destination file (newFile) to
replace
+ if (hdfs.delete(newFile, false)) {
getLogger().info("deleted {} in order
to replace with the contents of {}",
- new Object[]{file, flowFile});
+ new Object[]{newFile,
flowFile});
}
break;
case IGNORE_RESOLUTION:
@@ -547,4 +548,4 @@ public class MoveHDFS extends AbstractHadoopProcessor {
};
}
}
-}
\ No newline at end of file
+}
diff --git
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
index c55a2b1..7d14b5d 100644
---
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
+++
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/MoveHDFSTest.java
@@ -20,6 +20,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.NiFiProperties;
@@ -38,6 +39,7 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -233,6 +235,52 @@ public class MoveHDFSTest {
Assert.assertEquals(0, flowFiles.size());
}
+ @Test
+ public void testPutWhenAlreadyExistingShouldFailWhenFAIL_RESOLUTION()
throws IOException {
+ testPutWhenAlreadyExisting(MoveHDFS.FAIL_RESOLUTION,
MoveHDFS.REL_FAILURE, "randombytes-1");
+ }
+
+ @Test
+ public void testPutWhenAlreadyExistingShouldIgnoreWhenIGNORE_RESOLUTION()
throws IOException {
+ testPutWhenAlreadyExisting(MoveHDFS.IGNORE_RESOLUTION,
MoveHDFS.REL_SUCCESS, "randombytes-1");
+ }
+
+ @Test
+ public void
testPutWhenAlreadyExistingShouldReplaceWhenREPLACE_RESOLUTION() throws
IOException {
+ testPutWhenAlreadyExisting(MoveHDFS.REPLACE_RESOLUTION,
MoveHDFS.REL_SUCCESS, "randombytes-2");
+ }
+
+ private void testPutWhenAlreadyExisting(String conflictResolution,
Relationship expectedDestination, String expectedContent) throws IOException {
+ // GIVEN
+ Files.createDirectories(Paths.get(INPUT_DIRECTORY));
+ Files.createDirectories(Paths.get(OUTPUT_DIRECTORY));
+ Files.copy(Paths.get(TEST_DATA_DIRECTORY, "randombytes-2"),
Paths.get(INPUT_DIRECTORY, "randombytes-1"));
+ Files.copy(Paths.get(TEST_DATA_DIRECTORY, "randombytes-1"),
Paths.get(OUTPUT_DIRECTORY, "randombytes-1"));
+
+ MoveHDFS processor = new MoveHDFS();
+
+ TestRunner runner = TestRunners.newTestRunner(processor);
+ runner.setProperty(MoveHDFS.INPUT_DIRECTORY_OR_FILE, INPUT_DIRECTORY);
+ runner.setProperty(MoveHDFS.OUTPUT_DIRECTORY, OUTPUT_DIRECTORY);
+ runner.setProperty(MoveHDFS.CONFLICT_RESOLUTION, conflictResolution);
+
+ byte[] expected = Files.readAllBytes(Paths.get(TEST_DATA_DIRECTORY,
expectedContent));
+
+ // WHEN
+ runner.enqueue(new byte[0]);
+ runner.run();
+
+ // THEN
+ runner.assertAllFlowFilesTransferred(expectedDestination);
+
+ List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(expectedDestination);
+ Assert.assertEquals(1, flowFiles.size());
+
+ byte[] actual = Files.readAllBytes(Paths.get(OUTPUT_DIRECTORY,
"randombytes-1"));
+
+ assertArrayEquals(expected, actual);
+ }
+
private static class TestableMoveHDFS extends MoveHDFS {
private KerberosProperties testKerberosProperties;
@@ -245,7 +293,5 @@ public class MoveHDFSTest {
protected KerberosProperties getKerberosProperties(File
kerberosConfigFile) {
return testKerberosProperties;
}
-
}
-
}