[ https://issues.apache.org/jira/browse/NIFI-2547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423638#comment-15423638 ]
ASF GitHub Bot commented on NIFI-2547: -------------------------------------- Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/850#discussion_r75044228 --- Diff: nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java --- @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.hadoop; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +@TriggerWhenEmpty +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@Tags({ "hadoop", "HDFS", "delete", "remove", "filesystem" }) +@CapabilityDescription("Deletes a file from HDFS. The file can be provided as an attribute from an incoming FlowFile, " + + "or a statically set file that is periodically removed. If this processor has an incoming connection, it" + + "will ignore running on a periodic basis and instead rely on incoming FlowFiles to trigger a delete. 
" + + "Optionally, you may specify use a wildcard character to match multiple files or directories.") +public class DeleteHDFS extends AbstractHadoopProcessor { + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles will be routed here if the delete command was successful") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("FlowFiles will be routed here if the delete command was unsuccessful") + .build(); + + public static final PropertyDescriptor FILE_OR_DIRECTORY = new PropertyDescriptor.Builder() + .name("File or Directory") + .description("The HDFS file or directory to delete. A wildcard expression may be used to only delete certain files") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor RECURSIVE = new PropertyDescriptor.Builder() + .name("Recursive") + .description("Remove contents of a non-empty directory recursively") + .allowableValues("true", "false") + .required(true) + .defaultValue("true") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + private static final Set<Relationship> relationships; + + static { + final Set<Relationship> relationshipSet = new HashSet<>(); + relationshipSet.add(REL_SUCCESS); + relationshipSet.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(relationshipSet); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + List<PropertyDescriptor> props = new ArrayList<>(properties); + props.add(FILE_OR_DIRECTORY); + props.add(RECURSIVE); + return props; + } + + @Override + public Set<Relationship> getRelationships() { + return relationships; + } + + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + String fileOrDirectoryName = null; + FlowFile flowFile 
= session.get(); + + // If this processor has an incoming connection, then do not run unless a + // FlowFile is actually sent through + if (flowFile == null && context.hasIncomingConnection()) { + context.yield(); + return; + } + + if (flowFile != null) { + fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions(flowFile).getValue(); + } else { + fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions().getValue(); + } + + final FileSystem fileSystem = getFileSystem(); + try { + // Check if the user has supplied a file or directory pattern + List<Path> pathList = Lists.newArrayList(); + if (fileOrDirectoryName.contains("*")) { --- End diff -- Are there other types of glob patterns that could/should be recognized? > Add DeleteHDFS Processor > ------------------------- > > Key: NIFI-2547 > URL: https://issues.apache.org/jira/browse/NIFI-2547 > Project: Apache NiFi > Issue Type: New Feature > Reporter: Ricky Saltzer > Assignee: Ricky Saltzer > > There are times when a user may want to remove a file or directory from > HDFS. The reasons for this vary, but to provide some context, I currently > have a pipeline where I need to periodically delete files that my NiFi > pipeline is producing. In my case, it's a "Delete files after they are 7 days > old" policy. > Currently, I have to use the {{ExecuteStreamCommand}} processor and manually > call {{hdfs dfs -rm}}, which is awful when dealing with a large number of > files. For one, an entire JVM is spun up for each delete, and two, when > deleting directories with thousands of files, it can sometimes cause the > command to hang indefinitely. > With that being said, I am proposing we add a {{DeleteHDFS}} processor which > meets the following criteria: > * Can delete both directories and files > * Can delete directories recursively > * Supports the dynamic expression language > * Supports using glob paths (e.g. 
/data/for/2017/08/*) > * Capable of being a downstream processor as well as a standalone processor -- This message was sent by Atlassian JIRA (v6.3.4#6332)