[ 
https://issues.apache.org/jira/browse/FLINK-5083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671880#comment-15671880
 ] 

Kostas Kloudas commented on FLINK-5083:
---------------------------------------

Thanks for reporting it!

There is a pending pull request here: https://github.com/apache/flink/pull/2797 
that removes the deletion altogether. 
The reason is that deleting lingering files does not play well with 
rescaling, which redistributes the state of the individual tasks across a 
different number of subtasks.

Given that this PR is about to be merged, I expect this issue will be 
resolved by it.
In addition, I have another PR ready for the RollingSink, which I will open 
as soon as the aforementioned one gets merged.

> Race condition in Rolling/Bucketing Sink pending files cleanup
> --------------------------------------------------------------
>
>                 Key: FLINK-5083
>                 URL: https://issues.apache.org/jira/browse/FLINK-5083
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.0, 1.1.3
>            Reporter: Cliff Resnick
>
> In both the open and restore methods there is code that:
> 1. gets a recursive listing from baseDir
> 2. iterates over the listing and name-checks filenames based on subtaskIndex 
> and other criteria to find pending or in-progress files; any matches are 
> deleted.
> The problem is that the recursive listing returns the files of all 
> subtask indexes. The race occurs when #hasNext is called as part of the 
> iteration: it performs a hidden existence check on the "next" file, and if 
> that file was deleted by another task after the listing but before the 
> iteration reached it, an exception is thrown and the job fails. 
> Depending on the number of pending files, this condition may outlast the 
> configured number of job retries, with each retry failing on a different file.
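> For illustration, here is a rough sketch of the racy pattern described 
> above (not the actual sink code; fs, basePath, subtaskIndex, and the 
> pendingSuffix check are assumptions made for the example):
>
>     import java.io.IOException;
>     import org.apache.hadoop.fs.FileSystem;
>     import org.apache.hadoop.fs.LocatedFileStatus;
>     import org.apache.hadoop.fs.Path;
>     import org.apache.hadoop.fs.RemoteIterator;
>
>     void deletePendingFiles(FileSystem fs, Path basePath, int subtaskIndex,
>                             String pendingSuffix) throws IOException {
>         // 1. recursive listing from baseDir -- returns files of ALL subtasks
>         RemoteIterator<LocatedFileStatus> files = fs.listFiles(basePath, true);
>         // 2. hasNext() re-checks the "next" file on the file system; if another
>         //    subtask deleted it after the listing, the check throws and the
>         //    whole job fails
>         while (files.hasNext()) {
>             Path path = files.next().getPath();
>             if (path.getName().contains("-" + subtaskIndex)
>                     && path.getName().endsWith(pendingSuffix)) {
>                 fs.delete(path, false);
>             }
>         }
>     }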
> A solution would be to use #listStatus instead. The Hadoop FileSystem 
> supports a PathFilter in its #listStatus calls, but not in the recursive 
> #listFiles call. Since the cleanup is performed from the baseDir, the 
> recursion would have to be implemented in Flink.
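> A minimal sketch of that approach (the recursion lives in Flink; the 
> PathFilter that matches only this subtask's pending/in-progress files is 
> assumed to exist):
>
>     import java.io.IOException;
>     import org.apache.hadoop.fs.FileStatus;
>     import org.apache.hadoop.fs.FileSystem;
>     import org.apache.hadoop.fs.Path;
>     import org.apache.hadoop.fs.PathFilter;
>
>     void cleanupRecursively(FileSystem fs, Path dir, PathFilter filter)
>             throws IOException {
>         // listStatus snapshots one directory level at a time; iterating the
>         // returned array involves no hidden per-file existence checks
>         for (FileStatus status : fs.listStatus(dir)) {
>             if (status.isDirectory()) {
>                 cleanupRecursively(fs, status.getPath(), filter);
>             } else if (filter.accept(status.getPath())) {
>                 // delete() returns false if the file is already gone, so a
>                 // concurrent deletion by another subtask is harmless here
>                 fs.delete(status.getPath(), false);
>             }
>         }
>     }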
> This touches on another issue. Over time, the directory listing is bound to 
> get very large, and re-listing everything from the baseDir may get 
> increasingly expensive, especially if the file system is S3. Maybe we could 
> add a Bucketer callback that returns a list of cleanup root directories 
> based on the current file? I'm guessing most people use time-based 
> bucketing, so cleanup only matters for a bounded window of recent buckets. 
> If so, this would also solve the recursive listing problem above.
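> Something along these lines, purely hypothetical -- getCleanupPaths is not 
> part of the existing Bucketer interface, and the signatures are only meant 
> to convey the idea:
>
>     import java.util.List;
>     import org.apache.hadoop.fs.Path;
>
>     public interface CleanupAwareBucketer {
>         // existing-style contract: choose the bucket directory for an element
>         Path getBucketPath(Path basePath, Object element);
>
>         // proposed addition: name only the directories still worth scanning
>         // for pending/in-progress files, e.g. the last few time buckets
>         List<Path> getCleanupPaths(Path basePath, Path currentBucketPath);
>     }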
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
