turcsanyip commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1226928846


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -319,131 +240,36 @@ public void onPropertyModified(final PropertyDescriptor 
descriptor, final String
         }
     }
 
-    /**
-     * Determines which of the given FileStatus's describes a File that should 
be listed.
-     *
-     * @param statuses the eligible FileStatus objects that we could 
potentially list
-     * @param context processor context with properties values
-     * @return a Set containing only those FileStatus objects that we want to 
list
-     */
-    Set<FileStatus> determineListable(final Set<FileStatus> statuses, 
ProcessContext context) {
-        final long minTimestamp = this.latestTimestampListed;
-        final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
-
-        final Long minAgeProp = 
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        // NIFI-4144 - setting to MIN_VALUE so that in case the file 
modification time is in
-        // the future relative to the nifi instance, files are not skipped.
-        final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : 
minAgeProp;
-        final Long maxAgeProp = 
context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : 
maxAgeProp;
-
-        // Build a sorted map to determine the latest possible entries
-        for (final FileStatus status : statuses) {
-            if (status.getPath().getName().endsWith("_COPYING_")) {
-                continue;
-            }
-
-            final long fileAge = System.currentTimeMillis() - 
status.getModificationTime();
-            if (minimumAge > fileAge || fileAge > maximumAge) {
-                continue;
-            }
-
-            final long entityTimestamp = status.getModificationTime();
-
-            if (entityTimestamp > latestTimestampListed) {
-                latestTimestampListed = entityTimestamp;
-            }
-
-            // New entries are all those that occur at or after the associated 
timestamp
-            final boolean newEntry = entityTimestamp >= minTimestamp && 
entityTimestamp > latestTimestampEmitted;
-
-            if (newEntry) {
-                List<FileStatus> entitiesForTimestamp = 
orderedEntries.get(status.getModificationTime());
-                if (entitiesForTimestamp == null) {
-                    entitiesForTimestamp = new ArrayList<FileStatus>();
-                    orderedEntries.put(status.getModificationTime(), 
entitiesForTimestamp);
-                }
-                entitiesForTimestamp.add(status);
-            }
-        }
-
-        final Set<FileStatus> toList = new HashSet<>();
-
-        if (orderedEntries.size() > 0) {
-            long latestListingTimestamp = orderedEntries.lastKey();
-
-            // If the last listing time is equal to the newest entries 
previously seen,
-            // another iteration has occurred without new files and special 
handling is needed to avoid starvation
-            if (latestListingTimestamp == minTimestamp) {
-                // We are done if the latest listing timestamp is equal to the 
last processed time,
-                // meaning we handled those items originally passed over
-                if (latestListingTimestamp == latestTimestampEmitted) {
-                    return Collections.emptySet();
-                }
-            } else {
-                // Otherwise, newest entries are held back one cycle to avoid 
issues in writes occurring exactly when the listing is being performed to avoid 
missing data
-                orderedEntries.remove(latestListingTimestamp);
-            }
-
-            for (List<FileStatus> timestampEntities : orderedEntries.values()) 
{
-                for (FileStatus status : timestampEntities) {
-                    toList.add(status);
-                }
-            }
-        }
-
-        return toList;
-    }
-
     @OnScheduled
     public void resetStateIfNecessary(final ProcessContext context) throws 
IOException {
         if (resetState) {
-            getLogger().debug("Property has been modified. Resetting the state 
values - listing.timestamp and emitted.timestamp to -1L");
+            getLogger().debug("Property has been modified. Resetting the state 
values.");
             context.getStateManager().clear(Scope.CLUSTER);
             this.resetState = false;
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        // We have to ensure that we don't continually perform listings, 
because if we perform two listings within
-        // the same millisecond, our algorithm for comparing timestamps will 
not work. So we ensure here that we do
-        // not let that happen.
-        final long now = System.nanoTime();
-        if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
-            lastRunTimestamp = now;
-            context.yield();
-            return;
-        }
-        lastRunTimestamp = now;
-
         // Ensure that we are using the latest listing information before we 
try to perform a listing of HDFS files.
         try {
-            final StateMap stateMap = session.getState(Scope.CLUSTER);
-            if (!stateMap.getStateVersion().isPresent()) {
-                latestTimestampEmitted = -1L;
-                latestTimestampListed = -1L;
-                getLogger().debug("Found no state stored");
-            } else {
-                // Determine if state is stored in the 'new' format or the 
'old' format
-                final String emittedString = 
stateMap.get(EMITTED_TIMESTAMP_KEY);
-                if (emittedString == null) {
-                    latestTimestampEmitted = -1L;
-                    latestTimestampListed = -1L;
-                    getLogger().debug("Found no recognized state keys; 
assuming no relevant state and resetting listing/emitted time to -1");
-                } else {
-                    // state is stored in the new format, using just two 
timestamps
-                    latestTimestampEmitted = Long.parseLong(emittedString);
-                    final String listingTimestmapString = 
stateMap.get(LISTING_TIMESTAMP_KEY);
-                    if (listingTimestmapString != null) {
-                        latestTimestampListed = 
Long.parseLong(listingTimestmapString);
-                    }
-
-                    getLogger().debug("Found new-style state stored, latesting 
timestamp emitted = {}, latest listed = {}",
-                        new Object[] {latestTimestampEmitted, 
latestTimestampListed});
-                }
+            latestModifiedStatuses = new ArrayList<>();
+            StateMap stateMap = session.getState(Scope.CLUSTER);
+            String latestListedTimestampString = 
stateMap.get(LATEST_TIMESTAMP_KEY);
+            String latestFiles = stateMap.get(LATEST_FILES_KEY);
+
+            final String legacyLatestListingTimestampString = 
stateMap.get(LEGACY_LISTING_TIMESTAMP_KEY);
+            final String legacyLatestEmittedTimestampString = 
stateMap.get(LEGACY_EMITTED_TIMESTAMP_KEY);
+
+            if (legacyLatestListingTimestampString != null) {
+                final long legacyLatestListingTimestamp = 
Long.parseLong(legacyLatestListingTimestampString);
+                final long legacyLatestEmittedTimestamp = 
Long.parseLong(legacyLatestEmittedTimestampString);
+                latestModificationTime = legacyLatestListingTimestamp == 
legacyLatestEmittedTimestamp ? legacyLatestListingTimestamp + 1 : 
legacyLatestListingTimestamp;
+            } else if (latestListedTimestampString != null) {
+                latestModificationTime = 
Long.parseLong(latestListedTimestampString);
+                latestModifiedStatuses = new 
ArrayList<>(Arrays.asList(latestFiles.split("\\s")));

Review Comment:
   Are we sure that file and directory names cannot contain spaces? If a path
   contains one, `latestFiles.split("\\s")` would break it into two entries
   (e.g. `/data/my file.csv` becomes `/data/my` and `file.csv`).
   Space-separated paths may not work, I'm afraid.
   Just an idea: other processors store each path in a separate state entry,
   such as `latest.file.1`, `latest.file.2`, etc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to