[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16302653#comment-16302653 ]
ASF GitHub Bot commented on NIFI-4715:
--------------------------------------

Github user adamlamar commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2361#discussion_r158593061

    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
    @@ -267,26 +267,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 commit(context, session, listCount);
                 listCount = 0;
             } while (bucketLister.isTruncated());
    -        currentTimestamp = maxTimestamp;
    +
    +        if (maxTimestamp > currentTimestamp) {
    +            currentTimestamp = maxTimestamp;
    +        }

             final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
             getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});

             if (!commit(context, session, listCount)) {
    -            if (currentTimestamp > 0) {
    -                persistState(context);
    -            }
                 getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket});
                 context.yield();
             }
    +
    +        // Persist all state, including any currentKeys
    +        persistState(context);
    --- End diff --

    Both exit paths already perform `persistState` - this just makes that more clear.

> ListS3 produces duplicates in frequently updated buckets
> --------------------------------------------------------
>
>                 Key: NIFI-4715
>                 URL: https://issues.apache.org/jira/browse/NIFI-4715
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Core Framework
>    Affects Versions: 1.2.0, 1.3.0, 1.4.0
>         Environment: All
>            Reporter: Milan Das
>         Attachments: List-S3-dup-issue.xml, screenshot-1.png
>
> ListS3 state is implemented using a HashSet, which is not thread safe. When ListS3 operates in multi-threaded mode, it sometimes tries to list the same file from the S3 bucket twice. It seems the HashSet data is getting corrupted.
> currentKeys = new HashSet<>(); // needs to be implemented thread safe, like
> currentKeys = //ConcurrentHashMap.newKeySet();
>
> *{color:red}+Update+{color}*:
> This is not a HashSet issue. The root cause is a file being uploaded to S3 while a ListS3 run is in progress.
> In onTrigger, maxTimestamp is initialized to 0L, which clears the keys per the code below.
> When lastModifiedTime on an S3 object is the same as currentTimestamp for a listed key, that key should be skipped. Because the key has been cleared, the same file is loaded again.
> I think the fix should be to initialize maxTimestamp with currentTimestamp, not 0L:
> {code}
> long maxTimestamp = currentTimestamp;
> {code}
> The following block is clearing keys:
> {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid}
> if (lastModified > maxTimestamp) {
>     maxTimestamp = lastModified;
>     currentKeys.clear();
>     getLogger().debug("clearing keys");
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
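The duplicate-listing scenario described above can be sketched with a small, self-contained model. Note this is a hypothetical illustration, not the actual NiFi processor: the `ListingState` class, its `list` method, and the `buggy` flag are all invented here; the skip/clear logic is a simplification of what `ListS3.onTrigger` does with `currentTimestamp` and `currentKeys`. When `buggy` is true, `maxTimestamp` is seeded with `0L` as in the affected versions; otherwise it is seeded with `currentTimestamp` as the reporter proposes.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of ListS3's timestamp/key tracking.
class ListingState {
    long currentTimestamp = 0L;
    Set<String> currentKeys = new HashSet<>();

    // Returns the keys emitted for one listing pass over the given objects.
    Set<String> list(String[] keys, long[] lastModified, boolean buggy) {
        long maxTimestamp = buggy ? 0L : currentTimestamp;
        Set<String> emitted = new HashSet<>();
        for (int i = 0; i < keys.length; i++) {
            long lm = lastModified[i];
            // Skip objects listed in a previous pass: anything older, or
            // anything at the same timestamp whose key we still remember.
            if (lm < currentTimestamp
                    || (lm == currentTimestamp && currentKeys.contains(keys[i]))) {
                continue;
            }
            if (lm > maxTimestamp) {
                maxTimestamp = lm;
                // With the 0L seed, this fires even when lm merely equals
                // the persisted currentTimestamp, wiping the remembered
                // keys so same-timestamp objects get re-listed below.
                currentKeys.clear();
            }
            if (lm == maxTimestamp) {
                currentKeys.add(keys[i]);
            }
            emitted.add(keys[i]);
        }
        if (maxTimestamp > currentTimestamp) {
            currentTimestamp = maxTimestamp;
        }
        return emitted;
    }
}
```

With the fix, a second pass over a bucket where object "b" was uploaded at the same timestamp as an already-listed object "a" emits only "b"; with the `0L` seed, encountering "b" first clears the remembered keys, and "a" is emitted a second time.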