This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d74822  NIFI-6275 ListHDFS now ignores scheme and authority when using "Full Path" filter mode Updated description for "Full Path" filter mode to state that it will ignore scheme and authority Added tests to TestListHDFS for listing empty and nonexistent dirs Updated TestListHDFS' mock file system to track state properly when FileStatus instances are added, and updated listStatus to work properly with the underlying Map that contains FileStatus instances Updated ListHDFS' addi [...]
8d74822 is described below

commit 8d748223ff8f80c7a85fc38013ecf0b221adc2da
Author: Jeff Storck <jtsw...@gmail.com>
AuthorDate: Mon May 13 16:26:36 2019 -0400

    NIFI-6275 ListHDFS now ignores scheme and authority when using "Full Path" filter mode
    Updated description for "Full Path" filter mode to state that it will ignore scheme and authority
    Added tests to TestListHDFS for listing empty and nonexistent dirs
    Updated TestListHDFS' mock file system to track state properly when FileStatus instances are added, and updated listStatus to work properly with the underlying Map that contains FileStatus instances
    Updated ListHDFS' additional details to document "Full Path" filter mode ignoring scheme and authority, with an example
    Updated TestRunners, StandardProcessorTestRunner, and MockProcessorInitializationContext to support passing in a logger.

    NIFI-6275 Updated the "Full Path" filter mode to check the full path of a file with and without its scheme and authority against the filter regex
    Added additional documentation for how ListHDFS handles scheme and authority when "Full Path" filter mode is used
    Added test case for "Full Path" filter mode with a regular expression that includes scheme and authority
    
    This closes #3483.
    
    Signed-off-by: Koji Kawamura <ijokaruma...@apache.org>
---
 .../util/MockProcessorInitializationContext.java   |   6 +-
 .../nifi/util/StandardProcessorTestRunner.java     |   8 +-
 .../java/org/apache/nifi/util/TestRunners.java     |  50 +++++
 .../apache/nifi/processors/hadoop/ListHDFS.java    |  35 ++-
 .../additionalDetails.html                         |  11 +-
 .../nifi/processors/hadoop/TestListHDFS.java       | 243 +++++++++++++++++++--
 6 files changed, 316 insertions(+), 37 deletions(-)
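
The gist of the "Full Path" change, sketched with a hypothetical filter and file (illustrative only, not code from the commit): a regex written without scheme and authority used to fail against fully qualified listings, and now matches because the stripped form of the path is evaluated as well.

    import java.util.regex.Pattern;
    import org.apache.hadoop.fs.Path;

    public class FullPathFilterSketch {
        public static void main(String[] args) {
            // Hypothetical filter and file; the regex deliberately omits scheme and authority.
            Pattern filter = Pattern.compile("(/.*/)*txt/1\\..*");
            Path file = new Path("hdfs://hdfscluster:8020/data/txt/1.txt");

            // Previously only the fully qualified form was tested, so this prints false
            // and the file would have been skipped.
            System.out.println(filter.matcher(file.toString()).matches());

            // Now the scheme- and authority-stripped form is tested as well: prints true.
            System.out.println(filter.matcher(
                    Path.getPathWithoutSchemeAndAuthority(file).toString()).matches());
        }
    }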

diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java
index e44e731..6a26371 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessorInitializationContext.java
@@ -33,8 +33,12 @@ public class MockProcessorInitializationContext implements ProcessorInitializati
     private final MockProcessContext context;
 
     public MockProcessorInitializationContext(final Processor processor, final MockProcessContext context) {
+        this(processor, context, null);
+    }
+
+    public MockProcessorInitializationContext(final Processor processor, final MockProcessContext context, final MockComponentLog logger) {
         processorId = UUID.randomUUID().toString();
-        logger = new MockComponentLog(processorId, processor);
+        this.logger = logger == null ? new MockComponentLog(processorId, processor) : logger;
         this.context = context;
     }
 
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 89af696..d995f8e 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -99,6 +99,10 @@ public class StandardProcessorTestRunner implements TestRunner {
     }
 
     StandardProcessorTestRunner(final Processor processor, String processorName) {
+        this(processor, processorName, null);
+    }
+
+    StandardProcessorTestRunner(final Processor processor, String processorName, MockComponentLog logger) {
         this.processor = processor;
         this.idGenerator = new AtomicLong(0L);
         this.sharedState = new SharedSessionState(processor, idGenerator);
@@ -108,9 +112,9 @@ public class StandardProcessorTestRunner implements TestRunner {
         this.variableRegistry = new MockVariableRegistry();
         this.context = new MockProcessContext(processor, processorName, processorStateManager, variableRegistry);
 
-        final MockProcessorInitializationContext mockInitContext = new MockProcessorInitializationContext(processor, context);
+        final MockProcessorInitializationContext mockInitContext = new MockProcessorInitializationContext(processor, context, logger);
         processor.initialize(mockInitContext);
-        logger =  mockInitContext.getLogger();
+        this.logger =  mockInitContext.getLogger();
 
         try {
             ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor);
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunners.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunners.java
index cff907c..9792473 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunners.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunners.java
@@ -32,6 +32,17 @@ public class TestRunners {
 
     /**
      * Returns a {@code TestRunner} for the given {@code Processor}.
+     * The processor name available from {@code TestRunner.getProcessContext().getName()} will have the default name of {@code processor.getClass().getName()}
+     * @param processor the {@code Processor} under test
+     * @param logger the {@code ComponentLog} used for logging
+     * @return a {@code TestRunner}
+     */
+    public static TestRunner newTestRunner(final Processor processor, MockComponentLog logger) {
+        return newTestRunner(processor,processor.getClass().getName(), logger);
+    }
+
+    /**
+     * Returns a {@code TestRunner} for the given {@code Processor}.
      * The processor name available from {@code TestRunner.getProcessContext().getName()} will be the passed name.
      * @param processor the {@code Processor} under test
      * @param name the name to give the {@code Processor}
@@ -42,6 +53,18 @@
     }
 
     /**
+     * Returns a {@code TestRunner} for the given {@code Processor}.
+     * The processor name available from {@code TestRunner.getProcessContext().getName()} will be the passed name.
+     * @param processor the {@code Processor} under test
+     * @param name the name to give the {@code Processor}
+     * @param logger the {@code ComponentLog} used for logging
+     * @return a {@code TestRunner}
+     */
+    public static TestRunner newTestRunner(final Processor processor, String name, MockComponentLog logger) {
+        return new StandardProcessorTestRunner(processor, name, logger);
+    }
+
+    /**
      * Returns a {@code TestRunner} for the given {@code Processor} class.
      * The processor name available from {@code TestRunner.getProcessContext().getName()} will have the default name of {@code processor.getClass().getName()}
      * @param processorClass the {@code Processor} class
@@ -55,6 +78,17 @@
      * Returns a {@code TestRunner} for the given {@code Processor} class.
      * The processor name available from {@code TestRunner.getProcessContext().getName()} will have the default name of {@code processor.getClass().getName()}
      * @param processorClass the {@code Processor} class
+     * @param logger the {@code ComponentLog} used for logging
+     * @return a {@code TestRunner}
+     */
+    public static TestRunner newTestRunner(final Class<? extends Processor> processorClass, MockComponentLog logger) {
+        return newTestRunner(processorClass, processorClass.getName(), logger);
+    }
+
+    /**
+     * Returns a {@code TestRunner} for the given {@code Processor} class.
+     * The processor name available from {@code TestRunner.getProcessContext().getName()} will have the default name of {@code processor.getClass().getName()}
+     * @param processorClass the {@code Processor} class
     * @param name the name to give the {@code Processor}
      * @return a {@code TestRunner}
      */
@@ -67,4 +101,20 @@
         }
     }
 
+    /**
+     * Returns a {@code TestRunner} for the given {@code Processor} class.
+     * The processor name available from {@code TestRunner.getProcessContext().getName()} will have the default name of {@code processor.getClass().getName()}
+     * @param processorClass the {@code Processor} class
+     * @param name the name to give the {@code Processor}
+     * @param logger the {@code ComponentLog} used for logging
+     * @return a {@code TestRunner}
+     */
+    public static TestRunner newTestRunner(final Class<? extends Processor> processorClass, String name, MockComponentLog logger) {
+        try {
+            return newTestRunner(processorClass.newInstance(), name, logger);
+        } catch (final Exception e) {
+            System.err.println("Could not instantiate instance of class " + processorClass.getName() + " due to: " + e);
+            throw new RuntimeException(e);
+        }
+    }
 }
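
A usage sketch for the new logger-accepting overloads (mirroring the wiring in TestListHDFS further down; the anonymous do-nothing processor is a stand-in for any component under test):

    import static org.mockito.Mockito.anyString;
    import static org.mockito.Mockito.never;
    import static org.mockito.Mockito.spy;
    import static org.mockito.Mockito.verify;

    import java.util.UUID;
    import org.apache.nifi.processor.AbstractProcessor;
    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.Processor;
    import org.apache.nifi.util.MockComponentLog;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;
    import org.junit.Test;

    public class LoggerInjectionSketchTest {
        @Test
        public void logsNoErrors() {
            // Stand-in processor; any Processor under test would do.
            final Processor proc = new AbstractProcessor() {
                @Override
                public void onTrigger(ProcessContext context, ProcessSession session) {
                }
            };
            // Spying on the injected logger makes log interactions verifiable.
            final MockComponentLog logger = spy(new MockComponentLog(UUID.randomUUID().toString(), proc));
            final TestRunner runner = TestRunners.newTestRunner(proc, logger);
            runner.run();
            verify(logger, never()).error(anyString());
        }
    }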
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
index 15ed4b1..31f582f 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.hadoop;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -126,24 +127,25 @@ public class ListHDFS extends AbstractHadoopProcessor {
     private static final String FILTER_MODE_FULL_PATH = "filter-mode-full-path";
     static final AllowableValue FILTER_DIRECTORIES_AND_FILES_VALUE = new AllowableValue(FILTER_MODE_DIRECTORIES_AND_FILES,
         "Directories and Files",
-        "Filtering will be applied to the names of directories and files.  If " + RECURSE_SUBDIRS.getName()
+        "Filtering will be applied to the names of directories and files.  If " + RECURSE_SUBDIRS.getDisplayName()
                 + " is set to true, only subdirectories with a matching name will be searched for files that match "
-                + "the regular expression defined in " + FILE_FILTER.getName() + ".");
+                + "the regular expression defined in " + FILE_FILTER.getDisplayName() + ".");
     static final AllowableValue FILTER_FILES_ONLY_VALUE = new AllowableValue(FILTER_MODE_FILES_ONLY,
         "Files Only",
-        "Filtering will only be applied to the names of files.  If " + RECURSE_SUBDIRS.getName()
+        "Filtering will only be applied to the names of files.  If " + RECURSE_SUBDIRS.getDisplayName()
                 + " is set to true, the entire subdirectory tree will be searched for files that match "
-                + "the regular expression defined in " + FILE_FILTER.getName() + ".");
+                + "the regular expression defined in " + FILE_FILTER.getDisplayName() + ".");
     static final AllowableValue FILTER_FULL_PATH_VALUE = new AllowableValue(FILTER_MODE_FULL_PATH,
         "Full Path",
-        "Filtering will be applied to the full path of files.  If " + RECURSE_SUBDIRS.getName()
-                + " is set to true, the entire subdirectory tree will be searched for files in which the full path of "
-                + "the file matches the regular expression defined in " + FILE_FILTER.getName() + ".");
+        "Filtering will be applied by evaluating the regular expression defined in " + FILE_FILTER.getDisplayName()
+                + " against the full path of files with and without the scheme and authority.  If "
+                + RECURSE_SUBDIRS.getDisplayName() + " is set to true, the entire subdirectory tree will be searched for files in which the full path of "
+                + "the file matches the regular expression defined in " + FILE_FILTER.getDisplayName() + ".  See 'Additional Details' for more information.");
 
     public static final PropertyDescriptor FILE_FILTER_MODE = new PropertyDescriptor.Builder()
         .name("file-filter-mode")
         .displayName("File Filter Mode")
-        .description("Determines how the regular expression in  " + FILE_FILTER.getName() + " will be used when retrieving listings.")
+        .description("Determines how the regular expression in  " + FILE_FILTER.getDisplayName() + " will be used when retrieving listings.")
         .required(true)
         .allowableValues(FILTER_DIRECTORIES_AND_FILES_VALUE, FILTER_FILES_ONLY_VALUE, FILTER_FULL_PATH_VALUE)
         .defaultValue(FILTER_DIRECTORIES_AND_FILES_VALUE.getValue())
@@ -181,12 +183,21 @@ public class ListHDFS extends AbstractHadoopProcessor {
     static final String EMITTED_TIMESTAMP_KEY = "emitted.timestamp";
 
     static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);
+    private Pattern fileFilterRegexPattern;
 
     @Override
     protected void init(final ProcessorInitializationContext context) {
         super.init(context);
     }
 
+    @Override
+    protected void preProcessConfiguration(Configuration config, ProcessContext context) {
+        super.preProcessConfiguration(config, context);
+        // Since this processor is marked as INPUT_FORBIDDEN, the FILE_FILTER regex can be compiled here rather than during onTrigger processing
+        fileFilterRegexPattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
+
+    }
+
     protected File getPersistenceFile() {
         return new File("conf/state/" + getIdentifier());
     }
@@ -222,7 +233,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
 
         if (minimumAge > maximumAge) {
             problems.add(new ValidationResult.Builder().valid(false).subject("GetHDFS Configuration")
-                    .explanation(MIN_AGE.getName() + " cannot be greater than " + MAX_AGE.getName()).build());
+                    .explanation(MIN_AGE.getDisplayName() + " cannot be greater than " + MAX_AGE.getDisplayName()).build());
         }
 
         return problems;
@@ -526,14 +537,14 @@ public class ListHDFS extends AbstractHadoopProcessor {
     }
 
     private PathFilter createPathFilter(final ProcessContext context) {
-        final Pattern filePattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
         final String filterMode = context.getProperty(FILE_FILTER_MODE).getValue();
         return path -> {
             final boolean accepted;
             if (FILTER_FULL_PATH_VALUE.getValue().equals(filterMode)) {
-                accepted = filePattern.matcher(path.toString()).matches();
+                accepted = fileFilterRegexPattern.matcher(path.toString()).matches()
+                        || fileFilterRegexPattern.matcher(Path.getPathWithoutSchemeAndAuthority(path).toString()).matches();
             } else {
-                accepted =  filePattern.matcher(path.getName()).matches();
+                accepted =  fileFilterRegexPattern.matcher(path.getName()).matches();
             }
             return accepted;
         };
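
Why the new PathFilter evaluates both forms: acceptance uses Pattern#matches(), which must consume the entire string, so a filter written without scheme and authority can never match a fully qualified path on its own. A quick illustration using the filter from the tests below (illustrative, not code from the commit):

    import java.util.regex.Pattern;

    public class MatchesSemanticsSketch {
        public static void main(String[] args) {
            Pattern filter = Pattern.compile("(/.*/)*anotherDir/1\\..*");

            // matches() anchors at both ends, so the scheme/authority prefix breaks the match: false.
            System.out.println(filter.matcher(
                    "hdfs://hdfscluster:8020/test/testDir/anotherDir/1.txt").matches());

            // The stripped form matches, which is why both forms are tried: true.
            System.out.println(filter.matcher("/test/testDir/anotherDir/1.txt").matches());
        }
    }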
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/docs/org.apache.nifi.processors.hadoop.ListHDFS/additionalDetails.html b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/docs/org.apache.nifi.processors.hadoop.ListHDFS/additionalDetails.html
index 2fd0deb..804208c 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/docs/org.apache.nifi.processors.hadoop.ListHDFS/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/resources/docs/org.apache.nifi.processors.hadoop.ListHDFS/additionalDetails.html
@@ -28,11 +28,16 @@
 There are three filter modes available for ListHDFS that determine how the regular expression in the <b><code>File Filter</code></b> property will be applied to listings in HDFS.
 <ul>
     <li><b><code>Directories and Files</code></b></li>
-Filtering will be applied to the names of directories and files.  If <b><code>Recurse Subdirectories</code></b> is set to true, only subdirectories with a matching name will be searched for files that match the regular expression defined in <b><code>File Filter</code></b>.
+    Filtering will be applied to the names of directories and files.  If <b><code>Recurse Subdirectories</code></b> is set to true, only subdirectories with a matching name will be searched for files that match the regular expression defined in <b><code>File Filter</code></b>.
     <li><b><code>Files Only</code></b></li>
-Filtering will only be applied to the names of files.  If <b><code>Recurse Subdirectories</code></b> is set to true, the entire subdirectory tree will be searched for files that match the regular expression defined in <b><code>File Filter</code></b>.
+    Filtering will only be applied to the names of files.  If <b><code>Recurse Subdirectories</code></b> is set to true, the entire subdirectory tree will be searched for files that match the regular expression defined in <b><code>File Filter</code></b>.
     <li><b><code>Full Path</code></b></li>
-Filtering will be applied to the full path of files.  If <b><code>Recurse Subdirectories</code></b> is set to true, the entire subdirectory tree will be searched for files in which the full path of the file matches the regular expression defined in <b><code>File Filter</code></b>.
+    Filtering will be applied to the full path of files.  If <b><code>Recurse Subdirectories</code></b> is set to true, the entire subdirectory tree will be searched for files in which the full path of the file matches the regular expression defined in <b><code>File Filter</code></b>.<br>
+    Regarding <code>scheme</code> and <code>authority</code>, if a given file has a full path of <code>hdfs://hdfscluster:8020/data/txt/1.txt</code>, the filter will evaluate the regular expression defined in <b><code>File Filter</code></b> against two cases, matching if either is true:<br>
+    <ul>
+        <li>the full path including the scheme (<code>hdfs</code>), authority (<code>hdfscluster:8020</code>), and the remaining path components (<code>/data/txt/1.txt</code>)</li>
+        <li>only the path components (<code>/data/txt/1.txt</code>)</li>
+    </ul>
 </ul>
 <p>
 <h2>Examples:</h2>
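
The two strings the filter is evaluated against for the documentation's example path can be reproduced with Hadoop's Path API (a sketch; the trailing comments show the expected output):

    import org.apache.hadoop.fs.Path;

    public class TwoCasesSketch {
        public static void main(String[] args) {
            Path file = new Path("hdfs://hdfscluster:8020/data/txt/1.txt");
            // Case 1: the full path including scheme and authority.
            System.out.println(file);                                         // hdfs://hdfscluster:8020/data/txt/1.txt
            // Case 2: the path components only.
            System.out.println(Path.getPathWithoutSchemeAndAuthority(file));  // /data/txt/1.txt
        }
    }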
diff --git a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
index 8134b4d..18c7e34 100644
--- a/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
+++ b/nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/test/java/org/apache/nifi/processors/hadoop/TestListHDFS.java
@@ -20,7 +20,15 @@ import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_DIRECTORIES_AND_
 import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_FILES_ONLY_VALUE;
 import static org.apache.nifi.processors.hadoop.ListHDFS.FILTER_FULL_PATH_VALUE;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.File;
@@ -34,11 +42,14 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Stream;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -54,6 +65,7 @@ import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
 import org.apache.nifi.distributed.cache.client.Serializer;
 import org.apache.nifi.hadoop.KerberosProperties;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockComponentLog;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.TestRunner;
@@ -61,6 +73,7 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 
 public class TestListHDFS {
 
@@ -68,6 +81,7 @@ public class TestListHDFS {
     private ListHDFSWithMockedFileSystem proc;
     private NiFiProperties mockNiFiProperties;
     private KerberosProperties kerberosProperties;
+    private MockComponentLog mockLogger;
 
     @Before
     public void setup() throws InitializationException {
@@ -76,7 +90,8 @@ public class TestListHDFS {
         kerberosProperties = new KerberosProperties(null);
 
         proc = new ListHDFSWithMockedFileSystem(kerberosProperties);
-        runner = TestRunners.newTestRunner(proc);
+        mockLogger = spy(new MockComponentLog(UUID.randomUUID().toString(), proc));
+        runner = TestRunners.newTestRunner(proc, mockLogger);
 
         runner.setProperty(ListHDFS.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site.xml");
         runner.setProperty(ListHDFS.DIRECTORY, "/test");
@@ -279,25 +294,92 @@ public class TestListHDFS {
     }
 
     @Test
-    public void testRecursiveWithCustomFilterFullPath() throws InterruptedException, IOException {
+    public void testRecursiveWithCustomFilterFullPathWithoutSchemeAndAuthority() throws InterruptedException, IOException {
         // set custom regex filter and filter mode
         runner.setProperty(ListHDFS.FILE_FILTER, "(/.*/)*anotherDir/1\..*");
         runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_FULL_PATH_VALUE.getValue());
 
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.out")));
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testFile.txt")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.out")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/1.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.out")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.txt")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.out")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir/1.out")));
 
-        proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir")));
-        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/1.txt")));
+        // first iteration will not pick up files because it has to instead check timestamps.
+        // We must then wait long enough to ensure that the listing can be performed safely and
+        // run the Processor again.
+        runner.run();
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
 
-        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir")));
-        proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/1.out")));
-        proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/1.txt")));
-        proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.out")));
-        proc.fileSystem.addFileStatus(new Path("/test/testDir/anotherDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/anotherDir/2.txt")));
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 2);
+
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ListHDFS.REL_SUCCESS);
+        for (int i = 0; i < 2; i++) {
+            final MockFlowFile ff = flowFiles.get(i);
+            final String filename = ff.getAttribute("filename");
+
+            if (filename.equals("1.out")) {
+                ff.assertAttributeEquals("path", "/test/testDir/anotherDir");
+            } else if (filename.equals("1.txt")) {
+                ff.assertAttributeEquals("path", "/test/testDir/anotherDir");
+            } else {
+                Assert.fail("filename was " + filename);
+            }
+        }
+    }
+
+    @Test
+    public void testRecursiveWithCustomFilterFullPathWithSchemeAndAuthority() throws InterruptedException, IOException {
+        // set custom regex filter and filter mode
+        runner.setProperty(ListHDFS.FILE_FILTER, "hdfs://hdfscluster:8020(/.*/)*anotherDir/1\\..*");
+        runner.setProperty(ListHDFS.FILE_FILTER_MODE, FILTER_FULL_PATH_VALUE.getValue());
 
-        proc.fileSystem.addFileStatus(new Path("/test/testDir"), new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/someDir")));
-        proc.fileSystem.addFileStatus(new Path("/test/testDir/someDir"), new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("/test/testDir/someDir/1.out")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.out")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/1.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.out")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.txt")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.out")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir/1.out")));
 
         // first iteration will not pick up files because it has to instead check timestamps.
         // We must then wait long enough to ensure that the listing can be performed safely and
@@ -532,6 +614,115 @@ public class TestListHDFS {
         runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 3);
     }
 
+    @Test
+    public void testListingEmptyDir() throws InterruptedException, IOException {
+        runner.setProperty(ListHDFS.DIRECTORY, "/test/emptyDir");
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.out")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/emptyDir")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/1.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.out")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.txt")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.out")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir/1.out")));
+
+        // first iteration will not pick up files because it has to instead check timestamps.
+        // We must then wait long enough to ensure that the listing can be performed safely and
+        // run the Processor again.
+        runner.run();
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
+
+        // verify that no messages were logged at the error level
+        verify(mockLogger, never()).error(anyString());
+        final ArgumentCaptor<Throwable> throwableArgumentCaptor = ArgumentCaptor.forClass(Throwable.class);
+        verify(mockLogger, atLeast(0)).error(anyString(), throwableArgumentCaptor.capture());
+        // if error(message, throwable) was called, ignore JobConf CNFEs since mapreduce libs are not included as dependencies
+        assertTrue(throwableArgumentCaptor.getAllValues().stream().flatMap(Stream::of)
+                // check that there are no throwables other than JobConf CNFEs
+                .noneMatch(throwable -> !(throwable instanceof ClassNotFoundException && throwable.getMessage().contains("JobConf"))));
+        verify(mockLogger, never()).error(anyString(), any(Object[].class));
+        verify(mockLogger, never()).error(anyString(), any(Object[].class), any(Throwable.class));
+
+        // assert that no files were listed
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
+        // assert that no files were penalized
+        runner.assertPenalizeCount(0);
+    }
+
+    @Test
+    public void testListingNonExistingDir() throws InterruptedException, IOException {
+        String nonExistingPath = "/test/nonExistingDir";
+        runner.setProperty(ListHDFS.DIRECTORY, nonExistingPath);
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.out")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testFile.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/emptyDir")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/1.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.out")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/1.txt")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.out")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/anotherDir/2.txt")));
+
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir"),
+                new FileStatus(1L, true, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir")));
+        proc.fileSystem.addFileStatus(new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir"),
+                new FileStatus(1L, false, 1, 1L, 0L, 0L, create777(), "owner", "group", new Path("hdfs", "hdfscluster:8020", "/test/testDir/someDir/1.out")));
+
+        // first iteration will not pick up files because it has to instead check timestamps.
+        // We must then wait long enough to ensure that the listing can be performed safely and
+        // run the Processor again.
+        runner.run();
+        Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
+        runner.run();
+
+        final ArgumentCaptor<Object[]> loggingArgsCaptor = ArgumentCaptor.forClass(Object[].class);
+        verify(mockLogger, atLeastOnce()).error(anyString(), loggingArgsCaptor.capture());
+        // assert that FNFE exceptions were logged for the Directory property's value.
+        assertTrue(loggingArgsCaptor.getAllValues().stream().flatMap(Stream::of)
+                .anyMatch(o -> o instanceof FileNotFoundException && ((FileNotFoundException)o).getMessage().contains(nonExistingPath)));
+
+        // assert that no files were listed
+        runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
+        // assert that no files were penalized
+        runner.assertPenalizeCount(0);
+    }
 
     private FsPermission create777() {
         return new FsPermission((short) 0777);
@@ -578,6 +769,11 @@ public class TestListHDFS {
             }
 
             children.add(child);
+
+            // if the child is a directory and a key for it does not exist, create it with an empty set of children
+            if (child.isDirectory() && !fileStatuses.containsKey(child.getPath())) {
+                fileStatuses.put(child.getPath(), new HashSet<>());
+            }
         }
 
         @Override
@@ -625,12 +821,21 @@ public class TestListHDFS {
 
         @Override
         public FileStatus[] listStatus(final Path f) throws FileNotFoundException, IOException {
-            final Set<FileStatus> statuses = fileStatuses.get(f);
-            if (statuses == null) {
-                return new FileStatus[0];
-            }
-
-            return statuses.toArray(new FileStatus[statuses.size()]);
+            return fileStatuses.keySet().stream()
+                    // find the key in fileStatuses that matches the given Path f
+                    .filter(pathKey -> f.isAbsoluteAndSchemeAuthorityNull()
+                            // f is an absolute path with no scheme and no authority, compare with the keys of fileStatuses without their scheme and authority
+                            ? Path.getPathWithoutSchemeAndAuthority(pathKey).equals(Path.getPathWithoutSchemeAndAuthority(f)) :
+                            // f is absolute, but contains a scheme or authority, compare directly to the keys of fileStatuses
+                            // if f is not absolute, false will be returned
+                            f.isAbsolute() && pathKey.equals(f))
+                    // get the set of FileStatus objects for the filtered paths in the stream
+                    .map(fileStatuses::get)
+                    // return the first set of FileStatus objects in the stream; there should only be one, since fileStatuses is a Map
+                    .findFirst()
+
+                    // if no set of FileStatus objects was found, throw a FNFE
+                    .orElseThrow(() -> new FileNotFoundException(String.format("%s instance does not contain a key for %s", this.getClass().getSimpleName(), f))).toArray(new FileStatus[0]);
         }
 
         @Override
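
How the reworked mock resolves an unqualified requested path against its fully qualified keys rests on the two Path methods used above; a sketch with illustrative values:

    import org.apache.hadoop.fs.Path;

    public class MockKeyMatchingSketch {
        public static void main(String[] args) {
            Path qualified = new Path("hdfs", "hdfscluster:8020", "/test");
            Path unqualified = new Path("/test");

            // The requested path carries no scheme/authority, so the mock compares stripped keys.
            System.out.println(unqualified.isAbsoluteAndSchemeAuthorityNull()); // true
            System.out.println(Path.getPathWithoutSchemeAndAuthority(qualified)
                    .equals(unqualified));                                      // true -> key found
        }
    }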
