simonbence commented on a change in pull request #4721: URL: https://github.com/apache/nifi/pull/4721#discussion_r540944725
########## File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java ########## @@ -207,6 +225,39 @@ .defaultValue(BY_TIMESTAMPS.getValue()) .build(); + public static final PropertyDescriptor TIME_ADJUSTMENT = new Builder() + .name("time-adjustment") + .displayName("Time Adjustment") + .description("If the system hosting the files is in a different time zone than NiFi, either it's timezone or the numerical difference should be set here." + + " If a timezone is specified, NiFi tries to calculate the time difference." + + " If a numeric value is set, its value can be either a single integer (milliseconds) or in HH:mm/HH:mm:ss format." + + " EXAMPLE: NiFi is hosted in UTC, File Server is hosted in EST. In this case 'Time Adjustment' value should be -05:00:00 or 18000000." + Review comment: I suggest using "file source" instead of "File Server", as implementations of this abstract class may also work with the local file system. "File Server" suggests an additional concept that is not introduced in the documentation. Note: I understand that this property is used only by ListFTP and ListSFTP, and that the class structure makes it necessary to define it here rather than in ListFileTransfer; but as it is in the common place, and there is no remark about this… ########## File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java ########## @@ -207,6 +225,39 @@ .defaultValue(BY_TIMESTAMPS.getValue()) .build(); + public static final PropertyDescriptor TIME_ADJUSTMENT = new Builder() + .name("time-adjustment") + .displayName("Time Adjustment") + .description("If the system hosting the files is in a different time zone than NiFi, either it's timezone or the numerical difference should be set here." + + " If a timezone is specified, NiFi tries to calculate the time difference." 
+ + " If a numeric value is set, its value can be either a single integer (milliseconds) or in HH:mm/HH:mm:ss format." + + " EXAMPLE: NiFi is hosted in UTC, File Server is hosted in EST. In this case 'Time Adjustment' value should be -05:00:00 or 18000000." + + " If the locations were reversed i.e. NiFi is hosted in EST, File Server is hosted in UTC, the value should be 05:00:00 or 18000000." + + " NOTE: Any mid-year changes (due to daylight saving for example) requires manual re-adjustment in this case." + ) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .required(false) + .addValidator(new Validator() { + Pattern signed_integer_or_signed_HHmm_or_HHmmss = Pattern.compile("-?(\\d{2}:\\d{2}(:\\d{2})?)|-?\\d+"); Review comment: Minor: this could go into a constant to keep the validator code slightly smaller and cleaner ########## File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java ########## @@ -431,11 +482,157 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } else if (BY_ENTITIES.equals(listingStrategy)) { listByTrackingEntities(context, session); + } else if (BY_ADJUSTED_TIME_WINDOW.equals(listingStrategy)) { + listByAdjustedSlidingTimeWindow(context, session); + } else { throw new ProcessException("Unknown listing strategy: " + listingStrategy); } } + public void listByAdjustedSlidingTimeWindow(final ProcessContext context, final ProcessSession session) throws ProcessException { + if (this.lastListedLatestEntryTimestampMillis == null || justElectedPrimaryNode) { + try { + final StateMap stateMap = context.getStateManager().getState(getStateScope(context)); + Optional.ofNullable(stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY)) + .map(Long::parseLong) + .ifPresent(lastTimestamp -> this.lastListedLatestEntryTimestampMillis = lastTimestamp); + + justElectedPrimaryNode = false; + } catch (final IOException ioe) 
{ + getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished."); + context.yield(); + return; + } + } + + long lowerBoundInclusiveTimestamp = Optional.ofNullable(this.lastListedLatestEntryTimestampMillis).orElse(0L); + long upperBoundExclusiveTimestamp; + + long currentTime = getCurrentTime(); + + final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>(); + try { + List<T> entityList = performListing(context, lowerBoundInclusiveTimestamp); + + boolean targetSystemHasMilliseconds = false; + boolean targetSystemHasSeconds = false; + for (final T entity : entityList) { + final long entityTimestampMillis = entity.getTimestamp(); + if (!targetSystemHasMilliseconds) { + targetSystemHasMilliseconds = entityTimestampMillis % 1000 > 0; + } + if (!targetSystemHasSeconds) { + targetSystemHasSeconds = entityTimestampMillis % 60_000 > 0; + } + } + + // Determine target system time precision. + String specifiedPrecision = context.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue(); + if (StringUtils.isBlank(specifiedPrecision)) { + // If TARGET_SYSTEM_TIMESTAMP_PRECISION is not supported by the Processor, then specifiedPrecision can be null, instead of its default value. + specifiedPrecision = getDefaultTimePrecision(); + } + final TimeUnit targetSystemTimePrecision + = PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision) + ? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES + : PRECISION_MILLIS.getValue().equals(specifiedPrecision) ? TimeUnit.MILLISECONDS + : PRECISION_SECONDS.getValue().equals(specifiedPrecision) ? 
TimeUnit.SECONDS : TimeUnit.MINUTES; + final Long listingLagMillis = LISTING_LAG_MILLIS.get(targetSystemTimePrecision); + + upperBoundExclusiveTimestamp = getAdjustedCurrentTimestamp(context, currentTime) - listingLagMillis; + + if (getLogger().isTraceEnabled()) { + getLogger().trace("interval: " + lowerBoundInclusiveTimestamp + " - " + upperBoundExclusiveTimestamp); + getLogger().trace("entityList: " + entityList.stream().map(entity -> entity.getName() + "_" + entity.getTimestamp()).collect(Collectors.joining(", "))); + } + entityList + .stream() + .filter(entity -> entity.getTimestamp() >= lowerBoundInclusiveTimestamp) + .filter(entity -> entity.getTimestamp() < upperBoundExclusiveTimestamp) + .forEach(entity -> orderedEntries + .computeIfAbsent(entity.getTimestamp(), __ -> new ArrayList<>()) + .add(entity) + ); + if (getLogger().isTraceEnabled()) { + getLogger().trace("orderedEntries: " + + orderedEntries.values().stream() + .flatMap(List::stream) + .map(entity -> entity.getName() + "_" + entity.getTimestamp()) + .collect(Collectors.joining(", ")) + ); + } + } catch (final IOException e) { + getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, e); + context.yield(); + return; + } + + if (orderedEntries.isEmpty()) { + getLogger().debug("There is no data to list. 
Yielding."); + context.yield(); + return; + } + + final boolean writerSet = context.getProperty(RECORD_WRITER).isSet(); Review comment: Same question as in case of target precision ########## File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java ########## @@ -105,6 +111,174 @@ public void setup() { @Rule public final TemporaryFolder testFolder = new TemporaryFolder(); + @Test + public void testGetAdjustedCurrentTimestampWith0() throws Exception { + // GIVEN + String timeAdjustment = "0"; + long currentTime = 100; + + long expected = 100; + + // WHEN + // THEN + testGetAdjustedCurrentTimestamp(timeAdjustment, expected, currentTime); + + } + + @Test + public void testGetAdjustedCurrentTimestampWith15() throws Exception { + // GIVEN + String timeAdjustment = "15"; + long currentTime = 100; + + long expected = 100 + 15; + + // WHEN + // THEN + testGetAdjustedCurrentTimestamp(timeAdjustment, expected, currentTime); + + } + + @Test + public void testGetAdjustedCurrentTimestampWithMinus15() throws Exception { + // GIVEN + String timeAdjustment = "-15"; + long currentTime = 100; + + long expected = 100 - 15; + + // WHEN + // THEN + testGetAdjustedCurrentTimestamp(timeAdjustment, expected, currentTime); + + } + + @Test + public void testGetAdjustedCurrentTimestampWithMinus_01_02() throws Exception { + // GIVEN + String timeAdjustment = "-01:02"; + long currentTime = 100; + + long expected = 100 + - 1 * 60 * 60 * 1000 + - 2 * 60 * 1000; + + // WHEN + // THEN + testGetAdjustedCurrentTimestamp(timeAdjustment, expected, currentTime); + } + + @Test + public void testGetAdjustedCurrentTimestampWith_01_02() throws Exception { + // GIVEN + String timeAdjustment = "01:02"; + long currentTime = 100; + + long expected = 100 + + 1 * 60 * 60 * 1000 + + 2 * 60 * 1000; + + // WHEN + // THEN + testGetAdjustedCurrentTimestamp(timeAdjustment, expected, currentTime); + } + + @Test + public void 
testGetAdjustedCurrentTimestampWith_01_02_34() throws Exception { + // GIVEN + String timeAdjustment = "01:02:34"; + long currentTime = 100; + + long expected = 100 + + 1 * 60 * 60 * 1000 + + 2 * 60 * 1000 + + 34 * 1000; + + // WHEN + // THEN + testGetAdjustedCurrentTimestamp(timeAdjustment, expected, currentTime); + } + + @Test + public void testGetAdjustedCurrentTimestampWithMinus_01_02_34() throws Exception { + // GIVEN + String timeAdjustment = "-01:02:34"; + long currentTime = 100; + + long expected = 100 + - 1 * 60 * 60 * 1000 + - 2 * 60 * 1000 + - 34 * 1000; + + // WHEN + // THEN + testGetAdjustedCurrentTimestamp(timeAdjustment, expected, currentTime); + } + + @Test + public void testGetAdjustedCurrentTimestampWithEST() throws Exception { + // GIVEN + String timeAdjustment = "EST"; + long currentTime = System.currentTimeMillis(); + + TimeZone targetTimeZone = TimeZone.getTimeZone(timeAdjustment); + TimeZone localTimeZone = Calendar.getInstance().getTimeZone(); + long expected = currentTime + targetTimeZone.getOffset(currentTime) - localTimeZone.getOffset(currentTime); + + // WHEN + // THEN + testGetAdjustedCurrentTimestamp(timeAdjustment, expected, currentTime); + } + + private void testGetAdjustedCurrentTimestamp(String timeAdjustment, long expected, final long currentTime) { Review comment: As there are a lot of abstract methods that need to be implemented here, it might be possible to use Mockito.spy instead. ########## File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java ########## @@ -207,6 +225,39 @@ .defaultValue(BY_TIMESTAMPS.getValue()) .build(); + public static final PropertyDescriptor TIME_ADJUSTMENT = new Builder() Review comment: This property appears second in the property list, before hostname, port, etc. As this is an optional parameter used only in certain cases, I think it should be placed near the bottom of the list.
########## File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java ########## @@ -431,11 +482,157 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } else if (BY_ENTITIES.equals(listingStrategy)) { listByTrackingEntities(context, session); + } else if (BY_ADJUSTED_TIME_WINDOW.equals(listingStrategy)) { + listByAdjustedSlidingTimeWindow(context, session); + } else { throw new ProcessException("Unknown listing strategy: " + listingStrategy); } } + public void listByAdjustedSlidingTimeWindow(final ProcessContext context, final ProcessSession session) throws ProcessException { + if (this.lastListedLatestEntryTimestampMillis == null || justElectedPrimaryNode) { + try { + final StateMap stateMap = context.getStateManager().getState(getStateScope(context)); + Optional.ofNullable(stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY)) + .map(Long::parseLong) + .ifPresent(lastTimestamp -> this.lastListedLatestEntryTimestampMillis = lastTimestamp); + + justElectedPrimaryNode = false; + } catch (final IOException ioe) { + getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished."); + context.yield(); + return; + } + } + + long lowerBoundInclusiveTimestamp = Optional.ofNullable(this.lastListedLatestEntryTimestampMillis).orElse(0L); + long upperBoundExclusiveTimestamp; + + long currentTime = getCurrentTime(); + + final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>(); + try { + List<T> entityList = performListing(context, lowerBoundInclusiveTimestamp); + + boolean targetSystemHasMilliseconds = false; + boolean targetSystemHasSeconds = false; + for (final T entity : entityList) { Review comment: This larger code block, which determines the timestamp precision of the target system, seems very similar to the corresponding part of the listByAdjustedSlidingTimeWindow. Could it be extracted into a shared method?
########## File path: nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java ########## @@ -431,11 +482,157 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } else if (BY_ENTITIES.equals(listingStrategy)) { listByTrackingEntities(context, session); + } else if (BY_ADJUSTED_TIME_WINDOW.equals(listingStrategy)) { + listByAdjustedSlidingTimeWindow(context, session); + } else { throw new ProcessException("Unknown listing strategy: " + listingStrategy); } } + public void listByAdjustedSlidingTimeWindow(final ProcessContext context, final ProcessSession session) throws ProcessException { + if (this.lastListedLatestEntryTimestampMillis == null || justElectedPrimaryNode) { + try { + final StateMap stateMap = context.getStateManager().getState(getStateScope(context)); + Optional.ofNullable(stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY)) + .map(Long::parseLong) + .ifPresent(lastTimestamp -> this.lastListedLatestEntryTimestampMillis = lastTimestamp); + + justElectedPrimaryNode = false; + } catch (final IOException ioe) { + getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. 
Will not perform listing until this is accomplished."); + context.yield(); + return; + } + } + + long lowerBoundInclusiveTimestamp = Optional.ofNullable(this.lastListedLatestEntryTimestampMillis).orElse(0L); + long upperBoundExclusiveTimestamp; + + long currentTime = getCurrentTime(); + + final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>(); + try { + List<T> entityList = performListing(context, lowerBoundInclusiveTimestamp); + + boolean targetSystemHasMilliseconds = false; + boolean targetSystemHasSeconds = false; + for (final T entity : entityList) { + final long entityTimestampMillis = entity.getTimestamp(); + if (!targetSystemHasMilliseconds) { + targetSystemHasMilliseconds = entityTimestampMillis % 1000 > 0; + } + if (!targetSystemHasSeconds) { + targetSystemHasSeconds = entityTimestampMillis % 60_000 > 0; + } + } + + // Determine target system time precision. + String specifiedPrecision = context.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue(); + if (StringUtils.isBlank(specifiedPrecision)) { + // If TARGET_SYSTEM_TIMESTAMP_PRECISION is not supported by the Processor, then specifiedPrecision can be null, instead of its default value. + specifiedPrecision = getDefaultTimePrecision(); + } + final TimeUnit targetSystemTimePrecision + = PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision) + ? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES + : PRECISION_MILLIS.getValue().equals(specifiedPrecision) ? TimeUnit.MILLISECONDS + : PRECISION_SECONDS.getValue().equals(specifiedPrecision) ? 
TimeUnit.SECONDS : TimeUnit.MINUTES; + final Long listingLagMillis = LISTING_LAG_MILLIS.get(targetSystemTimePrecision); + + upperBoundExclusiveTimestamp = getAdjustedCurrentTimestamp(context, currentTime) - listingLagMillis; + + if (getLogger().isTraceEnabled()) { + getLogger().trace("interval: " + lowerBoundInclusiveTimestamp + " - " + upperBoundExclusiveTimestamp); + getLogger().trace("entityList: " + entityList.stream().map(entity -> entity.getName() + "_" + entity.getTimestamp()).collect(Collectors.joining(", "))); + } + entityList + .stream() + .filter(entity -> entity.getTimestamp() >= lowerBoundInclusiveTimestamp) + .filter(entity -> entity.getTimestamp() < upperBoundExclusiveTimestamp) + .forEach(entity -> orderedEntries + .computeIfAbsent(entity.getTimestamp(), __ -> new ArrayList<>()) + .add(entity) + ); + if (getLogger().isTraceEnabled()) { + getLogger().trace("orderedEntries: " + + orderedEntries.values().stream() + .flatMap(List::stream) + .map(entity -> entity.getName() + "_" + entity.getTimestamp()) + .collect(Collectors.joining(", ")) + ); + } + } catch (final IOException e) { + getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, e); + context.yield(); + return; + } + + if (orderedEntries.isEmpty()) { + getLogger().debug("There is no data to list. 
Yielding."); + context.yield(); + return; + } + + final boolean writerSet = context.getProperty(RECORD_WRITER).isSet(); + if (writerSet) { + try { + createRecordsForEntities(context, session, orderedEntries); + } catch (final IOException | SchemaNotFoundException e) { + getLogger().error("Failed to write listing to FlowFile", e); + context.yield(); + return; + } + } else { + createFlowFilesForEntities(context, session, orderedEntries); + } + + try { + if (getLogger().isTraceEnabled()) { + getLogger().info("this.lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp: " + this.lastListedLatestEntryTimestampMillis + " = " + upperBoundExclusiveTimestamp); + } + this.lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp; + persist(upperBoundExclusiveTimestamp, upperBoundExclusiveTimestamp, latestIdentifiersProcessed, context.getStateManager(), getStateScope(context)); + } catch (final IOException ioe) { + getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or " + + "if another node begins executing this Processor, data duplication may occur.", ioe); + } + } + + protected long getAdjustedCurrentTimestamp(ProcessContext context, long currentTime) { + String timeAdjustmentString = context.getProperty(TIME_ADJUSTMENT).evaluateAttributeExpressions().getValue(); + + long positiveOrNegative; + long timeAdjustment; + + if (timeAdjustmentString.startsWith("-")) { + positiveOrNegative = -1L; + timeAdjustmentString = timeAdjustmentString.substring(1); + } else { + positiveOrNegative = 1L; + } + + if (timeAdjustmentString.matches("\\d{2}:\\d{2}(:\\d{2})?")) { Review comment: Why not consider "-" as part of the regexps? parseLong should be able to handle the original string, and hypothetically this would allow input like "-GMT". 
(It should be caught by the validator, however.) ########## File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java ########## @@ -69,7 +72,10 @@ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .defaultValue(".") .build(); - + public static final PropertyDescriptor FILE_TANSFER_LISTING_STRATEGY = new PropertyDescriptor.Builder() Review comment: Typo: FILE_TRANSFER... ########## File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java ########## @@ -138,4 +144,19 @@ protected boolean isListingResetNecessary(final PropertyDescriptor property) { protected abstract FileTransfer getFileTransfer(final ProcessContext context); protected abstract String getProtocolName(); + + protected void validateAdjustedTimeWindow(ValidationContext validationContext, Collection<ValidationResult> results) { + if ( + BY_ADJUSTED_TIME_WINDOW.getValue().equals(validationContext.getProperty(LISTING_STRATEGY).getValue()) Review comment: Would it make sense to add a negative case as well? For example, when the listing strategy is not the adjusted time window, the time adjustment should _not_ be set. ########## File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java ########## @@ -72,7 +72,8 @@ final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("21").build(); final List<PropertyDescriptor> properties = new ArrayList<>(); - properties.add(LISTING_STRATEGY); + properties.add(FILE_TANSFER_LISTING_STRATEGY); + properties.add(TIME_ADJUSTMENT); Review comment: As mentioned above: I think this should be lower in the list. ---------------------------------------------------------------- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org