tpalfy commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1255861882


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -36,642 +33,306 @@
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
-import org.apache.nifi.deprecation.log.DeprecationLogger;
-import org.apache.nifi.deprecation.log.DeprecationLoggerFactory;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
+import org.apache.nifi.processors.hadoop.util.FileStatusManager;
+import org.apache.nifi.processors.hadoop.util.FilterMode;
+import org.apache.nifi.processors.hadoop.util.writer.FlowFileObjectWriter;
+import org.apache.nifi.processors.hadoop.util.writer.HadoopFileStatusWriter;
+import org.apache.nifi.processors.hadoop.util.writer.RecordObjectWriter;
 import org.apache.nifi.scheduling.SchedulingStrategy;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.WriteResult;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-
-import java.io.File;
+
 import java.io.IOException;
-import java.io.OutputStream;
-import java.security.PrivilegedExceptionAction;
-import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.nifi.processors.hadoop.util.FilterMode.FILTER_DIRECTORIES_AND_FILES;
 
 @PrimaryNodeOnly
 @TriggerSerially
 @TriggerWhenEmpty
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"hadoop", "HCFS", "HDFS", "get", "list", "ingest", "source", 
"filesystem"})
 @SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class})
-@CapabilityDescription("Retrieves a listing of files from HDFS. Each time a 
listing is performed, the files with the latest timestamp will be excluded "
-        + "and picked up during the next execution of the processor. This is 
done to ensure that we do not miss any files, or produce duplicates, in the "
-        + "cases where files with the same timestamp are written immediately 
before and after a single execution of the processor. For each file that is "
-        + "listed in HDFS, this processor creates a FlowFile that represents 
the HDFS file to be fetched in conjunction with FetchHDFS. This Processor is "
-        +  "designed to run on Primary Node only in a cluster. If the primary 
node changes, the new Primary Node will pick up where the previous node left "
-        +  "off without duplicating all of the data. Unlike GetHDFS, this 
Processor does not delete any data from HDFS.")
+@CapabilityDescription("Retrieves a listing of files from HDFS. For each file 
that is listed in HDFS, this processor creates a FlowFile that represents "
+        + "the HDFS file to be fetched in conjunction with FetchHDFS. This 
Processor is designed to run on Primary Node only in a cluster. If the primary "
+        + "node changes, the new Primary Node will pick up where the previous 
node left off without duplicating all of the data. Unlike GetHDFS, this "
+        + "Processor does not delete any data from HDFS.")
 @WritesAttributes({
-    @WritesAttribute(attribute="filename", description="The name of the file 
that was read from HDFS."),
-    @WritesAttribute(attribute="path", description="The path is set to the 
absolute path of the file's directory on HDFS. For example, if the Directory 
property is set to /tmp, "
-            + "then files picked up from /tmp will have the path attribute set 
to \"./\". If the Recurse Subdirectories property is set to true and a file is 
picked up "
-            + "from /tmp/abc/1/2/3, then the path attribute will be set to 
\"/tmp/abc/1/2/3\"."),
-    @WritesAttribute(attribute="hdfs.owner", description="The user that owns 
the file in HDFS"),
-    @WritesAttribute(attribute="hdfs.group", description="The group that owns 
the file in HDFS"),
-    @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp 
of when the file in HDFS was last modified, as milliseconds since midnight Jan 
1, 1970 UTC"),
-    @WritesAttribute(attribute="hdfs.length", description="The number of bytes 
in the file in HDFS"),
-    @WritesAttribute(attribute="hdfs.replication", description="The number of 
HDFS replicas for hte file"),
-    @WritesAttribute(attribute="hdfs.permissions", description="The 
permissions for the file in HDFS. This is formatted as 3 characters for the 
owner, "
-            + "3 for the group, and 3 for other users. For example rw-rw-r--")
+        @WritesAttribute(attribute = "filename", description = "The name of 
the file that was read from HDFS."),
+        @WritesAttribute(attribute = "path", description = "The path is set to 
the absolute path of the file's directory on HDFS. For example, if the 
Directory property is set to /tmp, "
+                + "then files picked up from /tmp will have the path attribute 
set to \"./\". If the Recurse Subdirectories property is set to true and a file 
is picked up "
+                + "from /tmp/abc/1/2/3, then the path attribute will be set to 
\"/tmp/abc/1/2/3\"."),
+        @WritesAttribute(attribute = "hdfs.owner", description = "The user 
that owns the file in HDFS"),
+        @WritesAttribute(attribute = "hdfs.group", description = "The group 
that owns the file in HDFS"),
+        @WritesAttribute(attribute = "hdfs.lastModified", description = "The 
timestamp of when the file in HDFS was last modified, as milliseconds since 
midnight Jan 1, 1970 UTC"),
+        @WritesAttribute(attribute = "hdfs.length", description = "The number 
of bytes in the file in HDFS"),
+        @WritesAttribute(attribute = "hdfs.replication", description = "The 
number of HDFS replicas for hte file"),
+        @WritesAttribute(attribute = "hdfs.permissions", description = "The 
permissions for the file in HDFS. This is formatted as 3 characters for the 
owner, "
+                + "3 for the group, and 3 for other users. For example 
rw-rw-r--")
 })
-@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of 
HDFS files, the latest timestamp of all the files listed and the latest "
-        + "timestamp of all the files transferred are both stored. This allows 
the Processor to list only files that have been added or modified after "
-        + "this date the next time that the Processor is run, without having 
to store all of the actual filenames/paths which could lead to performance "
-        + "problems. State is stored across the cluster so that this Processor 
can be run on Primary Node only and if a new Primary "
-        + "Node is selected, the new node can pick up where the previous node 
left off, without duplicating the data.")
+@Stateful(scopes = Scope.CLUSTER, description = "After performing a listing of 
HDFS files, the latest timestamp of all the files listed is stored. "
+        + "This allows the Processor to list only files that have been added 
or modified after this date the next time that the Processor is run, "
+        + "without having to store all of the actual filenames/paths which 
could lead to performance problems. State is stored across the cluster "
+        + "so that this Processor can be run on Primary Node only and if a new 
Primary Node is selected, the new node can pick up where the previous "
+        + "node left off, without duplicating the data.")
 @DefaultSchedule(strategy = SchedulingStrategy.TIMER_DRIVEN, period = "1 min")
 public class ListHDFS extends AbstractHadoopProcessor {
 
-    private static final RecordSchema RECORD_SCHEMA;
-    private static final String FILENAME = "filename";
-    private static final String PATH = "path";
-    private static final String IS_DIRECTORY = "directory";
-    private static final String SIZE = "size";
-    private static final String LAST_MODIFIED = "lastModified";
-    private static final String PERMISSIONS = "permissions";
-    private static final String OWNER = "owner";
-    private static final String GROUP = "group";
-    private static final String REPLICATION = "replication";
-    private static final String IS_SYM_LINK = "symLink";
-    private static final String IS_ENCRYPTED = "encrypted";
-    private static final String IS_ERASURE_CODED = "erasureCoded";
-
-    static {
-        final List<RecordField> recordFields = new ArrayList<>();
-        recordFields.add(new RecordField(FILENAME, 
RecordFieldType.STRING.getDataType(), false));
-        recordFields.add(new RecordField(PATH, 
RecordFieldType.STRING.getDataType(), false));
-        recordFields.add(new RecordField(IS_DIRECTORY, 
RecordFieldType.BOOLEAN.getDataType(), false));
-        recordFields.add(new RecordField(SIZE, 
RecordFieldType.LONG.getDataType(), false));
-        recordFields.add(new RecordField(LAST_MODIFIED, 
RecordFieldType.TIMESTAMP.getDataType(), false));
-        recordFields.add(new RecordField(PERMISSIONS, 
RecordFieldType.STRING.getDataType()));
-        recordFields.add(new RecordField(OWNER, 
RecordFieldType.STRING.getDataType()));
-        recordFields.add(new RecordField(GROUP, 
RecordFieldType.STRING.getDataType()));
-        recordFields.add(new RecordField(REPLICATION, 
RecordFieldType.INT.getDataType()));
-        recordFields.add(new RecordField(IS_SYM_LINK, 
RecordFieldType.BOOLEAN.getDataType()));
-        recordFields.add(new RecordField(IS_ENCRYPTED, 
RecordFieldType.BOOLEAN.getDataType()));
-        recordFields.add(new RecordField(IS_ERASURE_CODED, 
RecordFieldType.BOOLEAN.getDataType()));
-        RECORD_SCHEMA = new SimpleRecordSchema(recordFields);
-    }
+    private static final String NON_HIDDEN_FILES_REGEX = "[^\\.].*";
 
     public static final PropertyDescriptor RECURSE_SUBDIRS = new 
PropertyDescriptor.Builder()
-        .name("Recurse Subdirectories")
-        .description("Indicates whether to list files from subdirectories of 
the HDFS directory")
-        .required(true)
-        .allowableValues("true", "false")
-        .defaultValue("true")
-        .build();
+            .name("Recurse Subdirectories")
+            .description("Indicates whether to list files from subdirectories 
of the HDFS directory")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .build();
 
     public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
-        .name("record-writer")
-        .displayName("Record Writer")
-        .description("Specifies the Record Writer to use for creating the 
listing. If not specified, one FlowFile will be created for each entity that is 
listed. If the Record Writer is specified, " +
-            "all entities will be written to a single FlowFile.")
-        .required(false)
-        .identifiesControllerService(RecordSetWriterFactory.class)
-        .build();
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the Record Writer to use for creating the 
listing. If not specified, one FlowFile will be created for each "
+                    + "entity that is listed. If the Record Writer is 
specified, all entities will be written to a single FlowFile.")
+            .required(false)
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .build();
 
     public static final PropertyDescriptor FILE_FILTER = new 
PropertyDescriptor.Builder()
-        .name("File Filter")
-        .description("Only files whose names match the given regular 
expression will be picked up")
-        .required(true)
-        .defaultValue("[^\\.].*")
-        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
-        .build();
-
-    private static final String FILTER_MODE_DIRECTORIES_AND_FILES = 
"filter-mode-directories-and-files";
-    private static final String FILTER_MODE_FILES_ONLY = 
"filter-mode-files-only";
-    private static final String FILTER_MODE_FULL_PATH = 
"filter-mode-full-path";
-    static final AllowableValue FILTER_DIRECTORIES_AND_FILES_VALUE = new 
AllowableValue(FILTER_MODE_DIRECTORIES_AND_FILES,
-        "Directories and Files",
-        "Filtering will be applied to the names of directories and files.  If 
" + RECURSE_SUBDIRS.getDisplayName()
-                + " is set to true, only subdirectories with a matching name 
will be searched for files that match "
-                + "the regular expression defined in " + 
FILE_FILTER.getDisplayName() + ".");
-    static final AllowableValue FILTER_FILES_ONLY_VALUE = new 
AllowableValue(FILTER_MODE_FILES_ONLY,
-        "Files Only",
-        "Filtering will only be applied to the names of files.  If " + 
RECURSE_SUBDIRS.getDisplayName()
-                + " is set to true, the entire subdirectory tree will be 
searched for files that match "
-                + "the regular expression defined in " + 
FILE_FILTER.getDisplayName() + ".");
-    static final AllowableValue FILTER_FULL_PATH_VALUE = new 
AllowableValue(FILTER_MODE_FULL_PATH,
-        "Full Path",
-        "Filtering will be applied by evaluating the regular expression 
defined in " + FILE_FILTER.getDisplayName()
-                + " against the full path of files with and without the scheme 
and authority.  If "
-                + RECURSE_SUBDIRS.getDisplayName() + " is set to true, the 
entire subdirectory tree will be searched for files in which the full path of "
-                + "the file matches the regular expression defined in " + 
FILE_FILTER.getDisplayName() + ".  See 'Additional Details' for more 
information.");
+            .name("File Filter")
+            .description("Only files whose names match the given regular 
expression will be picked up")
+            .required(true)
+            .defaultValue(NON_HIDDEN_FILES_REGEX)
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+            .build();
 
     public static final PropertyDescriptor FILE_FILTER_MODE = new 
PropertyDescriptor.Builder()
-        .name("file-filter-mode")
-        .displayName("File Filter Mode")
-        .description("Determines how the regular expression in  " + 
FILE_FILTER.getDisplayName() + " will be used when retrieving listings.")
-        .required(true)
-        .allowableValues(FILTER_DIRECTORIES_AND_FILES_VALUE, 
FILTER_FILES_ONLY_VALUE, FILTER_FULL_PATH_VALUE)
-        .defaultValue(FILTER_DIRECTORIES_AND_FILES_VALUE.getValue())
-        .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
-        .build();
-
-    public static final PropertyDescriptor MIN_AGE = new 
PropertyDescriptor.Builder()
-        .name("minimum-file-age")
-        .displayName("Minimum File Age")
-        .description("The minimum age that a file must be in order to be 
pulled; any file younger than this "
-                + "amount of time (based on last modification date) will be 
ignored")
-        .required(false)
-        .addValidator(StandardValidators.createTimePeriodValidator(0, 
TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
-        .build();
-
-    public static final PropertyDescriptor MAX_AGE = new 
PropertyDescriptor.Builder()
-        .name("maximum-file-age")
-        .displayName("Maximum File Age")
-        .description("The maximum age that a file must be in order to be 
pulled; any file older than this "
-                + "amount of time (based on last modification date) will be 
ignored. Minimum value is 100ms.")
-        .required(false)
-        .addValidator(StandardValidators.createTimePeriodValidator(100, 
TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
-        .build();
+            .name("file-filter-mode")
+            .displayName("File Filter Mode")
+            .description("Determines how the regular expression in  " + 
FILE_FILTER.getDisplayName() + " will be used when retrieving listings.")
+            .required(true)
+            .allowableValues(FilterMode.class)
+            .defaultValue(FILTER_DIRECTORIES_AND_FILES.getValue())
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MINIMUM_FILE_AGE = new 
PropertyDescriptor.Builder()
+            .name("minimum-file-age")
+            .displayName("Minimum File Age")
+            .description("The minimum age that a file must be in order to be 
pulled; any file younger than this "
+                    + "amount of time (based on last modification date) will 
be ignored")
+            .required(false)
+            .addValidator(StandardValidators.createTimePeriodValidator(0, 
TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
+            .build();
+
+    public static final PropertyDescriptor MAXIMUM_FILE_AGE = new 
PropertyDescriptor.Builder()
+            .name("maximum-file-age")
+            .displayName("Maximum File Age")
+            .description("The maximum age that a file must be in order to be 
pulled; any file older than this "
+                    + "amount of time (based on last modification date) will 
be ignored. Minimum value is 100ms.")
+            .required(false)
+            .addValidator(StandardValidators.createTimePeriodValidator(100, 
TimeUnit.MILLISECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS))
+            .build();
 
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
-        .name("success")
-        .description("All FlowFiles are transferred to this relationship")
-        .build();
-
-    private static final DeprecationLogger deprecationLogger = 
DeprecationLoggerFactory.getLogger(ListHDFS.class);
-
-    private volatile long latestTimestampListed = -1L;
-    private volatile long latestTimestampEmitted = -1L;
-    private volatile long lastRunTimestamp = -1L;
-    private volatile boolean resetState = false;
-    static final String LISTING_TIMESTAMP_KEY = "listing.timestamp";
-    static final String EMITTED_TIMESTAMP_KEY = "emitted.timestamp";
-
-    static final long LISTING_LAG_NANOS = TimeUnit.MILLISECONDS.toNanos(100L);
+            .name("success")
+            .description("All FlowFiles are transferred to this relationship")
+            .build();
+    public static final String LEGACY_EMITTED_TIMESTAMP_KEY = 
"emitted.timestamp";
+    public static final String LEGACY_LISTING_TIMESTAMP_KEY = 
"listing.timestamp";
+    public static final String LATEST_TIMESTAMP_KEY = "latest.timestamp";
+    public static final String LATEST_FILES_KEY = "latest.file.%d";
+
+    private static final Set<Relationship> RELATIONSHIPS = 
Collections.singleton(REL_SUCCESS);
     private Pattern fileFilterRegexPattern;
-
-    @Override
-    protected void init(final ProcessorInitializationContext context) {
-        super.init(context);
-    }
+    private volatile boolean resetState = false;
 
     @Override
     protected void preProcessConfiguration(Configuration config, 
ProcessContext context) {
         super.preProcessConfiguration(config, context);
         // Since this processor is marked as INPUT_FORBIDDEN, the FILE_FILTER 
regex can be compiled here rather than during onTrigger processing
         fileFilterRegexPattern = 
Pattern.compile(context.getProperty(FILE_FILTER).getValue());
-
-    }
-
-    protected File getPersistenceFile() {
-        return new File("conf/state/" + getIdentifier());
     }
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> props = new ArrayList<>(properties);
-        props.add(DIRECTORY);
-        props.add(RECURSE_SUBDIRS);
-        props.add(RECORD_WRITER);
-        props.add(FILE_FILTER);
-        props.add(FILE_FILTER_MODE);
-        props.add(MIN_AGE);
-        props.add(MAX_AGE);
+        props.addAll(Arrays.asList(DIRECTORY, RECURSE_SUBDIRS, RECORD_WRITER, 
FILE_FILTER, FILE_FILTER_MODE, MINIMUM_FILE_AGE, MAXIMUM_FILE_AGE));
         return props;
     }
 
     @Override
     public Set<Relationship> getRelationships() {
-        final Set<Relationship> relationships = new HashSet<>();
-        relationships.add(REL_SUCCESS);
-        return relationships;
+        return RELATIONSHIPS;
     }
 
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext 
context) {
 
         final List<ValidationResult> problems = new 
ArrayList<>(super.customValidate(context));
 
-        final Long minAgeProp = 
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        final Long maxAgeProp = 
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        final Long minAgeProp = 
context.getProperty(MINIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+        final Long maxAgeProp = 
context.getProperty(MAXIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
         final long minimumAge = (minAgeProp == null) ? 0L : minAgeProp;
         final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : 
maxAgeProp;
 
         if (minimumAge > maximumAge) {
-            problems.add(new 
ValidationResult.Builder().valid(false).subject("GetHDFS Configuration")
-                    .explanation(MIN_AGE.getDisplayName() + " cannot be 
greater than " + MAX_AGE.getDisplayName()).build());
+            problems.add(new 
ValidationResult.Builder().valid(false).subject("ListHDFS Configuration")
+                    .explanation(MINIMUM_FILE_AGE.getDisplayName() + " cannot 
be greater than " + MAXIMUM_FILE_AGE.getDisplayName()).build());
         }
-
         return problems;
     }
 
-    protected String getKey(final String directory) {
-        return getIdentifier() + ".lastListingTime." + directory;
-    }
-
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
         super.onPropertyModified(descriptor, oldValue, newValue);
         if (isConfigurationRestored() && (descriptor.equals(DIRECTORY) || 
descriptor.equals(FILE_FILTER))) {
-            this.resetState = true;
-        }
-    }
-
-    /**
-     * Determines which of the given FileStatus's describes a File that should 
be listed.
-     *
-     * @param statuses the eligible FileStatus objects that we could 
potentially list
-     * @param context processor context with properties values
-     * @return a Set containing only those FileStatus objects that we want to 
list
-     */
-    Set<FileStatus> determineListable(final Set<FileStatus> statuses, 
ProcessContext context) {
-        final long minTimestamp = this.latestTimestampListed;
-        final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
-
-        final Long minAgeProp = 
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        // NIFI-4144 - setting to MIN_VALUE so that in case the file 
modification time is in
-        // the future relative to the nifi instance, files are not skipped.
-        final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : 
minAgeProp;
-        final Long maxAgeProp = 
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : 
maxAgeProp;
-
-        // Build a sorted map to determine the latest possible entries
-        for (final FileStatus status : statuses) {
-            if (status.getPath().getName().endsWith("_COPYING_")) {
-                continue;
-            }
-
-            final long fileAge = System.currentTimeMillis() - 
status.getModificationTime();
-            if (minimumAge > fileAge || fileAge > maximumAge) {
-                continue;
-            }
-
-            final long entityTimestamp = status.getModificationTime();
-
-            if (entityTimestamp > latestTimestampListed) {
-                latestTimestampListed = entityTimestamp;
-            }
-
-            // New entries are all those that occur at or after the associated 
timestamp
-            final boolean newEntry = entityTimestamp >= minTimestamp && 
entityTimestamp > latestTimestampEmitted;
-
-            if (newEntry) {
-                List<FileStatus> entitiesForTimestamp = 
orderedEntries.get(status.getModificationTime());
-                if (entitiesForTimestamp == null) {
-                    entitiesForTimestamp = new ArrayList<FileStatus>();
-                    orderedEntries.put(status.getModificationTime(), 
entitiesForTimestamp);
-                }
-                entitiesForTimestamp.add(status);
-            }
+            resetState = true;
         }
-
-        final Set<FileStatus> toList = new HashSet<>();
-
-        if (orderedEntries.size() > 0) {
-            long latestListingTimestamp = orderedEntries.lastKey();
-
-            // If the last listing time is equal to the newest entries 
previously seen,
-            // another iteration has occurred without new files and special 
handling is needed to avoid starvation
-            if (latestListingTimestamp == minTimestamp) {
-                // We are done if the latest listing timestamp is equal to the 
last processed time,
-                // meaning we handled those items originally passed over
-                if (latestListingTimestamp == latestTimestampEmitted) {
-                    return Collections.emptySet();
-                }
-            } else {
-                // Otherwise, newest entries are held back one cycle to avoid 
issues in writes occurring exactly when the listing is being performed to avoid 
missing data
-                orderedEntries.remove(latestListingTimestamp);
-            }
-
-            for (List<FileStatus> timestampEntities : orderedEntries.values()) 
{
-                for (FileStatus status : timestampEntities) {
-                    toList.add(status);
-                }
-            }
-        }
-
-        return toList;
     }
 
     @OnScheduled
     public void resetStateIfNecessary(final ProcessContext context) throws 
IOException {
         if (resetState) {
-            getLogger().debug("Property has been modified. Resetting the state 
values - listing.timestamp and emitted.timestamp to -1L");
+            getLogger().debug("Property has been modified. Resetting the state 
values.");
             context.getStateManager().clear(Scope.CLUSTER);
-            this.resetState = false;
+            resetState = false;
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        // We have to ensure that we don't continually perform listings, 
because if we perform two listings within
-        // the same millisecond, our algorithm for comparing timestamps will 
not work. So we ensure here that we do
-        // not let that happen.
-        final long now = System.nanoTime();
-        if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
-            lastRunTimestamp = now;
-            context.yield();
-            return;
-        }
-        lastRunTimestamp = now;
-
         // Ensure that we are using the latest listing information before we 
try to perform a listing of HDFS files.
+        final long latestTimestamp;
+        final List<String> latestFiles;
         try {
             final StateMap stateMap = session.getState(Scope.CLUSTER);
-            if (!stateMap.getStateVersion().isPresent()) {
-                latestTimestampEmitted = -1L;
-                latestTimestampListed = -1L;
-                getLogger().debug("Found no state stored");
+            final String latestTimestampString = 
stateMap.get(LATEST_TIMESTAMP_KEY);
+
+            final String legacyLatestListingTimestampString = 
stateMap.get(LEGACY_LISTING_TIMESTAMP_KEY);
+            final String legacyLatestEmittedTimestampString = 
stateMap.get(LEGACY_EMITTED_TIMESTAMP_KEY);
+
+            if (legacyLatestListingTimestampString != null) {
+                final long legacyLatestListingTimestamp = 
Long.parseLong(legacyLatestListingTimestampString);
+                final long legacyLatestEmittedTimestamp = 
Long.parseLong(legacyLatestEmittedTimestampString);
+                latestTimestamp = legacyLatestListingTimestamp == 
legacyLatestEmittedTimestamp ? legacyLatestListingTimestamp + 1 : 
legacyLatestListingTimestamp;
+                latestFiles = new ArrayList<>();
+                getLogger().debug("Transitioned from legacy state to new 
state. 'legacyLatestListingTimestamp': {}, 'legacyLatestEmittedTimeStamp': 
{}'," +
+                        "'latestTimestamp': {}", legacyLatestListingTimestamp, 
legacyLatestEmittedTimestamp, latestTimestamp);
+            } else if (latestTimestampString != null) {
+                latestTimestamp = Long.parseLong(latestTimestampString);
+                latestFiles = stateMap.toMap().entrySet().stream()
+                        .filter(entry -> 
entry.getKey().startsWith("latest.file"))
+                        .map(Map.Entry::getValue)
+                        .collect(Collectors.toList());
             } else {
-                // Determine if state is stored in the 'new' format or the 
'old' format
-                final String emittedString = 
stateMap.get(EMITTED_TIMESTAMP_KEY);
-                if (emittedString == null) {
-                    latestTimestampEmitted = -1L;
-                    latestTimestampListed = -1L;
-                    getLogger().debug("Found no recognized state keys; 
assuming no relevant state and resetting listing/emitted time to -1");
-                } else {
-                    // state is stored in the new format, using just two 
timestamps
-                    latestTimestampEmitted = Long.parseLong(emittedString);
-                    final String listingTimestmapString = 
stateMap.get(LISTING_TIMESTAMP_KEY);
-                    if (listingTimestmapString != null) {
-                        latestTimestampListed = 
Long.parseLong(listingTimestmapString);
-                    }
-
-                    getLogger().debug("Found new-style state stored, latesting 
timestamp emitted = {}, latest listed = {}",
-                        new Object[] {latestTimestampEmitted, 
latestTimestampListed});
-                }
+                latestTimestamp = 0L;
+                latestFiles = new ArrayList<>();
             }
-        } catch (final IOException ioe) {
+        } catch (IOException e) {
             getLogger().error("Failed to retrieve timestamp of last listing 
from the State Manager. Will not perform listing until this is accomplished.");
             context.yield();
             return;
         }
 
         // Pull in any file that is newer than the timestamp that we have.
-        final FileSystem hdfs = getFileSystem();
-        final boolean recursive = 
context.getProperty(RECURSE_SUBDIRS).asBoolean();
-        String fileFilterMode = 
context.getProperty(FILE_FILTER_MODE).getValue();
+        try (final FileSystem hdfs = getFileSystem()) {
+            final boolean recursive = 
context.getProperty(RECURSE_SUBDIRS).asBoolean();
+            final PathFilter pathFilter = createPathFilter(context);
+            final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        final Set<FileStatus> statuses;
-        try {
+            final FileStatusManager fileStatusManager = new 
FileStatusManager(latestTimestamp, latestFiles);
             final Path rootPath = getNormalizedPath(context, DIRECTORY);
-            statuses = getStatuses(rootPath, recursive, hdfs, 
createPathFilter(context), fileFilterMode);
-            getLogger().debug("Found a total of {} files in HDFS", new 
Object[] {statuses.size()});
-        } catch (final IOException | IllegalArgumentException e) {
-            getLogger().error("Failed to perform listing of HDFS", e);
-            return;
-        } catch (final InterruptedException e) {
-            Thread.currentThread().interrupt();
-            getLogger().error("Interrupted while performing listing of HDFS", 
e);
-            return;
-        }
+            final FileStatusIterable fileStatuses = new 
FileStatusIterable(rootPath, recursive, hdfs, getUserGroupInformation());
 
-        final Set<FileStatus> listable = determineListable(statuses, context);
-        getLogger().debug("Of the {} files found in HDFS, {} are listable", 
new Object[] {statuses.size(), listable.size()});
-
-        // Create FlowFile(s) for the listing, if there are any
-        if (!listable.isEmpty()) {
-            if (context.getProperty(RECORD_WRITER).isSet()) {
-                try {
-                    createRecords(listable, context, session);
-                } catch (final IOException | SchemaNotFoundException e) {
-                    getLogger().error("Failed to write listing of HDFS", e);
-                    return;
-                }
-            } else {
-                createFlowFiles(listable, session);
-            }
-        }
-
-        for (final FileStatus status : listable) {
-            final long fileModTime = status.getModificationTime();
-            if (fileModTime > latestTimestampEmitted) {
-                latestTimestampEmitted = fileModTime;
-            }
-        }
-
-        final Map<String, String> updatedState = new HashMap<>(1);
-        updatedState.put(LISTING_TIMESTAMP_KEY, 
String.valueOf(latestTimestampListed));
-        updatedState.put(EMITTED_TIMESTAMP_KEY, 
String.valueOf(latestTimestampEmitted));
-        getLogger().debug("New state map: {}", new Object[] {updatedState});
-
-        try {
-            session.setState(updatedState, Scope.CLUSTER);
-        } catch (final IOException ioe) {
-            getLogger().warn("Failed to save cluster-wide state. If NiFi is 
restarted, data duplication may occur", ioe);
-        }
-
-        final int listCount = listable.size();
-        if ( listCount > 0 ) {
-            getLogger().info("Successfully created listing with {} new files 
from HDFS", new Object[] {listCount});
-            session.commitAsync();
-        } else {
-            getLogger().debug("There is no data to list. Yielding.");
-            context.yield();
-        }
-    }
+            final Long minAgeProp = 
context.getProperty(MINIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+            final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : 
minAgeProp;
+            final Long maxAgeProp = 
context.getProperty(MAXIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+            final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : 
maxAgeProp;
 
-    private void createFlowFiles(final Set<FileStatus> fileStatuses, final 
ProcessSession session) {
-        for (final FileStatus status : fileStatuses) {
-            final Map<String, String> attributes = createAttributes(status);
-            FlowFile flowFile = session.create();
-            flowFile = session.putAllAttributes(flowFile, attributes);
-            session.transfer(flowFile, getSuccessRelationship());
-        }
-    }
-
-    private void createRecords(final Set<FileStatus> fileStatuses, final 
ProcessContext context, final ProcessSession session) throws IOException, 
SchemaNotFoundException {
-        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
-
-        FlowFile flowFile = session.create();
-        final WriteResult writeResult;
-        try (final OutputStream out = session.write(flowFile);
-             final RecordSetWriter recordSetWriter = 
writerFactory.createWriter(getLogger(), getRecordSchema(), out, 
Collections.emptyMap())) {
-
-            recordSetWriter.beginRecordSet();
-            for (final FileStatus fileStatus : fileStatuses) {
-                final Record record = createRecord(fileStatus);
-                recordSetWriter.write(record);
+            final HadoopFileStatusWriter writer;
+            if (writerFactory == null) {
+                writer = new FlowFileObjectWriter(session, fileStatuses, 
minimumAge, maximumAge, pathFilter, fileStatusManager, latestTimestamp, 
latestFiles);
+            } else {
+                writer = new RecordObjectWriter(session, fileStatuses, 
minimumAge, maximumAge, pathFilter, fileStatusManager, latestTimestamp,
+                        latestFiles, writerFactory, getLogger());
             }
 
-            writeResult = recordSetWriter.finishRecordSet();
-        }
+            writer.write();
 
-        final Map<String, String> attributes = new 
HashMap<>(writeResult.getAttributes());
-        attributes.put("record.count", 
String.valueOf(writeResult.getRecordCount()));
-        flowFile = session.putAllAttributes(flowFile, attributes);
-
-        session.transfer(flowFile, getSuccessRelationship());
-    }
-
-    private Record createRecord(final FileStatus fileStatus) {
-        final Map<String, Object> values = new HashMap<>();
-        values.put(FILENAME, fileStatus.getPath().getName());
-        values.put(PATH, getAbsolutePath(fileStatus.getPath().getParent()));
-        values.put(OWNER, fileStatus.getOwner());
-        values.put(GROUP, fileStatus.getGroup());
-        values.put(LAST_MODIFIED, new 
Timestamp(fileStatus.getModificationTime()));
-        values.put(SIZE, fileStatus.getLen());
-        values.put(REPLICATION, fileStatus.getReplication());
-
-        final FsPermission permission = fileStatus.getPermission();
-        final String perms = getPerms(permission.getUserAction()) + 
getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
-        values.put(PERMISSIONS, perms);
-
-        values.put(IS_DIRECTORY, fileStatus.isDirectory());
-        values.put(IS_SYM_LINK, fileStatus.isSymlink());
-        values.put(IS_ENCRYPTED, fileStatus.isEncrypted());
-        values.put(IS_ERASURE_CODED, fileStatus.isErasureCoded());
-
-        return new MapRecord(getRecordSchema(), values);
-    }
-
-    private RecordSchema getRecordSchema() {
-        return RECORD_SCHEMA;
-    }
+            getLogger().debug("Found a total of {} files in HDFS, {} are 
listed", fileStatuses.getTotalFileCount(), writer.getListedFileCount());
 
-    private Set<FileStatus> getStatuses(final Path path, final boolean 
recursive, final FileSystem hdfs, final PathFilter filter, String filterMode) 
throws IOException, InterruptedException {
-        final Set<FileStatus> statusSet = new HashSet<>();
-
-        getLogger().debug("Fetching listing for {}", new Object[] {path});
-        final FileStatus[] statuses;
-        if (isPostListingFilterNeeded(filterMode)) {
-            // For this filter mode, the filter is not passed to listStatus, 
so that directory names will not be
-            // filtered out when the listing is recursive.
-            statuses = 
getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> 
hdfs.listStatus(path));
-        } else {
-            statuses = 
getUserGroupInformation().doAs((PrivilegedExceptionAction<FileStatus[]>) () -> 
hdfs.listStatus(path, filter));
-        }
-
-        for ( final FileStatus status : statuses ) {
-            if ( status.isDirectory() ) {
-                if ( recursive ) {
-                    try {
-                        statusSet.addAll(getStatuses(status.getPath(), 
recursive, hdfs, filter, filterMode));
-                    } catch (final IOException ioe) {
-                        getLogger().error("Failed to retrieve HDFS listing for 
subdirectory {} due to {}; will continue listing others", new Object[] 
{status.getPath(), ioe});
-                    }
+            if (writer.getListedFileCount() > 0) {
+                final Map<String, String> updatedState = new HashMap<>();
+                updatedState.put(LATEST_TIMESTAMP_KEY, 
String.valueOf(fileStatusManager.getCurrentLatestTimestamp()));
+                final List<String> files = 
fileStatusManager.getCurrentLatestFiles();
+                for (int i = 0; i < files.size(); i++) {
+                    final String currentFilePath = files.get(i);
+                    updatedState.put(String.format(LATEST_FILES_KEY, i), 
currentFilePath);
                 }
+                getLogger().debug("New state map: {}", updatedState);
+                updateState(session, updatedState);
+
+                getLogger().info("Successfully created listing with {} new 
files from HDFS", writer.getListedFileCount());
             } else {
-                if (isPostListingFilterNeeded(filterMode)) {
-                    // Filtering explicitly performed here, since it was not 
able to be done when calling listStatus.
-                    if (filter.accept(status.getPath())) {
-                        statusSet.add(status);
-                    }
-                } else {
-                    statusSet.add(status);
-                }
+                getLogger().debug("There is no data to list. Yielding.");
+                context.yield();
             }
+        } catch (IOException e) {
+            throw new ProcessException("IO error occurred when closing HDFS 
file system", e);
         }
-
-        return statusSet;
-    }
-
-    /**
-     * Determines if filtering needs to be applied, after calling {@link 
FileSystem#listStatus(Path)}, based on the
-     * given filter mode.
-     * Filter modes that need to be able to search directories regardless of 
the given filter should return true.
-     * FILTER_MODE_FILES_ONLY and FILTER_MODE_FULL_PATH require that {@link 
FileSystem#listStatus(Path)} be invoked
-     * without a filter so that all directories can be traversed when 
filtering with these modes.
-     * FILTER_MODE_DIRECTORIES_AND_FILES should return false, since filtering 
can be applied directly with
-     * {@link FileSystem#listStatus(Path, PathFilter)} regardless of a 
recursive listing.
-     * @param filterMode the value of one of the defined AllowableValues 
representing filter modes
-     * @return true if results need to be filtered, false otherwise
-     */
-    private boolean isPostListingFilterNeeded(String filterMode) {
-        return filterMode.equals(FILTER_MODE_FILES_ONLY) || 
filterMode.equals(FILTER_MODE_FULL_PATH);
-    }
-
-    private String getAbsolutePath(final Path path) {
-        final Path parent = path.getParent();
-        final String prefix = (parent == null || parent.getName().equals("")) 
? "" : getAbsolutePath(parent);
-        return prefix + "/" + path.getName();
-    }
-
-    private Map<String, String> createAttributes(final FileStatus status) {
-        final Map<String, String> attributes = new HashMap<>();
-        attributes.put(CoreAttributes.FILENAME.key(), 
status.getPath().getName());
-        attributes.put(CoreAttributes.PATH.key(), 
getAbsolutePath(status.getPath().getParent()));
-
-        attributes.put(getAttributePrefix() + ".owner", status.getOwner());
-        attributes.put(getAttributePrefix() + ".group", status.getGroup());
-        attributes.put(getAttributePrefix() + ".lastModified", 
String.valueOf(status.getModificationTime()));
-        attributes.put(getAttributePrefix() + ".length", 
String.valueOf(status.getLen()));
-        attributes.put(getAttributePrefix() + ".replication", 
String.valueOf(status.getReplication()));
-
-        final FsPermission permission = status.getPermission();
-        final String perms = getPerms(permission.getUserAction()) + 
getPerms(permission.getGroupAction()) + getPerms(permission.getOtherAction());
-        attributes.put(getAttributePrefix() + ".permissions", perms);
-        return attributes;
-    }
-
-    private String getPerms(final FsAction action) {
-        final StringBuilder sb = new StringBuilder();
-        if (action.implies(FsAction.READ)) {
-            sb.append("r");
-        } else {
-            sb.append("-");
-        }
-
-        if (action.implies(FsAction.WRITE)) {
-            sb.append("w");
-        } else {
-            sb.append("-");
-        }
-
-        if (action.implies(FsAction.EXECUTE)) {
-            sb.append("x");
-        } else {
-            sb.append("-");
-        }
-
-        return sb.toString();
     }
 
     private PathFilter createPathFilter(final ProcessContext context) {
-        final String filterMode = 
context.getProperty(FILE_FILTER_MODE).getValue();
-        return path -> {
-            final boolean accepted;
-            if (FILTER_FULL_PATH_VALUE.getValue().equals(filterMode)) {
-                accepted = 
fileFilterRegexPattern.matcher(path.toString()).matches()
+        final FilterMode filterMode = 
FilterMode.forName(context.getProperty(FILE_FILTER_MODE).getValue());
+        final boolean recursive = 
context.getProperty(RECURSE_SUBDIRS).asBoolean();
+
+        switch (filterMode) {
+            case FILTER_MODE_FILES_ONLY:
+                return path -> 
fileFilterRegexPattern.matcher(path.getName()).matches();
+            case FILTER_MODE_FULL_PATH:
+                return path -> 
fileFilterRegexPattern.matcher(path.toString()).matches()
                         || 
fileFilterRegexPattern.matcher(Path.getPathWithoutSchemeAndAuthority(path).toString()).matches();
-            } else {
-                accepted =  
fileFilterRegexPattern.matcher(path.getName()).matches();
-            }
-            return accepted;
-        };
+            // FILTER_DIRECTORIES_AND_FILES
+            default:
+                return path -> Stream.of(path.toString().split("/"))

Review Comment:
   This case is not handling the path properly. It is working on the entire 
url. This can lead to dropping files that should be listed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to