Repository: nifi
Updated Branches:
  refs/heads/master 5ec85299e -> f3f7cdbab
NIFI-5719 Ensuring FetchFile routes to failure if the move completion strategy can't be completed This closes #3088. Signed-off-by: Koji Kawamura <ijokaruma...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f3f7cdba Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f3f7cdba Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f3f7cdba Branch: refs/heads/master Commit: f3f7cdbab9248ba13c00d0e27a398e820e9ec7ed Parents: 5ec8529 Author: Bryan Bende <bbe...@apache.org> Authored: Wed Oct 17 11:28:29 2018 -0400 Committer: Koji Kawamura <ijokaruma...@apache.org> Committed: Thu Oct 18 14:37:55 2018 +0900 ---------------------------------------------------------------------- .../nifi/processors/standard/FetchFile.java | 12 ++++ .../nifi/processors/standard/TestFetchFile.java | 73 ++++++++++++++++++++ 2 files changed, 85 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/f3f7cdba/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFile.java index 3bf3f52..b929f07 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFile.java @@ -236,6 +236,18 @@ public class FetchFile extends AbstractProcessor { return; } + if (!targetDir.exists()) { + try { + Files.createDirectories(targetDir.toPath()); + } catch 
(Exception e) { + getLogger().error("Could not fetch file {} from file system for {} because Completion Strategy is configured to move the original file to {}, " + + "but that directory does not exist and could not be created due to: {}", + new Object[] {file, flowFile, targetDir, e.getMessage()}, e); + session.transfer(flowFile, REL_FAILURE); + return; + } + } + final String conflictStrategy = context.getProperty(CONFLICT_STRATEGY).getValue(); if (CONFLICT_FAIL.getValue().equalsIgnoreCase(conflictStrategy)) { http://git-wip-us.apache.org/repos/asf/nifi/blob/f3f7cdba/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFile.java index cfced0a..dcafefe 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFile.java @@ -42,6 +42,8 @@ public class TestFetchFile { return; } + targetDir.setReadable(true); + for (final File file : targetDir.listFiles()) { Files.delete(file.toPath()); } @@ -110,6 +112,9 @@ public class TestFetchFile { runner.assertValid(); final File destDir = new File("target/move-target"); + destDir.mkdirs(); + assertTrue(destDir.exists()); + final File destFile = new File(destDir, sourceFile.getName()); runner.enqueue(new byte[0]); @@ -135,6 +140,11 @@ public class TestFetchFile { runner.assertValid(); final File destDir = new File("target/move-target"); + if (destDir.exists()) { + destDir.delete(); + } + 
assertFalse(destDir.exists()); + final File destFile = new File(destDir, sourceFile.getName()); runner.enqueue(new byte[0]); @@ -147,6 +157,69 @@ public class TestFetchFile { } @Test + public void testMoveOnCompleteWithTargetExistsButNotWritable() throws IOException { + final File sourceFile = new File("target/1.txt"); + final byte[] content = "Hello, World!".getBytes(); + Files.write(sourceFile.toPath(), content, StandardOpenOption.CREATE); + + final TestRunner runner = TestRunners.newTestRunner(new FetchFile()); + runner.setProperty(FetchFile.FILENAME, sourceFile.getAbsolutePath()); + runner.setProperty(FetchFile.COMPLETION_STRATEGY, FetchFile.COMPLETION_MOVE.getValue()); + runner.assertNotValid(); + runner.setProperty(FetchFile.MOVE_DESTINATION_DIR, "target/move-target"); + runner.assertValid(); + + final File destDir = new File("target/move-target"); + if (!destDir.exists()) { + destDir.mkdirs(); + } + destDir.setWritable(false); + + assertTrue(destDir.exists()); + assertFalse(destDir.canWrite()); + + final File destFile = new File(destDir, sourceFile.getName()); + + runner.enqueue(new byte[0]); + runner.run(); + runner.assertAllFlowFilesTransferred(FetchFile.REL_FAILURE, 1); + runner.getFlowFilesForRelationship(FetchFile.REL_FAILURE).get(0).assertContentEquals(""); + + assertTrue(sourceFile.exists()); + assertFalse(destFile.exists()); + } + + @Test + public void testMoveOnCompleteWithParentOfTargetDirNotAccessible() throws IOException { + final File sourceFile = new File("target/1.txt"); + final byte[] content = "Hello, World!".getBytes(); + Files.write(sourceFile.toPath(), content, StandardOpenOption.CREATE); + + final String moveTargetParent = "target/fetch-file"; + final String moveTarget = moveTargetParent + "/move-target"; + + final TestRunner runner = TestRunners.newTestRunner(new FetchFile()); + runner.setProperty(FetchFile.FILENAME, sourceFile.getAbsolutePath()); + runner.setProperty(FetchFile.COMPLETION_STRATEGY, FetchFile.COMPLETION_MOVE.getValue()); 
+ runner.assertNotValid(); + runner.setProperty(FetchFile.MOVE_DESTINATION_DIR, moveTarget); + runner.assertValid(); + + // Make the parent of move-target non-writable and non-readable + final File moveTargetParentDir = new File(moveTargetParent); + moveTargetParentDir.mkdirs(); + moveTargetParentDir.setReadable(false); + moveTargetParentDir.setWritable(false); + + runner.enqueue(new byte[0]); + runner.run(); + runner.assertAllFlowFilesTransferred(FetchFile.REL_FAILURE, 1); + runner.getFlowFilesForRelationship(FetchFile.REL_FAILURE).get(0).assertContentEquals(""); + + assertTrue(sourceFile.exists()); + } + + @Test public void testMoveAndReplace() throws IOException { final File sourceFile = new File("target/1.txt"); final byte[] content = "Hello, World!".getBytes();