[
https://issues.apache.org/jira/browse/FLINK-5083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671880#comment-15671880
]
Kostas Kloudas commented on FLINK-5083:
---------------------------------------
Thanks for reporting it!
There is a pending pull request here: https://github.com/apache/flink/pull/2797
that removes the deletion altogether.
The reason is that deleting lingering files does not play well with
rescaling, which re-shuffles the state of the
individual tasks.
Given that this PR is about to be merged, I suppose that this issue will be
resolved.
In addition, I have another PR for the RollingSink ready to open as soon as
the aforementioned one gets merged.
> Race condition in Rolling/Bucketing Sink pending files cleanup
> --------------------------------------------------------------
>
> Key: FLINK-5083
> URL: https://issues.apache.org/jira/browse/FLINK-5083
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.2.0, 1.1.3
> Reporter: Cliff Resnick
>
> In both the open() and restore() methods there is code that:
> 1. gets a recursive listing from baseDir
> 2. iterates over the listing and checks the filenames against the subtaskIndex
> and other criteria to find pending or in-progress files; any match is deleted.
> The problem is that the recursive listing returns all files for all
> subtask indexes. The race occurs when #hasNext is called as part of the
> iteration: it performs a hidden existence check on the "next" file, which may
> have been deleted by another task after the listing but before the iteration
> reached it, so an exception is thrown and the job fails.
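> Roughly, the pattern looks like this (a simplified sketch, not the actual sink
> code; the prefix/suffix arguments stand in for whatever naming scheme the sink
> actually uses):
> {code:java}
> import java.io.IOException;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.LocatedFileStatus;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.fs.RemoteIterator;
>
> class PendingFileCleanup {
>     // Simplified illustration of the cleanup pattern described above.
>     void cleanup(FileSystem fs, Path basePath, int subtaskIndex,
>                  String partPrefix, String pendingSuffix) throws IOException {
>         // 1. recursive listing of everything under the base directory
>         RemoteIterator<LocatedFileStatus> files = fs.listFiles(basePath, true);
>
>         // 2. hasNext() lazily checks the "next" entry; if another subtask
>         //    deleted that file between listing and iteration, it throws
>         //    FileNotFoundException and the whole cleanup (and the task) fails
>         while (files.hasNext()) {
>             Path file = files.next().getPath();
>             if (file.getName().startsWith(partPrefix + "-" + subtaskIndex)
>                     && file.getName().endsWith(pendingSuffix)) {
>                 fs.delete(file, false); // each subtask removes only its own files
>             }
>         }
>     }
> }
> {code}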
> Depending on the number of pending files, this condition may outlast the
> number of job retries, with each retry failing on a different file.
> A solution would be to use #listStatus instead. The Hadoop FileSystem supports
> a PathFilter in its #listStatus calls, but not in the recursive #listFiles
> call. Since the cleanup is performed from the baseDir, the recursion would
> have to be implemented in Flink.
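> For illustration, a sketch of that approach (again simplified; the recursion
> lives in the sink rather than in #listFiles):
> {code:java}
> import java.io.IOException;
> import org.apache.hadoop.fs.FileStatus;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.fs.PathFilter;
>
> class ListStatusCleanup {
>     // Sketch of the alternative: recurse in Flink over #listStatus results.
>     // Each directory's statuses come back in a single call, so there is no
>     // lazy existence check during iteration; a file deleted by another
>     // subtask afterwards is simply skipped by the name-based filter.
>     void cleanup(FileSystem fs, Path dir, PathFilter ownPendingFiles) throws IOException {
>         for (FileStatus status : fs.listStatus(dir)) {
>             Path path = status.getPath();
>             if (status.isDirectory()) {
>                 cleanup(fs, path, ownPendingFiles); // recursion done in Flink
>             } else if (ownPendingFiles.accept(path)) {
>                 fs.delete(path, false);
>             }
>         }
>     }
> }
> {code}
> The PathFilter would carry the same subtaskIndex / pending / in-progress name
> checks as the current code, only applied to names rather than to file existence.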
> This touches on another issue. Over time, the directory listing is bound to
> get very large, and re-listing everything from the baseDir may become
> increasingly expensive, especially if the FileSystem is S3. Maybe we could have
> a Bucketer callback that returns a list of cleanup root directories based on
> the current file? I'm guessing most people use time-based bucketing, so there
> is only a limited window in which cleanup matters. If so, this would also
> address the recursive listing problem above.
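> Just to make the idea concrete, a purely hypothetical shape for such a
> callback (the name and signature are invented here, not an existing API):
> {code:java}
> import java.util.List;
> import org.apache.hadoop.fs.Path;
>
> // Hypothetical extension of the Bucketer contract: instead of re-listing
> // everything under the base directory, the sink asks the Bucketer which
> // directories can still contain files that need cleaning up. For a
> // time-based Bucketer this would typically be the current bucket plus a
> // bounded number of recent ones.
> public interface CleanupScopedBucketer<T> {
>     List<Path> getCleanupRoots(Path basePath, Path currentBucketPath);
> }
> {code}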
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)