Repository: nifi
Updated Branches:
  refs/heads/master a91984446 -> b7dc21bd9


NIFI-2547: Add DeleteHDFS Processor

This processor adds the capability to delete files or
directories inside of HDFS.

Paths support both static and expression language values,
as well as glob support (e.g. /data/for/2016/07/*).

This processor may be used standalone, or as part of a flow
with an incoming connection that triggers each delete.

Signed-off-by: Matt Burgess <[email protected]>

Add Glob Matcher with Tests

Also set displayName on properties.

Signed-off-by: Matt Burgess <[email protected]>

This closes #850


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/26d362b1
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/26d362b1
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/26d362b1

Branch: refs/heads/master
Commit: 26d362b144e15ea4a224e346c340d74e978c134c
Parents: a919844
Author: ricky <[email protected]>
Authored: Wed Aug 10 19:14:39 2016 -0400
Committer: Matt Burgess <[email protected]>
Committed: Sun Aug 21 10:10:21 2016 -0400

----------------------------------------------------------------------
 .../nifi/processors/hadoop/DeleteHDFS.java      | 168 ++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../nifi/processors/hadoop/TestDeleteHDFS.java  | 198 +++++++++++++++++++
 3 files changed, 367 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/26d362b1/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
new file mode 100644
index 0000000..371b9e1
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+@TriggerWhenEmpty
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@Tags({ "hadoop", "HDFS", "delete", "remove", "filesystem" })
+@CapabilityDescription("Deletes a file from HDFS. The file can be provided as 
an attribute from an incoming FlowFile, "
+        + "or a statically set file that is periodically removed. If this 
processor has an incoming connection, it"
+        + "will ignore running on a periodic basis and instead rely on 
incoming FlowFiles to trigger a delete. "
+        + "Optionally, you may specify use a wildcard character to match 
multiple files or directories.")
+public class DeleteHDFS extends AbstractHadoopProcessor {
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles will be routed here if the delete command 
was successful")
+            .build();
+
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("FlowFiles will be routed here if the delete command 
was unsuccessful")
+            .build();
+
+    public static final PropertyDescriptor FILE_OR_DIRECTORY = new 
PropertyDescriptor.Builder()
+            .name("file_or_directory")
+            .displayName("File or Directory")
+            .description("The HDFS file or directory to delete. A wildcard 
expression may be used to only delete certain files")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor RECURSIVE = new 
PropertyDescriptor.Builder()
+            .name("recursive")
+            .displayName("Recursive")
+            .description("Remove contents of a non-empty directory 
recursively")
+            .allowableValues("true", "false")
+            .required(true)
+            .defaultValue("true")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected final Pattern GLOB_PATTERN = 
Pattern.compile("\\[|\\]|\\*|\\?|\\^|\\{|\\}|\\\\c");
+    protected final Matcher GLOB_MATCHER = GLOB_PATTERN.matcher("");
+
+    private static final Set<Relationship> relationships;
+
+    static {
+        final Set<Relationship> relationshipSet = new HashSet<>();
+        relationshipSet.add(REL_SUCCESS);
+        relationshipSet.add(REL_FAILURE);
+        relationships = Collections.unmodifiableSet(relationshipSet);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        List<PropertyDescriptor> props = new ArrayList<>(properties);
+        props.add(FILE_OR_DIRECTORY);
+        props.add(RECURSIVE);
+        return props;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        String fileOrDirectoryName = null;
+        FlowFile flowFile = session.get();
+
+        // If this processor has an incoming connection, then do not run 
unless a
+        // FlowFile is actually sent through
+        if (flowFile == null && context.hasIncomingConnection()) {
+            context.yield();
+            return;
+        }
+
+        if (flowFile != null) {
+            fileOrDirectoryName = 
context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
+        } else {
+            fileOrDirectoryName = 
context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions().getValue();
+        }
+
+        final FileSystem fileSystem = getFileSystem();
+        try {
+            // Check if the user has supplied a file or directory pattern
+            List<Path> pathList = Lists.newArrayList();
+            if (GLOB_MATCHER.reset(fileOrDirectoryName).find()) {
+                FileStatus[] fileStatuses = fileSystem.globStatus(new 
Path(fileOrDirectoryName));
+                if (fileStatuses != null) {
+                    for (FileStatus fileStatus : fileStatuses) {
+                        pathList.add(fileStatus.getPath());
+                    }
+                }
+            } else {
+                pathList.add(new Path(fileOrDirectoryName));
+            }
+
+            Map<String, String> attributes = 
Maps.newHashMapWithExpectedSize(2);
+            for (Path path : pathList) {
+                attributes.put("filename", path.getName());
+                attributes.put("path", path.getParent().toString());
+                if (fileSystem.exists(path)) {
+                    fileSystem.delete(path, 
context.getProperty(RECURSIVE).asBoolean());
+                    if (!context.hasIncomingConnection()) {
+                        flowFile = session.create();
+                    }
+                    session.transfer(session.putAllAttributes(flowFile, 
attributes), REL_SUCCESS);
+                } else {
+                    getLogger().warn("File (" + path + ") does not exist");
+                    if (!context.hasIncomingConnection()) {
+                        flowFile = session.create();
+                    }
+                    session.transfer(session.putAllAttributes(flowFile, 
attributes), REL_FAILURE);
+                }
+            }
+        } catch (IOException e) {
+            getLogger().warn("Error processing delete for file or directory", 
e);
+            if (flowFile != null) {
+                session.rollback(true);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d362b1/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index ef81091..165ec2c 100644
--- 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -19,3 +19,4 @@ org.apache.nifi.processors.hadoop.GetHDFSSequenceFile
 org.apache.nifi.processors.hadoop.inotify.GetHDFSEvents
 org.apache.nifi.processors.hadoop.ListHDFS
 org.apache.nifi.processors.hadoop.PutHDFS
+org.apache.nifi.processors.hadoop.DeleteHDFS

http://git-wip-us.apache.org/repos/asf/nifi/blob/26d362b1/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
new file mode 100644
index 0000000..89e3be3
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestDeleteHDFS.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.hadoop;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hadoop.KerberosProperties;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import com.google.common.collect.Maps;
+
+public class TestDeleteHDFS {
+    private NiFiProperties mockNiFiProperties;
+    private FileSystem mockFileSystem;
+    private KerberosProperties kerberosProperties;
+
+    @Before
+    public void setup() throws Exception {
+        mockNiFiProperties = mock(NiFiProperties.class);
+        
when(mockNiFiProperties.getKerberosConfigurationFile()).thenReturn(null);
+        kerberosProperties = KerberosProperties.create(mockNiFiProperties);
+        mockFileSystem = mock(FileSystem.class);
+    }
+
+    @Test
+    public void testSuccessfulDelete() throws Exception {
+        Path filePath = new Path("/some/path/to/file.txt");
+        when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
+        DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, 
mockFileSystem);
+        TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
+        runner.setIncomingConnection(false);
+        runner.assertNotValid();
+        runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, filePath.toString());
+        runner.assertValid();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_SUCCESS);
+        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 1);
+        FlowFile flowFile = 
runner.getFlowFilesForRelationship(DeleteHDFS.REL_SUCCESS).get(0);
+        assertEquals(filePath.getName(), flowFile.getAttribute("filename"));
+        assertEquals(filePath.getParent().toString(), 
flowFile.getAttribute("path"));
+    }
+
+    @Test
+    public void testDeleteFromIncomingFlowFile() throws Exception {
+        Path filePath = new Path("/some/path/to/file.txt");
+        when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
+        DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, 
mockFileSystem);
+        TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
+        runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, "${hdfs.file}");
+        Map<String, String> attributes = Maps.newHashMap();
+        attributes.put("hdfs.file", filePath.toString());
+        runner.enqueue("foo", attributes);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_SUCCESS);
+        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 1);
+        FlowFile flowFile = 
runner.getFlowFilesForRelationship(DeleteHDFS.REL_SUCCESS).get(0);
+        assertEquals(filePath.getName(), flowFile.getAttribute("filename"));
+        assertEquals(filePath.getParent().toString(), 
flowFile.getAttribute("path"));
+    }
+
+    @Test
+    public void testIOException() throws Exception {
+        Path filePath = new Path("/some/path/to/file.txt");
+        when(mockFileSystem.exists(any(Path.class))).thenThrow(new 
IOException());
+        DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, 
mockFileSystem);
+        TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
+        runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, "${hdfs.file}");
+        Map<String, String> attributes = Maps.newHashMap();
+        attributes.put("hdfs.file", filePath.toString());
+        runner.enqueue("foo", attributes);
+        runner.run();
+        runner.assertQueueNotEmpty();
+        runner.assertPenalizeCount(1);
+        assertEquals(1, runner.getQueueSize().getObjectCount());
+    }
+
+    @Test
+    public void testNoFlowFilesWithIncomingConnection() throws Exception {
+        Path filePath = new Path("${hdfs.file}");
+        DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, 
mockFileSystem);
+        TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
+        runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, filePath.toString());
+        runner.setIncomingConnection(true);
+        runner.run();
+        runner.assertQueueEmpty();
+        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, 0);
+        runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 0);
+    }
+
+    @Test
+    public void testUnsuccessfulDelete() throws Exception {
+        Path filePath = new Path("/some/path/to/file.txt");
+        when(mockFileSystem.exists(any(Path.class))).thenReturn(false);
+        DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, 
mockFileSystem);
+        TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
+        runner.setIncomingConnection(false);
+        runner.assertNotValid();
+        runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, filePath.toString());
+        runner.assertValid();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_FAILURE);
+        runner.assertTransferCount(DeleteHDFS.REL_FAILURE, 1);
+        FlowFile flowFile = 
runner.getFlowFilesForRelationship(DeleteHDFS.REL_FAILURE).get(0);
+        assertEquals(filePath.getName(), flowFile.getAttribute("filename"));
+        assertEquals(filePath.getParent().toString(), 
flowFile.getAttribute("path"));
+    }
+
+    @Test
+    public void testGlobDelete() throws Exception {
+        Path glob = new Path("/data/for/2017/08/05/*");
+        int fileCount = 300;
+        FileStatus[] fileStatuses = new FileStatus[fileCount];
+        for (int i = 0; i < fileCount; i++) {
+            Path file = new Path("/data/for/2017/08/05/file" + i);
+            FileStatus fileStatus = mock(FileStatus.class);
+            when(fileStatus.getPath()).thenReturn(file);
+            fileStatuses[i] = fileStatus;
+        }
+        when(mockFileSystem.exists(any(Path.class))).thenReturn(true);
+        
when(mockFileSystem.globStatus(any(Path.class))).thenReturn(fileStatuses);
+        DeleteHDFS deleteHDFS = new TestableDeleteHDFS(kerberosProperties, 
mockFileSystem);
+        TestRunner runner = TestRunners.newTestRunner(deleteHDFS);
+        runner.setIncomingConnection(false);
+        runner.assertNotValid();
+        runner.setProperty(DeleteHDFS.FILE_OR_DIRECTORY, glob.toString());
+        runner.assertValid();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(DeleteHDFS.REL_SUCCESS);
+        runner.assertTransferCount(DeleteHDFS.REL_SUCCESS, fileCount);
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(DeleteHDFS.REL_SUCCESS);
+        for (int i = 0; i < fileCount; i++) {
+            FlowFile flowFile = flowFiles.get(i);
+            assertEquals("file" + i, flowFile.getAttribute("filename"));
+            assertEquals("/data/for/2017/08/05", 
flowFile.getAttribute("path"));
+        }
+    }
+
+    private static class TestableDeleteHDFS extends DeleteHDFS {
+        private KerberosProperties testKerberosProperties;
+        private FileSystem mockFileSystem;
+
+        public TestableDeleteHDFS(KerberosProperties kerberosProperties, 
FileSystem mockFileSystem) {
+            this.testKerberosProperties = kerberosProperties;
+            this.mockFileSystem = mockFileSystem;
+        }
+
+        @Override
+        protected KerberosProperties getKerberosProperties() {
+            return testKerberosProperties;
+        }
+
+        @Override
+        protected FileSystem getFileSystem() {
+            return mockFileSystem;
+        }
+    }
+
+    @Test
+    public void testGlobMatcher() throws Exception {
+        DeleteHDFS deleteHDFS = new DeleteHDFS();
+        assertTrue(deleteHDFS.GLOB_MATCHER.reset("/data/for/08/09/*").find());
+        
assertTrue(deleteHDFS.GLOB_MATCHER.reset("/data/for/08/09/[01-04]").find());
+        assertTrue(deleteHDFS.GLOB_MATCHER.reset("/data/for/0?/09/").find());
+        assertFalse(deleteHDFS.GLOB_MATCHER.reset("/data/for/08/09").find());
+    }
+}

Reply via email to