tpalfy commented on code in PR #7240: URL: https://github.com/apache/nifi/pull/7240#discussion_r1253186098
########## nifi-nar-bundles/nifi-hadoop-bundle/nifi-hdfs-processors/src/main/java/org/apache/nifi/processors/hadoop/ListHDFS.java: ########## @@ -286,392 +200,141 @@ protected Collection<ValidationResult> customValidate(ValidationContext context) problems.add(new ValidationResult.Builder().valid(false).subject("GetHDFS Configuration") .explanation(MIN_AGE.getDisplayName() + " cannot be greater than " + MAX_AGE.getDisplayName()).build()); } - return problems; } - protected String getKey(final String directory) { - return getIdentifier() + ".lastListingTime." + directory; - } - @Override public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { super.onPropertyModified(descriptor, oldValue, newValue); if (isConfigurationRestored() && (descriptor.equals(DIRECTORY) || descriptor.equals(FILE_FILTER))) { - this.resetState = true; + resetState = true; } } - /** - * Determines which of the given FileStatus's describes a File that should be listed. - * - * @param statuses the eligible FileStatus objects that we could potentially list - * @param context processor context with properties values - * @return a Set containing only those FileStatus objects that we want to list - */ - Set<FileStatus> determineListable(final Set<FileStatus> statuses, ProcessContext context) { - final long minTimestamp = this.latestTimestampListed; - final TreeMap<Long, List<FileStatus>> orderedEntries = new TreeMap<>(); - - final Long minAgeProp = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS); - // NIFI-4144 - setting to MIN_VALUE so that in case the file modification time is in - // the future relative to the nifi instance, files are not skipped. - final long minimumAge = (minAgeProp == null) ? Long.MIN_VALUE : minAgeProp; - final Long maxAgeProp = context.getProperty(MAX_AGE).asTimePeriod(TimeUnit.MILLISECONDS); - final long maximumAge = (maxAgeProp == null) ? 
Long.MAX_VALUE : maxAgeProp; - - // Build a sorted map to determine the latest possible entries - for (final FileStatus status : statuses) { - if (status.getPath().getName().endsWith("_COPYING_")) { - continue; - } - - final long fileAge = System.currentTimeMillis() - status.getModificationTime(); - if (minimumAge > fileAge || fileAge > maximumAge) { - continue; - } - - final long entityTimestamp = status.getModificationTime(); - - if (entityTimestamp > latestTimestampListed) { - latestTimestampListed = entityTimestamp; - } - - // New entries are all those that occur at or after the associated timestamp - final boolean newEntry = entityTimestamp >= minTimestamp && entityTimestamp > latestTimestampEmitted; - - if (newEntry) { - List<FileStatus> entitiesForTimestamp = orderedEntries.get(status.getModificationTime()); - if (entitiesForTimestamp == null) { - entitiesForTimestamp = new ArrayList<FileStatus>(); - orderedEntries.put(status.getModificationTime(), entitiesForTimestamp); - } - entitiesForTimestamp.add(status); - } - } - - final Set<FileStatus> toList = new HashSet<>(); - - if (orderedEntries.size() > 0) { - long latestListingTimestamp = orderedEntries.lastKey(); - - // If the last listing time is equal to the newest entries previously seen, - // another iteration has occurred without new files and special handling is needed to avoid starvation - if (latestListingTimestamp == minTimestamp) { - // We are done if the latest listing timestamp is equal to the last processed time, - // meaning we handled those items originally passed over - if (latestListingTimestamp == latestTimestampEmitted) { - return Collections.emptySet(); - } - } else { - // Otherwise, newest entries are held back one cycle to avoid issues in writes occurring exactly when the listing is being performed to avoid missing data - orderedEntries.remove(latestListingTimestamp); - } - - for (List<FileStatus> timestampEntities : orderedEntries.values()) { - for (FileStatus status : 
timestampEntities) { - toList.add(status); - } - } - } - - return toList; - } - @OnScheduled public void resetStateIfNecessary(final ProcessContext context) throws IOException { if (resetState) { - getLogger().debug("Property has been modified. Resetting the state values - listing.timestamp and emitted.timestamp to -1L"); + getLogger().debug("Property has been modified. Resetting the state values."); context.getStateManager().clear(Scope.CLUSTER); - this.resetState = false; + resetState = false; } } @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - // We have to ensure that we don't continually perform listings, because if we perform two listings within - // the same millisecond, our algorithm for comparing timestamps will not work. So we ensure here that we do - // not let that happen. - final long now = System.nanoTime(); - if (now - lastRunTimestamp < LISTING_LAG_NANOS) { - lastRunTimestamp = now; - context.yield(); - return; - } - lastRunTimestamp = now; - // Ensure that we are using the latest listing information before we try to perform a listing of HDFS files. 
try { - final StateMap stateMap = session.getState(Scope.CLUSTER); - if (!stateMap.getStateVersion().isPresent()) { - latestTimestampEmitted = -1L; - latestTimestampListed = -1L; - getLogger().debug("Found no state stored"); - } else { - // Determine if state is stored in the 'new' format or the 'old' format - final String emittedString = stateMap.get(EMITTED_TIMESTAMP_KEY); - if (emittedString == null) { - latestTimestampEmitted = -1L; - latestTimestampListed = -1L; - getLogger().debug("Found no recognized state keys; assuming no relevant state and resetting listing/emitted time to -1"); - } else { - // state is stored in the new format, using just two timestamps - latestTimestampEmitted = Long.parseLong(emittedString); - final String listingTimestmapString = stateMap.get(LISTING_TIMESTAMP_KEY); - if (listingTimestmapString != null) { - latestTimestampListed = Long.parseLong(listingTimestmapString); - } - - getLogger().debug("Found new-style state stored, latesting timestamp emitted = {}, latest listed = {}", - new Object[] {latestTimestampEmitted, latestTimestampListed}); - } + latestTimestamp = 0L; + latestFiles = new ArrayList<>(); Review Comment: Also, I think it would be easier for the reader to understand the 3 possible scenarios if these were final and set explicitly in each of the 3 cases (legacy, non-legacy, first run). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org