This thread is conflating many issues:

* Putting temp files where they will not match the glob for the desired output files
* Dealing with eventually-consistent filesystems (S3, GCS, ...)
* Properly cleaning up all temp files
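The first issue can be checked mechanically: does a downstream glob over the output prefix also pick up leftover temp files? Below is a minimal Python sketch. It uses `fnmatch` as a stand-in for a filesystem glob; note that its `*` also matches `/`, unlike most filesystem globs. All paths are illustrative, and `matched` is a hypothetical helper.

```python
from fnmatch import fnmatch

# Glob a downstream job might use to read the sink's final output.
OUTPUT_GLOB = "/path/to/foo*"

# Illustrative (hypothetical) final output shards.
final_shards = [
    "/path/to/foo-00000-of-00002",
    "/path/to/foo-00001-of-00002",
]
# Temp file named with the output prefix, as in the current layout.
temp_as_prefix = ["/path/to/foo-temp-1234"]
# Temp file under a directory whose name does not start with "foo".
temp_elsewhere = ["/path/to/temp-beam-foo-1234/shard-0"]


def matched(paths, pattern=OUTPUT_GLOB):
    """Return the subset of paths that the downstream glob would read."""
    return [p for p in paths if fnmatch(p, pattern)]


# A leftover temp file with the output prefix is read as output (duplicates).
print(matched(final_shards + temp_as_prefix))
# The same leftover under a non-matching name is not read.
print(matched(final_shards + temp_elsewhere))
```

This only tests glob matching. It says nothing about whether the alternative location can actually be created.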
They all need to be solved, but for now I think we only need to solve the first one.

Siblings fundamentally will not work. Consider the following perfectly valid output path:

    s3://bucket/file-SSS-NNN.txt

A sibling would be a new bucket, so it is not guaranteed to exist.

On Thu, Oct 20, 2016 at 1:57 AM, Chamikara Jayalath <chamik...@apache.org> wrote:
> Can this be prevented by moving temporary files (copy + delete
> individually) at finalization instead of copying all of them and performing
> a bulk delete? You can support task failures by ignoring renames when the
> destination exists. The Python SDK currently does this (and puts temp files in
> a sub-directory).
>
> Thanks,
> Cham
>
> On Wed, Oct 19, 2016 at 6:25 PM Eugene Kirpichov
> <kirpic...@google.com.invalid> wrote:
>
> Hello,
>
> This is a continuation of the discussion on PR
> https://github.com/apache/incubator-beam/pull/1050 which turned out more
> complex than expected.
>
> Short summary:
> Currently FileBasedSink, when writing to /path/to/foo (in practice,
> /path/to/foo-xxxxx-of-yyyyy where yyyyy is the total number of output
> files), puts temporary files into /path/to/foo-temp-$uid, and when
> finalizing the sink, it removes the temporary files by matching the pattern
> /path/to/foo-temp-* and removing everything that matches.
>
> There are a couple of issues with this:
> - FileBasedSink uses IOChannelFactory, which currently supports local
> filesystems and Google Cloud Storage (GCS). GCS's match() operation is
> currently eventually consistent, so it may fail to return some of the
> files, and we won't remove them.
> - If the Beam job is cancelled or fails midway, then the temp files won't
> be deleted at all (that's subject to a separate discussion on cleanup API -
> AFAIK there's no JIRA for it yet, I believe peihe@ was thinking about this
> and was going to file one).
> - If a follow-up data processing job is reading /path/to/foo, then the way
> temp files are named, they will likely match the same glob pattern (e.g.
> "/path/to/foo*") as the one intending to match the final output in
> /path/to/foo, so if some temp files are left over, the follow-up job will
> effectively read duplicate records (some from /path/to/foo, some from
> /path/to/foo-temp-$blah).
>
> I think, in the absence of a way to guarantee that all temp files will be
> deleted (I think it'd be very difficult or impossible to provide a hard
> guarantee of this, considering various possible failure conditions such as
> zombie workers), the cleanest way to solve this is to put temp files in a
> location that's unlikely to match the same glob pattern as one that matches
> the final output.
>
> Some options for what that could be:
> 1. A subdirectory that is a sibling of the final path, sufficiently unique,
> and unlikely to match the same glob -
> /path/to/temp-beam-foo-$uid/$another_uid (that's the approach the PR
> currently takes)
> 2. A subdirectory under PipelineOptions.tempLocation - this might be flawed
> because PipelineOptions.tempLocation might be on a different filesystem, or
> have different ACLs, than the output of the FileBasedSink.
> 3. A subdirectory that the user *must* explicitly provide on their
> FileBasedSink. This is a reduction in usability, but there may be cases
> when this is necessary - e.g. if the final location of the FileBasedSink is
> such that we can't create siblings to it (e.g. the root path in a GCS
> bucket - gs://some-bucket/)
> 4. A subdirectory generated by a new IOChannelFactory call ("give me a temp
> directory for the given final path") which would do one of the above -
> reasonable, and simplifies FileBasedSink, but we still need to choose which
> of #1-#3 this call should do.
>
> There might be other things I missed.
> There might be radical restructurings
> of FileBasedSink that work around this problem entirely, though I couldn't
> think of any.
>
> In general, the requirements on the solution are:
> - It should be very unlikely that somebody reads the temp files in the same
> glob pattern as the final output by mistake.
> - It should continue to make sense as IOChannelFactory is extended with
> support for other filesystems.
> - It should ideally use the same filesystem as the final output, or perhaps
> even a location logically "close" to the final output, so that it could
> potentially take advantage of that filesystem's efficient bulk-copy or
> bulk-rename operations if available.
> - It should be easy to manually clean up the temp files if something went
> wrong and they weren't cleaned up by the Beam job.
>
> I'm personally in favor of #1 with fallback to #2 or #3, because I think a
> sibling directory achieves all of these requirements unless a sibling
> directory can't be created.
>
> Thoughts?
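Cham's suggestion in the quoted message is to copy and delete each temp file individually at finalization, skipping the rename when the destination already exists. It can be sketched in Python against the local filesystem. This is a minimal illustration under assumptions, not Beam's implementation:

- `finalize` and `demo` are hypothetical names.
- The temp-to-final mapping is assumed to come from the writers' results rather than from a match() over a glob, which is what sidesteps an eventually-consistent listing.
- `shutil.copyfile` stands in for an object-store copy that is assumed to be atomic.
- Deleting a leftover temp file when the destination already exists is my addition.

```python
import os
import shutil
import tempfile


def finalize(temp_to_final):
    """Move each known temp file to its final name, one file at a time.

    Hypothetical helper. Safe to re-run after a partial failure: if the final
    file already exists, the copy is skipped (an earlier attempt finished it)
    and only the leftover temp file, if any, is deleted. Assumes each copy is
    atomic, as object-store copies are; a local copyfile is not.
    """
    for temp_path, final_path in sorted(temp_to_final.items()):
        if not os.path.exists(final_path):
            # "Rename" as copy + delete, for stores without atomic rename.
            shutil.copyfile(temp_path, final_path)
        if os.path.exists(temp_path):
            os.remove(temp_path)


def demo():
    """Write two temp shards, finalize twice, and report what is left."""
    with tempfile.TemporaryDirectory() as root:
        temp_dir = os.path.join(root, "temp-uid")
        os.makedirs(temp_dir)
        mapping = {}
        for i in range(2):
            temp_path = os.path.join(temp_dir, f"shard-{i}")
            with open(temp_path, "w") as f:
                f.write(f"record {i}\n")
            mapping[temp_path] = os.path.join(root, f"foo-{i:05d}-of-00002")
        finalize(mapping)
        finalize(mapping)  # a retried finalization is a no-op
        return sorted(os.listdir(root)), os.listdir(temp_dir)


print(demo())
```

Because every temp file is deleted by its known name, nothing here depends on listing. A temp file is only leaked if finalization never completes, which is the separate cleanup issue.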