NIFI-1024 NIFI-1062 Fixed PutHDFS processor to properly route failures.
Ensured that during put failures the FlowFile is routed to the 'failure'
relationship. Added validation test. Re-enabled previously ignored test.

Signed-off-by: Bryan Bende <bbe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/62e3cfc6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/62e3cfc6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/62e3cfc6

Branch: refs/heads/master
Commit: 62e3cfc6291d1f30ac92f29cf790336644205919
Parents: 6c510fa
Author: Oleg Zhurakousky <o...@suitcase.io>
Authored: Tue Nov 10 07:58:28 2015 -0500
Committer: Bryan Bende <bbe...@apache.org>
Committed: Wed Nov 11 10:12:30 2015 -0500

----------------------------------------------------------------------
 .../apache/nifi/processors/hadoop/PutHDFS.java  |  2 +-
 .../nifi/processors/hadoop/PutHDFSTest.java     | 58 +++++++++++++++++---
 2 files changed, 51 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/62e3cfc6/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
index bedf1b9..4b929bd 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java
@@ -345,7 +345,7 @@ public class PutHDFS extends AbstractHadoopProcessor {
                 }
             }
             getLogger().error("Failed to write to HDFS due to {}", t);
-            session.rollback();
+            session.transfer(flowFile, REL_FAILURE);
             context.yield();
         }
     }
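
For context on the one-line fix above: session.rollback() returns the FlowFile
to its input queue, where it is immediately retried and, if the cause of the
failure persists, fails again indefinitely; transferring to 'failure' instead
lets the flow designer decide how to handle the bad file. A minimal sketch of
the pattern, assuming only the standard NiFi processor API (the class name,
relationship definitions, and doWrite() are illustrative stand-ins, not the
actual PutHDFS source):

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

public class ExampleHdfsWriter extends AbstractProcessor {

    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").build();

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        try {
            doWrite(flowFile); // stand-in for the actual HDFS write
            session.transfer(flowFile, REL_SUCCESS);
        } catch (final Throwable t) {
            getLogger().error("Failed to write to HDFS due to {}", t);
            // Transfer to 'failure' instead of session.rollback(): a rollback
            // would put the FlowFile back on the input queue for endless
            // retry, while 'failure' routes it for explicit handling.
            session.transfer(flowFile, REL_FAILURE);
            context.yield();
        }
    }

    private void doWrite(final FlowFile flowFile) throws ProcessException {
        // ... elided: illustrative write logic ...
    }
}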

http://git-wip-us.apache.org/repos/asf/nifi/blob/62e3cfc6/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
index 0b5fee8..2eff5c3 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java
@@ -16,30 +16,34 @@
  */
 package org.apache.nifi.processors.hadoop;
 
-import org.apache.nifi.processors.hadoop.PutHDFS;
-
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.CompressionCodec;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class PutHDFSTest {
@@ -154,11 +158,12 @@ public class PutHDFSTest {
     }
 
     // The following only seems to work from cygwin...something about not finding the 'chmod' command.
-    @Ignore
+    @Test
     public void testPutFile() throws IOException {
         TestRunner runner = TestRunners.newTestRunner(PutHDFS.class);
         runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes");
         runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
+        runner.setValidateExpressionUsage(false);
         FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");
         Map<String, String> attributes = new HashMap<String, String>();
         attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
@@ -168,6 +173,43 @@ public class PutHDFSTest {
 
         Configuration config = new Configuration();
         FileSystem fs = FileSystem.get(config);
+
+        List<MockFlowFile> failedFlowFiles = runner
+                .getFlowFilesForRelationship(new Relationship.Builder().name("failure").build());
+        assertTrue(failedFlowFiles.isEmpty());
+
         assertTrue(fs.exists(new Path("target/test-classes/randombytes-1")));
     }
+
+    @Test
+    public void testPutFileWithException() throws IOException {
+        String dirName = "target/testPutFileWrongPermissions";
+        File file = new File(dirName);
+        file.mkdirs();
+        Configuration config = new Configuration();
+        FileSystem fs = FileSystem.get(config);
+        Path p = new Path(dirName).makeQualified(fs.getUri(), fs.getWorkingDirectory());
+
+        // modify permissions to ensure no one can write to this directory,
+        // forcing IOException downstream
+        fs.setPermission(p, new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ));
+
+        TestRunner runner = TestRunners.newTestRunner(PutHDFS.class);
+        runner.setProperty(PutHDFS.DIRECTORY, dirName);
+        runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace");
+        runner.setValidateExpressionUsage(false);
+        FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1");
+        Map<String, String> attributes = new HashMap<String, String>();
+        attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1");
+        runner.enqueue(fis, attributes);
+        runner.run();
+        fis.close();
+
+        List<MockFlowFile> failedFlowFiles = runner
+                .getFlowFilesForRelationship(new Relationship.Builder().name("failure").build());
+        assertFalse(failedFlowFiles.isEmpty());
+
+        fs.setPermission(p, new FsPermission(FsAction.EXECUTE, FsAction.EXECUTE, FsAction.EXECUTE));
+        fs.delete(p, true);
+    }
 }
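
A usage note on the assertions above: both tests construct the 'failure'
Relationship by name via Relationship.Builder. Assuming the processor's public
relationship constant is visible from the test, the nifi-mock TestRunner
offers an equivalent one-line check:

        // Asserts exactly one FlowFile was routed to 'failure', assuming
        // PutHDFS.REL_FAILURE is accessible from the test class.
        runner.assertAllFlowFilesTransferred(PutHDFS.REL_FAILURE, 1);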
