Repository: nifi
Updated Branches:
  refs/heads/master f0dfcc180 -> 5f65b2561


NIFI-3600 Improve logging and relationship routing for failures in DeleteHDFS

Realized that the flow file should be cloned via the session here because it's inside a for loop:
the original flow file would be transferred but would no longer be the latest flow file
if an error occurred later in the loop.
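
In other words (a minimal illustrative sketch, simplified from the actual change below; it assumes
the usual onTrigger() context where session, context, fileSystem, originalFlowFile, pathList,
RECURSIVE, REL_SUCCESS and REL_FAILURE are in scope):

    // Sketch only, not the committed code.
    for (Path path : pathList) {
        try {
            fileSystem.delete(path, context.getProperty(RECURSIVE).asBoolean());
        } catch (IOException ioe) {
            // Transferring originalFlowFile itself here would consume it, so the
            // remaining iterations (and the final transfer below) would no longer
            // hold the latest flow file. A clone per failed path avoids that.
            FlowFile failure = session.clone(originalFlowFile);
            failure = session.putAttribute(failure, "hdfs.error.message", ioe.getMessage());
            session.transfer(failure, REL_FAILURE);
        }
    }
    if (originalFlowFile != null) {
        session.transfer(originalFlowFile, REL_SUCCESS);
    }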

@trixpan, at a high level, what do you think about this approach?

NIFI-3600: Added unit test

NIFI-3600: Removed the hdfs.error.code attribute
Signed-off-by: Matt Burgess <mattyb...@apache.org>

This closes #1595


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5f65b256
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5f65b256
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5f65b256

Branch: refs/heads/master
Commit: 5f65b2561a25a10289a1a10c5325042ed988b105
Parents: f0dfcc1
Author: Jeremy Dyer <jeremyd...@apache.org>
Authored: Tue Mar 14 14:07:54 2017 -0400
Committer: Matt Burgess <mattyb...@apache.org>
Committed: Fri May 5 15:25:08 2017 -0400

----------------------------------------------------------------------
 .../nifi/processors/hadoop/DeleteHDFS.java      | 50 +++++++++++++++-----
 .../nifi/processors/hadoop/TestDeleteHDFS.java  | 21 ++++++++
 2 files changed, 58 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/5f65b256/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
index cdabc80..197228a 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
@@ -16,14 +16,26 @@
  */
 package org.apache.nifi.processors.hadoop;
 
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.Restricted;
 import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
@@ -33,15 +45,8 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import org.apache.nifi.annotation.documentation.SeeAlso;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 @TriggerWhenEmpty
 @InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@@ -54,6 +59,11 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
         + " flowfile then provided there are no detected failures it will be 
transferred to success otherwise it will be sent to false. If"
         + " knowledge of globbed files deleted is necessary use ListHDFS first 
to produce a specific list of files to delete. ")
 @Restricted("Provides operator the ability to delete any file that NiFi has 
access to in HDFS or the local filesystem.")
+@WritesAttributes({
+        @WritesAttribute(attribute="hdfs.filename", description="HDFS file to 
be deleted"),
+        @WritesAttribute(attribute="hdfs.path", description="HDFS Path 
specified in the delete request"),
+        @WritesAttribute(attribute="hdfs.error.message", description="HDFS 
error message related to the hdfs.error.code")
+})
 @SeeAlso({ListHDFS.class})
 public class DeleteHDFS extends AbstractHadoopProcessor {
 
@@ -146,8 +156,22 @@ public class DeleteHDFS extends AbstractHadoopProcessor {
 
             for (Path path : pathList) {
                 if (fileSystem.exists(path)) {
-                    fileSystem.delete(path, context.getProperty(RECURSIVE).asBoolean());
-                    getLogger().debug("For flowfile {} Deleted file at path {} with name {}", new Object[]{originalFlowFile, path.getParent().toString(), path.getName()});
+                    try {
+                        fileSystem.delete(path, context.getProperty(RECURSIVE).asBoolean());
+                        getLogger().debug("For flowfile {} Deleted file at path {} with name {}", new Object[]{originalFlowFile, path.getParent().toString(), path.getName()});
+                    } catch (IOException ioe) {
+                        // One possible scenario is that the IOException is permissions based, however it would be impractical to check every possible
+                        // external HDFS authorization tool (Ranger, Sentry, etc). Local ACLs could be checked but the operation would be expensive.
+                        getLogger().warn("Failed to delete file or directory", ioe);
+
+                        Map<String, String> attributes = Maps.newHashMapWithExpectedSize(3);
+                        attributes.put("hdfs.filename", path.getName());
+                        attributes.put("hdfs.path", path.getParent().toString());
+                        // The error message is helpful in understanding at a flowfile level what caused the IOException (which ACL is denying the operation, e.g.)
+                        attributes.put("hdfs.error.message", ioe.getMessage());
+
+                        session.transfer(session.putAllAttributes(session.clone(originalFlowFile), attributes), REL_FAILURE);
+                    }
                 }
             }
             if (originalFlowFile != null) {
@@ -161,4 +185,4 @@ public class DeleteHDFS extends AbstractHadoopProcessor {
         }
 
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/5f65b256/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
index be16ac6..b77b71a 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.hadoop;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.any;
@@ -30,6 +31,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -97,6 +99,25 @@ public class TestDeleteHDFS {
     }
 
     @Test
+    public void testPermissionIOException() throws Exception {
+        Path filePath = new Path("/some/path/to/file.txt");
+        when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
+        when(mockFileSystem.delete(any(Path.class), any(Boolean.class))).thenThrow(new IOException("Permissions Error"));
+        DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, mockFileSystem);
+        TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
+        runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, "${hdfs.file}");
+        Map<String, String> attributes = Maps.newHashMap();
+        attributes.put("hdfs.file", filePath.toString());
+        runner.enqueue("foo", attributes);
+        runner.run();
+        runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 1);
+        MockFlowFile flowFile = runner.getFlowFilesForRelationship(DeleteHDFS.REL_FAILURE).get(0);
+        assertEquals("file.txt", flowFile.getAttribute("hdfs.filename"));
+        assertEquals("/some/path/to", flowFile.getAttribute("hdfs.path"));
+        assertEquals("Permissions Error", 
flowFile.getAttribute("hdfs.error.message"));
+    }
+
+    @Test
     public void testNoFlowFilesWithIncomingConnection() throws Exception {
         Path filePath = new Path("${hdfs.file}");
         DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, mockFileSystem);
