[GitHub] nifi pull request #2361: NIFI-4715: ListS3 produces duplicates in frequently...
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/2361

---
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2361#discussion_r159118001

--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
@@ -264,18 +265,19 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         }
         bucketLister.setNextMarker();
+        totalListCount += listCount;
         commit(context, session, listCount);
         listCount = 0;
     } while (bucketLister.isTruncated());
+
+    // Update stateManger with the most recent timestamp
     currentTimestamp = maxTimestamp;
+    persistState(context);
     final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
     getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
-    if (!commit(context, session, listCount)) {
-        if (currentTimestamp > 0) {
-            persistState(context);
-        }
+    if (totalListCount == 0) {
--- End diff --

Good catch!

---
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2361#discussion_r159117995

--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
@@ -264,18 +265,19 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         }
         bucketLister.setNextMarker();
+        totalListCount += listCount;
         commit(context, session, listCount);
         listCount = 0;
     } while (bucketLister.isTruncated());
+
+    // Update stateManger with the most recent timestamp
     currentTimestamp = maxTimestamp;
+    persistState(context);
--- End diff --

These two lines of code can be embedded in the `commit` method.

```
// Update stateManger with the most recent timestamp
currentTimestamp = maxTimestamp;
persistState(context);
```

---
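A minimal sketch of what that suggestion could look like, assuming a simplified stand-in for the processor. `CommitSketch`, its `persisted` field, and the `persistCalls` counter are hypothetical names used only for illustration; the real `ListS3` works with NiFi's `ProcessContext`, `ProcessSession`, and state manager, none of which are modelled here.

```java
// Hypothetical, simplified stand-in for ListS3; not the actual NiFi code.
final class CommitSketch {
    long currentTimestamp = 0L; // newest timestamp already recorded
    long maxTimestamp = 0L;     // newest timestamp seen in this listing
    long persisted = 0L;        // stands in for the state manager
    int persistCalls = 0;       // counts writes to the state store

    // Stand-in for persistState(context): write the current timestamp out.
    void persistState() {
        persisted = currentTimestamp;
        persistCalls++;
    }

    // Commit one page of results; state is touched only if something was listed.
    boolean commit(final int listCount) {
        final boolean willCommit = listCount > 0;
        if (willCommit) {
            // session.commit() would run here in the real processor.
            // Update the state store with the most recent timestamp.
            currentTimestamp = maxTimestamp;
            persistState();
        }
        return willCommit;
    }
}
```

In this sketch an empty page never writes to the state store, while every committed page writes once, so the number of state updates follows the number of non-empty pages rather than the number of listings.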
Github user adamlamar commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2361#discussion_r159112711

--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
@@ -264,18 +265,19 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         }
         bucketLister.setNextMarker();
+        totalListCount += listCount;
         commit(context, session, listCount);
         listCount = 0;
     } while (bucketLister.isTruncated());
+
+    // Update stateManger with the most recent timestamp
     currentTimestamp = maxTimestamp;
+    persistState(context);
     final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
     getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
-    if (!commit(context, session, listCount)) {
-        if (currentTimestamp > 0) {
-            persistState(context);
-        }
--- End diff --

Note that this `commit` isn't required, since the last part of the main do/while loop already does a `commit`. Further, the loop resets `listCount` to zero, so this branch would always be taken.

---
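The point about `listCount` can be illustrated with a small sketch. `PagingLoopSketch` and `shouldYield` are hypothetical names, and the list of per-page counts stands in for the pages returned by the S3 listing; this is an assumption-laden model of the loop, not the processor's code.

```java
import java.util.List;

// Hypothetical sketch of the paging loop; the integers stand in for the
// number of new objects found on each S3 listing page.
final class PagingLoopSketch {
    // Returns true when the processor should yield (no page had anything new).
    static boolean shouldYield(final List<Integer> newObjectsPerPage) {
        int totalListCount = 0;
        for (final int pageCount : newObjectsPerPage) {
            int listCount = pageCount;
            totalListCount += listCount; // remember the count before it is reset
            // commit(context, session, listCount) would run here.
            listCount = 0;               // reset for the next page
        }
        // After the loop listCount is always 0, so only the running total
        // can tell whether anything was listed.
        return totalListCount == 0;
    }
}
```

Calling `PagingLoopSketch.shouldYield(Arrays.asList(2, 0, 5))` returns `false`, while a listing whose pages are all empty returns `true`.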
Github user adamlamar commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2361#discussion_r159111452

--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
@@ -267,26 +267,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         commit(context, session, listCount);
         listCount = 0;
     } while (bucketLister.isTruncated());
-    currentTimestamp = maxTimestamp;
+
+    if (maxTimestamp > currentTimestamp) {
+        currentTimestamp = maxTimestamp;
+    }
     final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
     getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
     if (!commit(context, session, listCount)) {
-        if (currentTimestamp > 0) {
-            persistState(context);
-        }
         getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket});
         context.yield();
     }
+
+    // Persist all state, including any currentKeys
+    persistState(context);
--- End diff --

@ijokarumawak I started writing an example, but then realized you are correct - there is no need to manually call `persistState` because any addition to `currentKeys` will also increment `listCount`, and the normal update mechanism will take over from there. We shouldn't need a `dirtyState` flag.

---
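The invariant described here, that any addition to `currentKeys` also increments `listCount`, can be sketched as follows. `ListingFilterSketch` and `offer` are hypothetical names, and the filtering rule is an assumed model of how a timestamp plus a set of keys can suppress duplicates; it is not copied from `ListS3`.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of a timestamp-plus-keys listing filter; not the actual ListS3 code.
final class ListingFilterSketch {
    long currentTimestamp = 0L;                      // newest timestamp from earlier runs
    long maxTimestamp = 0L;                          // newest timestamp seen so far
    final Set<String> currentKeys = new HashSet<>(); // keys listed at maxTimestamp
    int listCount = 0;

    // Returns true if the object is new and should be emitted.
    boolean offer(final String key, final long lastModified) {
        if (lastModified < currentTimestamp
                || (lastModified == currentTimestamp && currentKeys.contains(key))) {
            return false; // already listed in an earlier run
        }
        if (lastModified > maxTimestamp) {
            maxTimestamp = lastModified;
            currentKeys.clear();
        }
        if (lastModified == maxTimestamp) {
            currentKeys.add(key);
        }
        listCount++; // every path that modifies currentKeys also reaches this line
        return true;
    }
}
```

Because `listCount` is incremented on the same path that modifies `currentKeys`, a state update driven by `listCount > 0` also covers every change to the key set in this model.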
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2361#discussion_r158753087

--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
@@ -267,26 +267,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         commit(context, session, listCount);
         listCount = 0;
     } while (bucketLister.isTruncated());
-    currentTimestamp = maxTimestamp;
+
+    if (maxTimestamp > currentTimestamp) {
+        currentTimestamp = maxTimestamp;
+    }
     final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
     getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
     if (!commit(context, session, listCount)) {
-        if (currentTimestamp > 0) {
-            persistState(context);
-        }
         getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket});
         context.yield();
     }
+
+    // Persist all state, including any currentKeys
+    persistState(context);
--- End diff --

@adamlamar Well, if I'm not overlooking anything, `currentKeys` is modified in the `onTrigger` method only when a new entry is found. Would you show me an example? It still confounds me. If we can set a flag like `dirtyState`, then the condition for setting it should be clarified, though. Thanks!

---
Github user adamlamar commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2361#discussion_r158740756

--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
@@ -267,26 +267,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         commit(context, session, listCount);
         listCount = 0;
     } while (bucketLister.isTruncated());
-    currentTimestamp = maxTimestamp;
+
+    if (maxTimestamp > currentTimestamp) {
+        currentTimestamp = maxTimestamp;
+    }
     final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
     getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
     if (!commit(context, session, listCount)) {
-        if (currentTimestamp > 0) {
-            persistState(context);
-        }
         getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket});
         context.yield();
     }
+
+    // Persist all state, including any currentKeys
+    persistState(context);
--- End diff --

@ijokarumawak It's important to `persistState` when `currentKeys` has been modified, even if `currentTimestamp` hasn't been modified, to avoid producing duplicates when multiple files are listed during the same millisecond. This is a related but distinct issue. It's a rare condition, though.

You're correct that this change will cause more load on the state manager. How about setting a flag like `dirtyState` that would avoid calling `setState` if the state has not been modified?

---
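For illustration only, the proposed `dirtyState` flag might look like the sketch below. `DirtyStateSketch`, `recordListed`, and the `Map` used as a state store are all hypothetical, and the follow-up comment in this thread concludes that the flag is not needed; the sketch only shows the idea being discussed.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the proposed dirtyState flag; the Map stands in for
// the state manager and is not NiFi's API.
final class DirtyStateSketch {
    private final Map<String, String> stateStore = new HashMap<>();
    private final Set<String> currentKeys = new HashSet<>();
    private long currentTimestamp = 0L;
    private boolean dirtyState = false;
    int setStateCalls = 0; // counts writes to the state store

    // Record a listed object; mark state dirty if the timestamp or key set changed.
    void recordListed(final String key, final long lastModified) {
        if (lastModified > currentTimestamp) {
            currentTimestamp = lastModified;
            currentKeys.clear();
            dirtyState = true;
        }
        if (lastModified == currentTimestamp && currentKeys.add(key)) {
            dirtyState = true; // same millisecond, but a key not seen before
        }
    }

    // Write to the state store only when something actually changed.
    void persistState() {
        if (!dirtyState) {
            return;
        }
        stateStore.put("currentTimestamp", String.valueOf(currentTimestamp));
        stateStore.put("currentKeys", String.join(",", currentKeys));
        setStateCalls++;
        dirtyState = false;
    }
}
```

With this shape, a second file listed in the same millisecond still triggers a write because the key set changed, while repeated calls with nothing new leave the state store untouched.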
Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2361#discussion_r158674300

--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
@@ -267,26 +267,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         commit(context, session, listCount);
         listCount = 0;
     } while (bucketLister.isTruncated());
-    currentTimestamp = maxTimestamp;
+
+    if (maxTimestamp > currentTimestamp) {
+        currentTimestamp = maxTimestamp;
+    }
     final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
     getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
     if (!commit(context, session, listCount)) {
-        if (currentTimestamp > 0) {
-            persistState(context);
-        }
         getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket});
         context.yield();
     }
+
+    // Persist all state, including any currentKeys
+    persistState(context);
--- End diff --

Do we still need this? Isn't updating state within commit() enough? We should minimize the number of state updates, as some state storage is not designed for frequent updates, e.g. ZooKeeper. I think if the processor didn't find any new file to list, then it does not have to update state, does it? I might be missing something, as the original reason for calling persistState() when there was nothing to commit is not clear to me.

---
Github user adamlamar commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2361#discussion_r158593055

--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
@@ -267,26 +267,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         commit(context, session, listCount);
         listCount = 0;
     } while (bucketLister.isTruncated());
-    currentTimestamp = maxTimestamp;
+
+    if (maxTimestamp > currentTimestamp) {
+        currentTimestamp = maxTimestamp;
+    }
     final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
     getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
     if (!commit(context, session, listCount)) {
-        if (currentTimestamp > 0) {
-            persistState(context);
-        }
--- End diff --

Since `currentTimestamp` is never overwritten by a `maxTimestamp` with a value of zero, this check shouldn't be necessary anymore.

---
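A small sketch of why the guard makes the `currentTimestamp > 0` check redundant, assuming that `maxTimestamp` stays at zero when a listing finds nothing new. `TimestampGuardSketch` and `advance` are hypothetical names introduced for illustration.

```java
// Hypothetical helper mirroring the guarded assignment in the diff above.
final class TimestampGuardSketch {
    // Only move the stored timestamp forward; never overwrite it with a smaller value.
    static long advance(final long currentTimestamp, final long maxTimestamp) {
        return maxTimestamp > currentTimestamp ? maxTimestamp : currentTimestamp;
    }
}
```

Under that assumption, `advance(100L, 0L)` keeps the stored value of 100 after an empty listing, and `advance(100L, 250L)` moves it forward to 250.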
Github user adamlamar commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2361#discussion_r158593053

--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
@@ -267,26 +267,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         commit(context, session, listCount);
         listCount = 0;
     } while (bucketLister.isTruncated());
-    currentTimestamp = maxTimestamp;
+
+    if (maxTimestamp > currentTimestamp) {
+        currentTimestamp = maxTimestamp;
+    }
--- End diff --

`maxTimestamp` should always be greater than `currentTimestamp`, but this adds a sanity check.

---
Github user adamlamar commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2361#discussion_r158593061

--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
@@ -267,26 +267,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         commit(context, session, listCount);
         listCount = 0;
     } while (bucketLister.isTruncated());
-    currentTimestamp = maxTimestamp;
+
+    if (maxTimestamp > currentTimestamp) {
+        currentTimestamp = maxTimestamp;
+    }
     final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
     getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
     if (!commit(context, session, listCount)) {
-        if (currentTimestamp > 0) {
-            persistState(context);
-        }
         getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket});
         context.yield();
     }
+
+    // Persist all state, including any currentKeys
+    persistState(context);
--- End diff --

Both exit paths already perform `persistState` - this just makes that more clear.

---
GitHub user adamlamar opened a pull request:

    https://github.com/apache/nifi/pull/2361

    NIFI-4715: ListS3 produces duplicates in frequently updated buckets

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [Y] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [Y] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [Y] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [Y] Is your initial contribution a single, squashed commit?

### For code changes:
- [Y] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
- [N] Have you written or updated unit tests to verify your changes?
- [NA] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [NA] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- [NA] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- [NA] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [NA] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/adamlamar/nifi NIFI-4715

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2361.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2361

commit bcaa84bba251c3a70c27a466185f6b20863eab93
Author: Adam Lamar
Date: 2017-12-24T03:29:02Z

    NIFI-4715: ListS3 produces duplicates in frequently updated buckets

---