[ https://issues.apache.org/jira/browse/NIFI-2547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423638#comment-15423638 ]

ASF GitHub Bot commented on NIFI-2547:
--------------------------------------

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/850#discussion_r75044228
  
    --- Diff: nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/DeleteHDFS.java ---
    @@ -0,0 +1,161 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.hadoop;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.apache.hadoop.fs.FileStatus;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +
    +@TriggerWhenEmpty
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@Tags({ "hadoop", "HDFS", "delete", "remove", "filesystem" })
    +@CapabilityDescription("Deletes a file from HDFS. The file can be provided as an attribute from an incoming FlowFile, "
    +        + "or a statically set file that is periodically removed. If this processor has an incoming connection, it "
    +        + "will ignore running on a periodic basis and instead rely on incoming FlowFiles to trigger a delete. "
    +        + "Optionally, you may use a wildcard character to match multiple files or directories.")
    +public class DeleteHDFS extends AbstractHadoopProcessor {
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder()
    +            .name("success")
    +            .description("FlowFiles will be routed here if the delete 
command was successful")
    +            .build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder()
    +            .name("failure")
    +            .description("FlowFiles will be routed here if the delete 
command was unsuccessful")
    +            .build();
    +
    +    public static final PropertyDescriptor FILE_OR_DIRECTORY = new PropertyDescriptor.Builder()
    +            .name("File or Directory")
    +            .description("The HDFS file or directory to delete. A wildcard 
expression may be used to only delete certain files")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
    +            .build();
    +
    +    public static final PropertyDescriptor RECURSIVE = new PropertyDescriptor.Builder()
    +            .name("Recursive")
    +            .description("Remove contents of a non-empty directory 
recursively")
    +            .allowableValues("true", "false")
    +            .required(true)
    +            .defaultValue("true")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    private static final Set<Relationship> relationships;
    +
    +    static {
    +        final Set<Relationship> relationshipSet = new HashSet<>();
    +        relationshipSet.add(REL_SUCCESS);
    +        relationshipSet.add(REL_FAILURE);
    +        relationships = Collections.unmodifiableSet(relationshipSet);
    +    }
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        List<PropertyDescriptor> props = new ArrayList<>(properties);
    +        props.add(FILE_OR_DIRECTORY);
    +        props.add(RECURSIVE);
    +        return props;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        String fileOrDirectoryName = null;
    +        FlowFile flowFile = session.get();
    +
    +        // If this processor has an incoming connection, then do not run unless a
    +        // FlowFile is actually sent through
    +        if (flowFile == null && context.hasIncomingConnection()) {
    +            context.yield();
    +            return;
    +        }
    +
    +        if (flowFile != null) {
    +            fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions(flowFile).getValue();
    +        } else {
    +            fileOrDirectoryName = context.getProperty(FILE_OR_DIRECTORY).evaluateAttributeExpressions().getValue();
    +        }
    +
    +        final FileSystem fileSystem = getFileSystem();
    +        try {
    +            // Check if the user has supplied a file or directory pattern
    +            List<Path> pathList = Lists.newArrayList();
    +            if (fileOrDirectoryName.contains("*")) {
    --- End diff ---
    
    Are there other types of glob patterns that could/should be recognized?
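    
    For reference, Hadoop's glob matcher already understands more than a bare
    "*": "?", character classes like "[0-9]", and alternation like "{a,b}" are
    all recognized by FileSystem.globStatus(Path), while the check above keys
    off contains("*") alone and would therefore miss those other forms. A
    minimal, hypothetical sketch (the pattern is made up; assumes a reachable
    filesystem and default configuration):
    
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FileStatus;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
    
        public class GlobSketch {
            public static void main(String[] args) throws Exception {
                // Default configuration; NiFi would normally supply this via
                // the processor's Hadoop Configuration Resources.
                FileSystem fs = FileSystem.get(new Configuration());
    
                // Hypothetical pattern: matches /data/for/2016/00 .. /19.
                // '?', '[abc]', '[0-9]' ranges, and '{alt1,alt2}' also work.
                FileStatus[] matches = fs.globStatus(new Path("/data/for/2016/[0-1][0-9]"));
    
                // globStatus returns null when a non-glob path does not exist.
                if (matches != null) {
                    for (FileStatus status : matches) {
                        System.out.println(status.getPath());
                    }
                }
            }
        }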


> Add DeleteHDFS Processor 
> -------------------------
>
>                 Key: NIFI-2547
>                 URL: https://issues.apache.org/jira/browse/NIFI-2547
>             Project: Apache NiFi
>          Issue Type: New Feature
>            Reporter: Ricky Saltzer
>            Assignee: Ricky Saltzer
>
> There are times when a user may want to remove a file or directory from 
> HDFS. The reasons for this vary, but to provide some context, I currently 
> have a pipeline where I need to periodically delete files that my NiFi 
> pipeline is producing. In my case, it's a "Delete files after they are 7 days 
> old". 
> Currently, I have to use the {{ExecuteStreamCommand}} processor and manually 
> call {{hdfs dfs -rm}}, which is awful when dealing with a large number of 
> files. For one, an entire JVM is spun up for each delete; for another, when 
> deleting directories containing thousands of files, the command can sometimes 
> hang indefinitely. 
> With that being said, I am proposing we add a {{DeleteHDFS}} processor which 
> meets the following criteria. 
> * Can delete both directories and files
> * Can delete directories recursively (a minimal sketch follows this list)
> * Supports the NiFi Expression Language 
> * Supports using glob paths (e.g. /data/for/2017/08/*)
> * Capable of being a downstream processor as well as a standalone processor
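
For context, the delete itself ultimately comes down to Hadoop's
FileSystem.delete(Path, boolean) call, where the boolean selects recursive
behavior. A minimal, hypothetical sketch of the first two criteria above
(the target path is made up; assumes default configuration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class DeleteSketch {
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());

            // Hypothetical target path, for illustration only.
            Path target = new Path("/data/for/2017/08");

            // 'true' removes a non-empty directory and everything under it;
            // 'false' deletes a file or an empty directory and typically
            // fails with an IOException on a non-empty one.
            boolean deleted = fs.delete(target, true);
            System.out.println(deleted ? "Deleted " + target : "Nothing deleted at " + target);
        }
    }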



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
