NIFI-1024 NIFI-1062 Fixed PutHDFS processor to properly route failures. Ensured that during put failures the FlowFile is routed to the 'failure' relationship. Added validation test. Re-enabled previously ignored test.
Signed-off-by: Bryan Bende <bbe...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/62e3cfc6 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/62e3cfc6 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/62e3cfc6 Branch: refs/heads/master Commit: 62e3cfc6291d1f30ac92f29cf790336644205919 Parents: 6c510fa Author: Oleg Zhurakousky <o...@suitcase.io> Authored: Tue Nov 10 07:58:28 2015 -0500 Committer: Bryan Bende <bbe...@apache.org> Committed: Wed Nov 11 10:12:30 2015 -0500 ---------------------------------------------------------------------- .../apache/nifi/processors/hadoop/PutHDFS.java | 2 +- .../nifi/processors/hadoop/PutHDFSTest.java | 58 +++++++++++++++++--- 2 files changed, 51 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/62e3cfc6/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java index bedf1b9..4b929bd 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/PutHDFS.java @@ -345,7 +345,7 @@ public class PutHDFS extends AbstractHadoopProcessor { } } getLogger().error("Failed to write to HDFS due to {}", t); - session.rollback(); + session.transfer(flowFile, REL_FAILURE); context.yield(); } } 
http://git-wip-us.apache.org/repos/asf/nifi/blob/62e3cfc6/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java index 0b5fee8..2eff5c3 100644 --- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java +++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/PutHDFSTest.java @@ -16,30 +16,34 @@ */ package org.apache.nifi.processors.hadoop; -import org.apache.nifi.processors.hadoop.PutHDFS; - import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; -import org.apache.hadoop.conf.Configuration; -import 
org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.compress.CompressionCodec; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; public class PutHDFSTest { @@ -154,11 +158,12 @@ public class PutHDFSTest { } // The following only seems to work from cygwin...something about not finding the 'chmod' command. - @Ignore + @Test public void testPutFile() throws IOException { TestRunner runner = TestRunners.newTestRunner(PutHDFS.class); runner.setProperty(PutHDFS.DIRECTORY, "target/test-classes"); runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); + runner.setValidateExpressionUsage(false); FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1"); Map<String, String> attributes = new HashMap<String, String>(); attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1"); @@ -168,6 +173,43 @@ public class PutHDFSTest { Configuration config = new Configuration(); FileSystem fs = FileSystem.get(config); + + List<MockFlowFile> failedFlowFiles = runner + .getFlowFilesForRelationship(new Relationship.Builder().name("failure").build()); + assertTrue(failedFlowFiles.isEmpty()); + assertTrue(fs.exists(new Path("target/test-classes/randombytes-1"))); } + + @Test + public void testPutFileWithException() throws IOException { + String dirName = "target/testPutFileWrongPermissions"; + File file = new File(dirName); + file.mkdirs(); + Configuration config = new Configuration(); + FileSystem fs = FileSystem.get(config); + Path p = new Path(dirName).makeQualified(fs.getUri(), fs.getWorkingDirectory()); + + // modify permissions to ensure no one can write to this directory, + // forcing IOException downstream + fs.setPermission(p, new FsPermission(FsAction.READ, FsAction.READ, FsAction.READ)); + + TestRunner runner = TestRunners.newTestRunner(PutHDFS.class); + runner.setProperty(PutHDFS.DIRECTORY, dirName); + runner.setProperty(PutHDFS.CONFLICT_RESOLUTION, "replace"); + 
runner.setValidateExpressionUsage(false); + FileInputStream fis = new FileInputStream("src/test/resources/testdata/randombytes-1"); + Map<String, String> attributes = new HashMap<String, String>(); + attributes.put(CoreAttributes.FILENAME.key(), "randombytes-1"); + runner.enqueue(fis, attributes); + runner.run(); + fis.close(); + + List<MockFlowFile> failedFlowFiles = runner + .getFlowFilesForRelationship(new Relationship.Builder().name("failure").build()); + assertFalse(failedFlowFiles.isEmpty()); + + fs.setPermission(p, new FsPermission(FsAction.EXECUTE, FsAction.EXECUTE, FsAction.EXECUTE)); + fs.delete(p, true); + } }