tpalfy commented on code in PR #7240:
URL: https://github.com/apache/nifi/pull/7240#discussion_r1253194313


##########
nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java:
##########
@@ -286,392 +200,141 @@ protected Collection<ValidationResult> customValidate(ValidationContext context)
             problems.add(new ValidationResult.Builder().valid(false).subject("GetHDFS Configuration")
                     .explanation(MIN_AGE.getDisplayName() + " cannot be greater than " + MAX_AGE.getDisplayName()).build());
         }
-
         return problems;
     }
 
-    protected String getKey(final String directory) {
-        return getIdentifier() + ".lastListingTime." + directory;
-    }
-
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
         super.onPropertyModified(descriptor, oldValue, newValue);
         if (isConfigurationRestored() && (descriptor.equals(DIRECTORY) || descriptor.equals(FILE_FILTER))) {
-            this.resetState = true;
+            resetState = true;
         }
     }
 
-    /**
-     * Determines which of the given FileStatus's describes a File that should be listed.
-     *
-     * @param statuses the eligible FileStatus objects that we could potentially list
-     * @param context processor context with properties values
-     * @return a Set containing only those FileStatus objects that we want to list
-     */
-    Set<FileStatus> determineListable(final Set<FileStatus> statuses, ProcessContext context) {
-        final long minTimestamp = this.latestTimestampListed;
-        final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>();
-
-        final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        // NIFI-4144 - setting to MIN_VALUE so that in case the file modification time is in
-        // the future relative to the nifi instance, files are not skipped.
-        final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp;
-        final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
-        final long maximumAge = (maxAgeProp == null) ? Long.MAX_VALUE : maxAgeProp;
-
-        // Build a sorted map to determine the latest possible entries
-        for (final FileStatus status : statuses) {
-            if (status.getPath().getName().endsWith("_COPYING_")) {
-                continue;
-            }
-
-            final long fileAge = System.currentTimeMillis() - status.getModificationTime();
-            if (minimumAge > fileAge || fileAge > maximumAge) {
-                continue;
-            }
-
-            final long entityTimestamp = status.getModificationTime();
-
-            if (entityTimestamp > latestTimestampListed) {
-                latestTimestampListed = entityTimestamp;
-            }
-
-            // New entries are all those that occur at or after the associated timestamp
-            final boolean newEntry = entityTimestamp >= minTimestamp && entityTimestamp > latestTimestampEmitted;
-
-            if (newEntry) {
-                List<FileStatus> entitiesForTimestamp = orderedEntries.get(status.getModificationTime());
-                if (entitiesForTimestamp == null) {
-                    entitiesForTimestamp = new ArrayList<FileStatus>();
-                    orderedEntries.put(status.getModificationTime(), entitiesForTimestamp);
-                }
-                entitiesForTimestamp.add(status);
-            }
-        }
-
-        final Set<FileStatus> toList = new HashSet<>();
-
-        if (orderedEntries.size() > 0) {
-            long latestListingTimestamp = orderedEntries.lastKey();
-
-            // If the last listing time is equal to the newest entries previously seen,
-            // another iteration has occurred without new files and special handling is needed to avoid starvation
-            if (latestListingTimestamp == minTimestamp) {
-                // We are done if the latest listing timestamp is equal to the last processed time,
-                // meaning we handled those items originally passed over
-                if (latestListingTimestamp == latestTimestampEmitted) {
-                    return Collections.emptySet();
-                }
-            } else {
-                // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data
-                orderedEntries.remove(latestListingTimestamp);
-            }
-
-            for (List<FileStatus> timestampEntities : orderedEntries.values()) {
-                for (FileStatus status : timestampEntities) {
-                    toList.add(status);
-                }
-            }
-        }
-
-        return toList;
-    }
-
     @OnScheduled
     public void resetStateIfNecessary(final ProcessContext context) throws IOException {
         if (resetState) {
-            getLogger().debug("Property has been modified. Resetting the state values - listing.timestamp and emitted.timestamp to -1L");
+            getLogger().debug("Property has been modified. Resetting the state values.");
             context.getStateManager().clear(Scope.CLUSTER);
-            this.resetState = false;
+            resetState = false;
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        // We have to ensure that we don't continually perform listings, because if we perform two listings within
-        // the same millisecond, our algorithm for comparing timestamps will not work. So we ensure here that we do
-        // not let that happen.
-        final long now = System.nanoTime();
-        if (now - lastRunTimestamp < LISTING_LAG_NANOS) {
-            lastRunTimestamp = now;
-            context.yield();
-            return;
-        }
-        lastRunTimestamp = now;
-
         // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files.
         try {
-            final StateMap stateMap = session.getState(Scope.CLUSTER);
-            if (!stateMap.getStateVersion().isPresent()) {
-                latestTimestampEmitted = -1L;
-                latestTimestampListed = -1L;
-                getLogger().debug("Found no state stored");
-            } else {
-                // Determine if state is stored in the 'new' format or the 'old' format
-                final String emittedString = stateMap.get(EMITTED_TIMESTAMP_KEY);
-                if (emittedString == null) {
-                    latestTimestampEmitted = -1L;
-                    latestTimestampListed = -1L;
-                    getLogger().debug("Found no recognized state keys; assuming no relevant state and resetting listing/emitted time to -1");
-                } else {
-                    // state is stored in the new format, using just two timestamps
-                    latestTimestampEmitted = Long.parseLong(emittedString);
-                    final String listingTimestmapString = stateMap.get(LISTING_TIMESTAMP_KEY);
-                    if (listingTimestmapString != null) {
-                        latestTimestampListed = Long.parseLong(listingTimestmapString);
-                    }
-
-                    getLogger().debug("Found new-style state stored, latesting timestamp emitted = {}, latest listed = {}",
-                        new Object[] {latestTimestampEmitted, latestTimestampListed});
-                }
+            latestTimestamp = 0L;
+            latestFiles = new ArrayList<>();
+            StateMap stateMap = session.getState(Scope.CLUSTER);
+            String latestTimestampString = stateMap.get(LATEST_TIMESTAMP_KEY);
+
+            final String legacyLatestListingTimestampString = stateMap.get(LEGACY_LISTING_TIMESTAMP_KEY);
+            final String legacyLatestEmittedTimestampString = stateMap.get(LEGACY_EMITTED_TIMESTAMP_KEY);
+
+            if (legacyLatestListingTimestampString != null) {
+                final long legacyLatestListingTimestamp = Long.parseLong(legacyLatestListingTimestampString);
+                final long legacyLatestEmittedTimestamp = Long.parseLong(legacyLatestEmittedTimestampString);
+                latestTimestamp = legacyLatestListingTimestamp == legacyLatestEmittedTimestamp ? legacyLatestListingTimestamp + 1 : legacyLatestListingTimestamp;
+                getLogger().debug("Transitioned from legacy state to new state. 'legacyLatestListingTimestamp': {}, 'legacyLatestEmittedTimeStamp': {}'," +
+                        "'latestTimestamp': {}", legacyLatestListingTimestamp, legacyLatestEmittedTimestamp, latestTimestamp);
+            } else if (latestTimestampString != null) {
+                latestTimestamp = Long.parseLong(latestTimestampString);
+                this.latestFiles = stateMap.toMap().entrySet().stream()
+                        .filter(entry -> entry.getKey().startsWith("latest.file"))
+                        .map(Map.Entry::getValue)
+                        .collect(Collectors.toList());
             }
-        } catch (final IOException ioe) {
+        } catch (IOException e) {
             getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
             context.yield();
             return;
         }
 
         // Pull in any file that is newer than the timestamp that we have.
-        final FileSystem hdfs = getFileSystem();
-        final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
-        String fileFilterMode = context.getProperty(FILE_FILTER_MODE).getValue();
+        try (final FileSystem hdfs = getFileSystem()) {
+            final boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
+            final PathFilter pathFilter = createPathFilter(context);
+            final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
-        final Set<FileStatus> statuses;
-        try {
+            final FileStatusManager fileStatusManager = new FileStatusManager();

Review Comment:
   I agree with @turcsanyip except the last part. Conceptually I would advise against initializing a final field from a non-final running field, even if it hasn't changed at that point yet. I would keep those as parameters in the object writer constructor.
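   
   For illustration, a rough sketch of the distinction I have in mind (class and field names here are made up for the example, not the actual ones in this PR):
   
   ```java
   // Hypothetical example only; names do not come from the PR.
   class ExampleProcessor {
       private long latestTimestamp; // non-final, updated while the processor runs
   
       // What I would avoid: the writer's final field is initialized by reaching into
       // the processor's mutable running state, so the captured value silently depends
       // on exactly when the writer happens to be constructed.
       class CoupledWriter {
           private final long snapshotTimestamp;
   
           CoupledWriter() {
               this.snapshotTimestamp = latestTimestamp;
           }
       }
   
       // What I would prefer: the caller passes the value explicitly, so the writer
       // only ever sees an immutable snapshot handed to its constructor.
       static class DecoupledWriter {
           private final long snapshotTimestamp;
   
           DecoupledWriter(final long snapshotTimestamp) {
               this.snapshotTimestamp = snapshotTimestamp;
           }
       }
   }
   ```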


