Repository: nifi
Updated Branches:
  refs/heads/master 5ec85299e -> f3f7cdbab


NIFI-5719 Ensuring FetchFile routes to failure if the move completion strategy 
can't be completed

This closes #3088.

Signed-off-by: Koji Kawamura <ijokaruma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f3f7cdba
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f3f7cdba
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f3f7cdba

Branch: refs/heads/master
Commit: f3f7cdbab9248ba13c00d0e27a398e820e9ec7ed
Parents: 5ec8529
Author: Bryan Bende <bbe...@apache.org>
Authored: Wed Oct 17 11:28:29 2018 -0400
Committer: Koji Kawamura <ijokaruma...@apache.org>
Committed: Thu Oct 18 14:37:55 2018 +0900

----------------------------------------------------------------------
 .../nifi/processors/standard/FetchFile.java     | 12 ++++
 .../nifi/processors/standard/TestFetchFile.java | 73 ++++++++++++++++++++
 2 files changed, 85 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/f3f7cdba/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFile.java
index 3bf3f52..b929f07 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FetchFile.java
@@ -236,6 +236,18 @@ public class FetchFile extends AbstractProcessor {
                     return;
                 }
 
+                if (!targetDir.exists()) {
+                    try {
+                        Files.createDirectories(targetDir.toPath());
+                    } catch (Exception e) {
+                        getLogger().error("Could not fetch file {} from file 
system for {} because Completion Strategy is configured to move the original 
file to {}, "
+                                        + "but that directory does not exist 
and could not be created due to: {}",
+                                new Object[] {file, flowFile, targetDir, 
e.getMessage()}, e);
+                        session.transfer(flowFile, REL_FAILURE);
+                        return;
+                    }
+                }
+
                 final String conflictStrategy = 
context.getProperty(CONFLICT_STRATEGY).getValue();
 
                 if 
(CONFLICT_FAIL.getValue().equalsIgnoreCase(conflictStrategy)) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/f3f7cdba/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFile.java
index cfced0a..dcafefe 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestFetchFile.java
@@ -42,6 +42,8 @@ public class TestFetchFile {
             return;
         }
 
+        targetDir.setReadable(true);
+
         for (final File file : targetDir.listFiles()) {
             Files.delete(file.toPath());
         }
@@ -110,6 +112,9 @@ public class TestFetchFile {
         runner.assertValid();
 
         final File destDir = new File("target/move-target");
+        destDir.mkdirs();
+        assertTrue(destDir.exists());
+
         final File destFile = new File(destDir, sourceFile.getName());
 
         runner.enqueue(new byte[0]);
@@ -135,6 +140,11 @@ public class TestFetchFile {
         runner.assertValid();
 
         final File destDir = new File("target/move-target");
+        if (destDir.exists()) {
+            destDir.delete();
+        }
+        assertFalse(destDir.exists());
+
         final File destFile = new File(destDir, sourceFile.getName());
 
         runner.enqueue(new byte[0]);
@@ -147,6 +157,69 @@ public class TestFetchFile {
     }
 
     @Test
+    public void testMoveOnCompleteWithTargetExistsButNotWritable() throws 
IOException {
+        final File sourceFile = new File("target/1.txt");
+        final byte[] content = "Hello, World!".getBytes();
+        Files.write(sourceFile.toPath(), content, StandardOpenOption.CREATE);
+
+        final TestRunner runner = TestRunners.newTestRunner(new FetchFile());
+        runner.setProperty(FetchFile.FILENAME, sourceFile.getAbsolutePath());
+        runner.setProperty(FetchFile.COMPLETION_STRATEGY, 
FetchFile.COMPLETION_MOVE.getValue());
+        runner.assertNotValid();
+        runner.setProperty(FetchFile.MOVE_DESTINATION_DIR, 
"target/move-target");
+        runner.assertValid();
+
+        final File destDir = new File("target/move-target");
+        if (!destDir.exists()) {
+            destDir.mkdirs();
+        }
+        destDir.setWritable(false);
+
+        assertTrue(destDir.exists());
+        assertFalse(destDir.canWrite());
+
+        final File destFile = new File(destDir, sourceFile.getName());
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchFile.REL_FAILURE, 1);
+        
runner.getFlowFilesForRelationship(FetchFile.REL_FAILURE).get(0).assertContentEquals("");
+
+        assertTrue(sourceFile.exists());
+        assertFalse(destFile.exists());
+    }
+
+    @Test
+    public void testMoveOnCompleteWithParentOfTargetDirNotAccessible() throws 
IOException {
+        final File sourceFile = new File("target/1.txt");
+        final byte[] content = "Hello, World!".getBytes();
+        Files.write(sourceFile.toPath(), content, StandardOpenOption.CREATE);
+
+        final String moveTargetParent = "target/fetch-file";
+        final String moveTarget = moveTargetParent + "/move-target";
+
+        final TestRunner runner = TestRunners.newTestRunner(new FetchFile());
+        runner.setProperty(FetchFile.FILENAME, sourceFile.getAbsolutePath());
+        runner.setProperty(FetchFile.COMPLETION_STRATEGY, 
FetchFile.COMPLETION_MOVE.getValue());
+        runner.assertNotValid();
+        runner.setProperty(FetchFile.MOVE_DESTINATION_DIR, moveTarget);
+        runner.assertValid();
+
+        // Make the parent of move-target non-writable and non-readable
+        final File moveTargetParentDir = new File(moveTargetParent);
+        moveTargetParentDir.mkdirs();
+        moveTargetParentDir.setReadable(false);
+        moveTargetParentDir.setWritable(false);
+
+        runner.enqueue(new byte[0]);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(FetchFile.REL_FAILURE, 1);
+        
runner.getFlowFilesForRelationship(FetchFile.REL_FAILURE).get(0).assertContentEquals("");
+
+        assertTrue(sourceFile.exists());
+    }
+
+    @Test
     public void testMoveAndReplace() throws IOException {
         final File sourceFile = new File("target/1.txt");
         final byte[] content = "Hello, World!".getBytes();

Reply via email to