tpalfy commented on a change in pull request #4721:
URL: https://github.com/apache/nifi/pull/4721#discussion_r542476249



##########
File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
##########
@@ -431,11 +482,157 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         } else if (BY_ENTITIES.equals(listingStrategy)) {
             listByTrackingEntities(context, session);
 
+        } else if (BY_ADJUSTED_TIME_WINDOW.equals(listingStrategy)) {
+            listByAdjustedSlidingTimeWindow(context, session);
+
         } else {
             throw new ProcessException("Unknown listing strategy: " + listingStrategy);
         }
     }
 
+    public void listByAdjustedSlidingTimeWindow(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        if (this.lastListedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
+            try {
+                final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
+                Optional.ofNullable(stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY))
+                    .map(Long::parseLong)
+                    .ifPresent(lastTimestamp -> this.lastListedLatestEntryTimestampMillis = lastTimestamp);
+
+                justElectedPrimaryNode = false;
+            } catch (final IOException ioe) {
+                getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
+                context.yield();
+                return;
+            }
+        }
+
+        long lowerBoundInclusiveTimestamp = Optional.ofNullable(this.lastListedLatestEntryTimestampMillis).orElse(0L);
+        long upperBoundExclusiveTimestamp;
+
+        long currentTime = getCurrentTime();
+
+        final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>();
+        try {
+            List<T> entityList = performListing(context, lowerBoundInclusiveTimestamp);
+
+            boolean targetSystemHasMilliseconds = false;
+            boolean targetSystemHasSeconds = false;
+            for (final T entity : entityList) {

Review comment:
       I considered it and decided to leave it as is. I rarely do this, but in
this case changing it would either be too risky or require significant effort
to add proper additional test coverage.



