[ https://issues.apache.org/jira/browse/NIFI-4715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16306602#comment-16306602 ]

ASF GitHub Bot commented on NIFI-4715:
--------------------------------------

Github user adamlamar commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2361#discussion_r159111452
  
    --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java ---
    @@ -267,26 +267,28 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 commit(context, session, listCount);
                 listCount = 0;
             } while (bucketLister.isTruncated());
    -        currentTimestamp = maxTimestamp;
    +
    +        if (maxTimestamp > currentTimestamp) {
    +            currentTimestamp = maxTimestamp;
    +        }
     
             final long listMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
             getLogger().info("Successfully listed S3 bucket {} in {} millis", new Object[]{bucket, listMillis});
     
             if (!commit(context, session, listCount)) {
    -            if (currentTimestamp > 0) {
    -                persistState(context);
    -            }
                 getLogger().debug("No new objects in S3 bucket {} to list. Yielding.", new Object[]{bucket});
                 context.yield();
             }
    +
    +        // Persist all state, including any currentKeys
    +        persistState(context);
    --- End diff --
    
    @ijokarumawak I started writing an example, but then realized you are 
correct - there is no need to manually call `persistState` because any addition 
to `currentKeys` will also increment `listCount`, and the normal update 
mechanism will take over from there. We shouldn't need a `dirtyState` flag.
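To illustrate why the manual `persistState` call is redundant, here is a minimal, hypothetical sketch (the class name `CommitSketch` and the fields `listCount`/`statePersisted` are made-up stand-ins, not the processor's actual members): any addition to `currentKeys` also increments `listCount`, so the normal `commit()` path persists state whenever there is anything new to persist.

```java
import java.util.HashSet;
import java.util.Set;

public class CommitSketch {
    static Set<String> currentKeys = new HashSet<>();
    static int listCount = 0;
    static boolean statePersisted = false;

    // Simplified stand-in for ListS3's per-object handling: every key
    // added to currentKeys also bumps listCount.
    static void listObject(String key) {
        currentKeys.add(key);
        listCount++;
    }

    // Simplified stand-in for ListS3.commit(): persists state only when
    // new objects were listed since the last commit.
    static boolean commit() {
        if (listCount > 0) {
            statePersisted = true; // normal update mechanism covers currentKeys
            listCount = 0;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        listObject("bucket/key-1");
        System.out.println(commit());        // true: state persisted
        System.out.println(commit());        // false: nothing new, no persist
    }
}
```

Under this model, a `dirtyState` flag or an unconditional `persistState(context)` would never persist anything the commit path had not already covered.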


> ListS3 produces duplicates in frequently updated buckets
> --------------------------------------------------------
>
>                 Key: NIFI-4715
>                 URL: https://issues.apache.org/jira/browse/NIFI-4715
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Core Framework
>    Affects Versions: 1.2.0, 1.3.0, 1.4.0
>         Environment: All
>            Reporter: Milan Das
>         Attachments: List-S3-dup-issue.xml, screenshot-1.png
>
>
> ListS3 state is implemented using a HashSet, which is not thread safe. When 
> ListS3 operates in multi-threaded mode, it sometimes tries to list the same 
> file from the S3 bucket. It seemed as if the HashSet data was getting corrupted.
> currentKeys = new HashSet<>(); // could be made thread safe, e.g. 
> currentKeys = ConcurrentHashMap.newKeySet();
> *{color:red}+Update+{color}*:
> This is not a HashSet issue.
> The root cause is a race that occurs when a file is uploaded to S3 while a 
> ListS3 listing is in progress.
> In onTrigger, maxTimestamp is initialized to 0L, which causes the key set to 
> be cleared by the code below.
> When an S3 object's lastModifiedTime equals currentTimestamp, the listed key 
> should be skipped. Because the key set has been cleared, the same file is 
> loaded again.
> I think the fix should be to initialize maxTimestamp with currentTimestamp 
> instead of 0L:
> {code}
>  long maxTimestamp = currentTimestamp;
> {code}
> The following block clears the keys:
> {code:title=org.apache.nifi.processors.aws.s3.ListS3.java|borderStyle=solid}
>  if (lastModified > maxTimestamp) {
>                     maxTimestamp = lastModified;
>                     currentKeys.clear();
>                     getLogger().debug("clearing keys");
>                 }
> {code}
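The race described in the report can be sketched as follows. This is a hypothetical simplification, not the processor's actual code: the class name `TimestampSketch` and the helper `keySurvives` are invented for illustration, and the `seedWithCurrent` flag switches between the buggy seed (`0L`) and the proposed fix (`currentTimestamp`).

```java
import java.util.HashSet;
import java.util.Set;

public class TimestampSketch {
    // Returns true when a key already listed at currentTimestamp is still
    // tracked after an object with the given lastModified is processed.
    static boolean keySurvives(long currentTimestamp, long lastModified,
                               boolean seedWithCurrent) {
        Set<String> currentKeys = new HashSet<>();
        currentKeys.add("already-listed"); // listed at currentTimestamp

        // Buggy behavior seeds maxTimestamp with 0L; the proposed fix seeds
        // it with currentTimestamp so objects modified in the same second
        // do not wipe the tracked keys.
        long maxTimestamp = seedWithCurrent ? currentTimestamp : 0L;

        if (lastModified > maxTimestamp) {
            maxTimestamp = lastModified;
            currentKeys.clear(); // clearing here is what causes duplicates
        }
        return currentKeys.contains("already-listed");
    }

    public static void main(String[] args) {
        // Object modified in the same second as the last listing:
        System.out.println(keySurvives(1000L, 1000L, false)); // false: bug re-lists it
        System.out.println(keySurvives(1000L, 1000L, true));  // true: fix skips it
    }
}
```

With the fix, only an object with a strictly newer lastModified clears the key set, which is the legitimate case where the old keys are no longer needed.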



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
