[GitHub] nifi pull request #2361: NIFI-4715: ListS3 produces duplicates in frequently...

2018-11-03 asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/2361


---


2017-12-29 ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2361#discussion_r159118001
  
--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
@@ -264,18 +265,19 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
 }
 bucketLister.setNextMarker();
 
+totalListCount += listCount;
 commit(context, session, listCount);
 listCount = 0;
 } while (bucketLister.isTruncated());
+
+// Update stateManger with the most recent timestamp
 currentTimestamp = maxTimestamp;
+persistState(context);
 
 final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
 getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
 
-if (!commit(context, session, listCount)) {
-if (currentTimestamp > 0) {
-persistState(context);
-}
+if (totalListCount == 0) {
--- End diff --

Good catch!


---


2017-12-29 ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2361#discussion_r159117995
  
--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
@@ -264,18 +265,19 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
 }
 bucketLister.setNextMarker();
 
+totalListCount += listCount;
 commit(context, session, listCount);
 listCount = 0;
 } while (bucketLister.isTruncated());
+
+// Update stateManger with the most recent timestamp
 currentTimestamp = maxTimestamp;
+persistState(context);
--- End diff --

These two lines of code can be moved into the `commit` method:
```java
// Update stateManager with the most recent timestamp
currentTimestamp = maxTimestamp;
persistState(context);
```
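A minimal sketch of that suggestion, assuming a simplified stand-in for the processor: `ListingState`, its `Map`-backed `stateStore`, and the `stateWrites` counter are hypothetical and take the place of NiFi's `ProcessSession` and state manager. It only illustrates where the timestamp update and the `persistState` call would move.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of ListS3's listing state (not NiFi's API).
class ListingState {
    long currentTimestamp = 0L;
    long maxTimestamp = 0L;
    final Set<String> currentKeys = new HashSet<>();
    // Stand-in for the processor's state manager; counts writes for illustration.
    final Map<String, String> stateStore = new HashMap<>();
    int stateWrites = 0;

    void persistState() {
        stateStore.put("currentTimestamp", Long.toString(currentTimestamp));
        stateStore.put("currentKeys", String.join(",", currentKeys));
        stateWrites++;
    }

    // Commits one page of results; the timestamp update and persistence
    // happen here instead of after the do/while loop.
    boolean commit(int listCount) {
        boolean willCommit = listCount > 0;
        if (willCommit) {
            // (the real processor would call session.commit() here)
            // Update state with the most recent timestamp
            currentTimestamp = maxTimestamp;
            persistState();
        }
        return willCommit;
    }
}
```

One thing worth checking with this placement: `commit` runs once per page, so `currentTimestamp` would advance before later pages are read. S3 returns listings in key order rather than by modification time, so if the per-object filter reads `currentTimestamp` during the loop, older but not-yet-listed objects on later pages could be skipped.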


---


2017-12-29 adamlamar
Github user adamlamar commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2361#discussion_r159112711
  
--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
@@ -264,18 +265,19 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
 }
 bucketLister.setNextMarker();
 
+totalListCount += listCount;
 commit(context, session, listCount);
 listCount = 0;
 } while (bucketLister.isTruncated());
+
+// Update stateManger with the most recent timestamp
 currentTimestamp = maxTimestamp;
+persistState(context);
 
 final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
 getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
 
-if (!commit(context, session, listCount)) {
-if (currentTimestamp > 0) {
-persistState(context);
-}
--- End diff --

Note that this `commit` isn't required, since the last part of the main 
do/while loop already does a `commit`. Further, it sets `listCount` to zero, so 
this branch would always be taken.
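To illustrate the point, here is a minimal sketch with a hypothetical `PagingLoop` class that models pages as lists of new object keys (not NiFi's API). Because `listCount` is reset to zero at the end of every iteration, a trailing `commit(listCount)` after the loop could only return `false`; keeping a running `totalListCount`, as the revised diff at the top of this thread does, gives the yield decision something meaningful to test.

```java
import java.util.List;

// Hypothetical model of the onTrigger paging loop (not NiFi's API).
class PagingLoop {
    int commits = 0;
    boolean yielded = false;

    boolean commit(int listCount) {
        boolean willCommit = listCount > 0;
        if (willCommit) {
            commits++;  // session.commit() and state persistence would go here
        }
        return willCommit;
    }

    // Each inner list is one page of new object keys; assumes at least one
    // (possibly empty) page, since the do/while loop always fetches one.
    void onTrigger(List<List<String>> pages) {
        int listCount = 0;
        int totalListCount = 0;
        int page = 0;
        do {
            listCount += pages.get(page).size();
            page++;

            totalListCount += listCount;
            commit(listCount);
            listCount = 0;  // from here on, commit(listCount) could only return false
        } while (page < pages.size());

        // Yield based on the running total rather than on a trailing commit(listCount).
        if (totalListCount == 0) {
            yielded = true;
        }
    }
}
```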


---


2017-12-29 adamlamar
Github user adamlamar commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2361#discussion_r159111452
  
--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
@@ -267,26 +267,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
 commit(context, session, listCount);
 listCount = 0;
 } while (bucketLister.isTruncated());
-currentTimestamp = maxTimestamp;
+
+if (maxTimestamp > currentTimestamp) {
+currentTimestamp = maxTimestamp;
+}
 
 final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
 getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
 
 if (!commit(context, session, listCount)) {
-if (currentTimestamp > 0) {
-persistState(context);
-}
 getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket});
 context.yield();
 }
+
+// Persist all state, including any currentKeys
+persistState(context);
--- End diff --

@ijokarumawak I started writing an example, but then realized you are 
correct - there is no need to manually call `persistState` because any addition 
to `currentKeys` will also increment `listCount`, and the normal update 
mechanism will take over from there. We shouldn't need a `dirtyState` flag.
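The invariant described here can be sketched as follows. This is a hypothetical model, not the processor's code: `ListedObject` and `PageScan` are illustrative names, and the "is this object new?" filter is passed in as a `Predicate` so the sketch does not depend on its exact form. `currentKeys` is only touched after an object passes the filter, and every such object also increments `listCount`, so a change to the keys implies a non-zero count, which in the code under discussion leads to a `commit`.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical object summary: a key plus its last-modified time in millis.
final class ListedObject {
    final String key;
    final long lastModified;

    ListedObject(String key, long lastModified) {
        this.key = key;
        this.lastModified = lastModified;
    }
}

// Hypothetical model of the per-page loop.
class PageScan {
    long maxTimestamp;
    final Set<String> currentKeys = new HashSet<>();
    boolean keysChanged = false;

    PageScan(long startTimestamp, Set<String> startKeys) {
        this.maxTimestamp = startTimestamp;
        this.currentKeys.addAll(startKeys);
    }

    int scanPage(List<ListedObject> page, Predicate<ListedObject> isNew) {
        int listCount = 0;
        for (ListedObject o : page) {
            if (!isNew.test(o)) {
                continue;  // already listed: neither currentKeys nor listCount changes
            }
            if (o.lastModified > maxTimestamp) {
                maxTimestamp = o.lastModified;
                currentKeys.clear();
                keysChanged = true;
            }
            if (o.lastModified == maxTimestamp) {
                keysChanged |= currentKeys.add(o.key);
            }
            listCount++;  // reached by every object that may have modified currentKeys
        }
        return listCount;
    }
}
```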


---


2017-12-26 ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2361#discussion_r158753087
  
--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
@@ -267,26 +267,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
 commit(context, session, listCount);
 listCount = 0;
 } while (bucketLister.isTruncated());
-currentTimestamp = maxTimestamp;
+
+if (maxTimestamp > currentTimestamp) {
+currentTimestamp = maxTimestamp;
+}
 
 final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
 getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
 
 if (!commit(context, session, listCount)) {
-if (currentTimestamp > 0) {
-persistState(context);
-}
 getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket});
 context.yield();
 }
+
+// Persist all state, including any currentKeys
+persistState(context);
--- End diff --

@adamlamar Well, unless I'm overlooking something, `currentKeys` is only 
modified in the `onTrigger` method, and only when a new entry is found. Could you show me 
an example? I'm still confused. If we do add a flag like `dirtyState`, then 
the condition for setting it should be made clear. Thanks!


---


2017-12-26 adamlamar
Github user adamlamar commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2361#discussion_r158740756
  
--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
@@ -267,26 +267,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
 commit(context, session, listCount);
 listCount = 0;
 } while (bucketLister.isTruncated());
-currentTimestamp = maxTimestamp;
+
+if (maxTimestamp > currentTimestamp) {
+currentTimestamp = maxTimestamp;
+}
 
 final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
 getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
 
 if (!commit(context, session, listCount)) {
-if (currentTimestamp > 0) {
-persistState(context);
-}
 getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket});
 context.yield();
 }
+
+// Persist all state, including any currentKeys
+persistState(context);
--- End diff --

@ijokarumawak It's important to `persistState` when `currentKeys` has been 
modified, even if `currentTimestamp` hasn't been, to avoid 
producing duplicates when multiple objects share the same last-modified 
millisecond. This is a related but distinct issue. It's a rare condition, though.
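The scenario can be made concrete with a small sketch. It assumes the listing filter treats an object as already listed when its last-modified time is older than the stored timestamp, or equal to it with the key already recorded in `currentKeys`; `SameMillisecondFilter.isNew` is a hypothetical helper, not the processor's method.

```java
import java.util.Set;

class SameMillisecondFilter {
    // Hypothetical helper: decides whether an object should be listed, given
    // the timestamp and keys persisted after the previous run.
    static boolean isNew(long lastModified, String key,
                         long currentTimestamp, Set<String> currentKeys) {
        if (lastModified < currentTimestamp) {
            return false;  // older than the stored timestamp: already listed
        }
        if (lastModified == currentTimestamp) {
            return !currentKeys.contains(key);  // same millisecond: consult the keys
        }
        return true;  // strictly newer
    }
}
```

For example, if one run stores timestamp `1000` with `currentKeys = {a}` and object `b` later appears with the same timestamp, `isNew(1000, "b", 1000, Set.of("a"))` is `true` while `isNew(1000, "a", 1000, Set.of("a"))` is `false`. Once `b` is listed the key set becomes `{a, b}`; if that update were not persisted, the following run would reload `{a}` and list `b` a second time.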

You're correct that this change will cause more load on the state manager. 
How about setting a flag like `dirtyState` that would avoid calling `setState` 
if the state has not been modified?
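A minimal sketch of the proposed flag, assuming the state consists of a timestamp and a key set as discussed here; `DirtyStateTracker`, its mutator methods, and the `Map` standing in for the state manager are hypothetical. Each mutator sets `dirtyState` only when the value actually changes, and `persistState` skips the write when nothing is dirty, which is what would reduce writes to backends such as ZooKeeper.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical state holder illustrating a dirtyState flag (not NiFi's API).
class DirtyStateTracker {
    private long currentTimestamp = 0L;
    private final Set<String> currentKeys = new HashSet<>();
    private boolean dirtyState = false;
    // Stand-in for the processor's state storage; counts writes for illustration.
    final Map<String, String> store = new HashMap<>();
    int writes = 0;

    void setTimestamp(long timestamp) {
        if (timestamp != currentTimestamp) {
            currentTimestamp = timestamp;
            dirtyState = true;
        }
    }

    void addKey(String key) {
        // Set.add returns true only if the key was not already present.
        if (currentKeys.add(key)) {
            dirtyState = true;
        }
    }

    void clearKeys() {
        if (!currentKeys.isEmpty()) {
            currentKeys.clear();
            dirtyState = true;
        }
    }

    // Writes to storage only when something changed since the last write.
    void persistState() {
        if (!dirtyState) {
            return;
        }
        store.put("currentTimestamp", Long.toString(currentTimestamp));
        store.put("currentKeys", String.join(",", currentKeys));
        writes++;
        dirtyState = false;
    }
}
```

In the thread, this flag was ultimately judged unnecessary, since any change to `currentKeys` coincides with a non-zero `listCount`.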


---


2017-12-25 ijokarumawak
Github user ijokarumawak commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2361#discussion_r158674300
  
--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
@@ -267,26 +267,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
 commit(context, session, listCount);
 listCount = 0;
 } while (bucketLister.isTruncated());
-currentTimestamp = maxTimestamp;
+
+if (maxTimestamp > currentTimestamp) {
+currentTimestamp = maxTimestamp;
+}
 
 final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
 getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
 
 if (!commit(context, session, listCount)) {
-if (currentTimestamp > 0) {
-persistState(context);
-}
 getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket});
 context.yield();
 }
+
+// Persist all state, including any currentKeys
+persistState(context);
--- End diff --

Do we still need this? Isn't updating state within commit() enough? We 
should minimize the number of state updates, as some state storage backends, 
e.g. ZooKeeper, are not designed for frequent updates. I think if the processor 
didn't find any new file to list, then it does not have to update state, does 
it?

I might be missing something, as the original reason for calling 
persistState() when there was nothing to commit is not clear to me.


---


2017-12-23 adamlamar
Github user adamlamar commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2361#discussion_r158593055
  
--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
@@ -267,26 +267,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
 commit(context, session, listCount);
 listCount = 0;
 } while (bucketLister.isTruncated());
-currentTimestamp = maxTimestamp;
+
+if (maxTimestamp > currentTimestamp) {
+currentTimestamp = maxTimestamp;
+}
 
 final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
 getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
 
 if (!commit(context, session, listCount)) {
-if (currentTimestamp > 0) {
-persistState(context);
-}
--- End diff --

Since `currentTimestamp` is never overwritten by a `maxTimestamp` with a 
value of zero, this check shouldn't be necessary anymore.


---


2017-12-23 adamlamar
Github user adamlamar commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2361#discussion_r158593053
  
--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
@@ -267,26 +267,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
 commit(context, session, listCount);
 listCount = 0;
 } while (bucketLister.isTruncated());
-currentTimestamp = maxTimestamp;
+
+if (maxTimestamp > currentTimestamp) {
+currentTimestamp = maxTimestamp;
+}
--- End diff --

`maxTimestamp` should always be greater than `currentTimestamp`, but this 
adds a sanity check.
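The guard amounts to taking the maximum of the two values, so the stored timestamp can only move forward. A minimal sketch with a hypothetical `TimestampGuard.advance` helper; assuming `maxTimestamp` can be zero when nothing new is found (as the comment above about zero values implies), it also shows why such a run can no longer reset the timestamp, which is what makes the `currentTimestamp > 0` check redundant.

```java
// Hypothetical helper mirroring the guard added in the diff above.
class TimestampGuard {
    static long advance(long currentTimestamp, long maxTimestamp) {
        // Same effect as Math.max(currentTimestamp, maxTimestamp).
        if (maxTimestamp > currentTimestamp) {
            currentTimestamp = maxTimestamp;
        }
        return currentTimestamp;
    }
}
```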


---


2017-12-23 adamlamar
Github user adamlamar commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2361#discussion_r158593061
  
--- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
@@ -267,26 +267,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
 commit(context, session, listCount);
 listCount = 0;
 } while (bucketLister.isTruncated());
-currentTimestamp = maxTimestamp;
+
+if (maxTimestamp > currentTimestamp) {
+currentTimestamp = maxTimestamp;
+}
 
 final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
 getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
 
 if (!commit(context, session, listCount)) {
-if (currentTimestamp > 0) {
-persistState(context);
-}
 getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket});
 context.yield();
 }
+
+// Persist all state, including any currentKeys
+persistState(context);
--- End diff --

Both exit paths already call `persistState`; this just makes that 
clearer.


---


2017-12-23 adamlamar
GitHub user adamlamar opened a pull request:

https://github.com/apache/nifi/pull/2361

NIFI-4715: ListS3 produces duplicates in frequently updated buckets

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [Y] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [Y] Does your PR title start with NIFI- where  is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [Y] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [Y] Is your initial contribution a single, squashed commit?

### For code changes:
- [Y] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [N] Have you written or updated unit tests to verify your changes?
- [NA] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [NA] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [NA] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [NA] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [NA] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/adamlamar/nifi NIFI-4715

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2361.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2361


commit bcaa84bba251c3a70c27a466185f6b20863eab93
Author: Adam Lamar 
Date:   2017-12-24T03:29:02Z

NIFI-4715: ListS3 produces duplicates in frequently updated buckets




---