Can this be prevented by moving temporary files individually (copy + delete) at finalization, instead of copying all of them and then performing a bulk delete? You can support task failures by ignoring renames when the destination already exists. The Python SDK currently does this (and puts temp files in a subdirectory).
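Here is a minimal sketch of the per-file finalization described above. It assumes a local filesystem and that each temp file's final destination is already known when finalization runs. `finalize_by_moving` and the `(temp, final)` pair list are hypothetical, and `shutil.copyfile` plus `os.remove` stand in for the filesystem's copy and delete. This is an illustration, not the Python SDK's actual code.

```python
import os
import shutil
import tempfile
from typing import Iterable, Tuple


def finalize_by_moving(pairs: Iterable[Tuple[str, str]]) -> int:
    """Hypothetical helper: move each temp file to its final path (copy + delete).

    Returns how many files were copied in this attempt. An existing
    destination is treated as "already moved by an earlier attempt", so
    calling this again after a task failure is safe. Assumes the copy is
    all-or-nothing (true of an object-store copy, NOT guaranteed for
    shutil.copyfile on a local disk); a half-written destination is not handled.
    """
    copied = 0
    for temp_path, final_path in pairs:
        if not os.path.exists(final_path):
            shutil.copyfile(temp_path, final_path)
            copied += 1
        # Delete each temp file right after its copy, instead of relying on
        # a later glob match over the temp location plus a bulk delete.
        if os.path.exists(temp_path):
            os.remove(temp_path)
    return copied


with tempfile.TemporaryDirectory() as root:
    temp_dir = os.path.join(root, "temp-beam-foo-1234")  # illustrative name
    os.makedirs(temp_dir)
    pairs = []
    for i in range(2):
        tmp = os.path.join(temp_dir, f"part-{i}")
        with open(tmp, "w") as f:
            f.write(f"records {i}\n")
        pairs.append((tmp, os.path.join(root, f"foo-{i:05d}-of-00002")))
    print(finalize_by_moving(pairs))  # → 2
    print(finalize_by_moving(pairs))  # → 0 (retry: destinations already exist)
```

A retried finalization only finishes the files that a failed attempt did not reach. No match() over the temp location is needed to find what to delete, so an eventually consistent listing cannot cause files to be missed.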
Thanks,
Cham

On Wed, Oct 19, 2016 at 6:25 PM Eugene Kirpichov <kirpic...@google.com.invalid> wrote:

Hello,

This is a continuation of the discussion on PR https://github.com/apache/incubator-beam/pull/1050, which turned out more complex than expected.

Short summary: Currently FileBasedSink, when writing to /path/to/foo (in practice, /path/to/foo-xxxxx-of-yyyyy where yyyyy is the total number of output files), puts temporary files into /path/to/foo-temp-$uid. When finalizing the sink, it removes the temporary files by matching the pattern /path/to/foo-temp-* and removing everything that matches.

There are a couple of issues with this:
- FileBasedSink uses IOChannelFactory, which currently supports local filesystems and Google Cloud Storage (GCS). GCS's match() operation is currently eventually consistent, so it may fail to return some of the files, and we won't remove them.
- If the Beam job is cancelled or fails midway, then the temp files won't be deleted at all (that's subject to a separate discussion on a cleanup API. AFAIK there's no JIRA for it yet; I believe peihe@ was thinking about this and was going to file one).
- If a follow-up data processing job is reading /path/to/foo, then, given the way temp files are named, they will likely match the same glob pattern (e.g. "/path/to/foo*") as the one intended to match the final output in /path/to/foo. So if some temp files are left over, the follow-up job will effectively read duplicate records (some from /path/to/foo, some from /path/to/foo-temp-$blah).

I think, in the absence of a way to guarantee that all temp files will be deleted (I think it'd be very difficult or impossible to provide a hard guarantee of this, considering various possible failure conditions such as zombie workers), the cleanest way to solve this is to put temp files in a location that's unlikely to match the same glob pattern as one that matches the final output.

Some options for what that could be:
1. A subdirectory that is a sibling of the final path, sufficiently unique, and unlikely to match the same glob: /path/to/temp-beam-foo-$uid/$another_uid (that's the approach the PR currently takes).
2. A subdirectory under PipelineOptions.tempLocation. This might be flawed because PipelineOptions.tempLocation might be on a different filesystem, or have different ACLs, than the output of the FileBasedSink.
3. A subdirectory that the user *must* explicitly provide on their FileBasedSink. This is a reduction in usability, but there may be cases when this is necessary, e.g. if the final location of the FileBasedSink is such that we can't create siblings to it (e.g. the root path in a GCS bucket: gs://some-bucket/).
4. A subdirectory generated by a new IOChannelFactory call ("give me a temp directory for the given final path"), which would do one of the above. This is reasonable and simplifies FileBasedSink, but we still need to choose which of #1-#3 this call should do.

There might be other things I missed. There might also be radical restructurings of FileBasedSink that work around this problem entirely, though I couldn't think of any.

In general, the requirements on the solution are:
- It should be very unlikely that somebody reads the temp files in the same glob pattern as the final output by mistake.
- It should continue to make sense as IOChannelFactory is extended with support for other filesystems.
- It should ideally use the same filesystem as the final output, or perhaps even a location logically "close" to the final output, so that it could potentially take advantage of that filesystem's efficient bulk-copy or bulk-rename operations if available.
- It should be easy to manually clean up the temp files if something went wrong and they weren't cleaned up by the Beam job.

I'm personally in favor of #1 with fallback to #2 or #3, because I think a sibling directory achieves all of these requirements unless a sibling directory can't be created.

Thoughts?
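The following sketch makes the glob collision and option #1 concrete. It uses Python's stdlib `fnmatch.fnmatchcase` on plain path strings. `temp_dir_for`, its `uid` argument, and its `fallback_root` parameter (standing in for option #2 or #3) are hypothetical. The rule "fall back when the prefix ends in `/`" is an assumed reading of the bucket-root case. Because `fnmatch`'s `*` also matches `/`, a non-match here is a more conservative result than a non-match under a shell glob.

```python
import fnmatch
import posixpath
import uuid
from typing import Optional


def temp_dir_for(final_prefix: str, fallback_root: Optional[str] = None,
                 uid: Optional[str] = None) -> str:
    """Hypothetical helper: choose a temp directory for output at final_prefix.

    Option #1: a uniquely named sibling directory whose name does NOT start
    with the output's base name, so "<parent>/<base>*" globs skip it.
    Assumed fallback (in the spirit of #2/#3): if the prefix ends in "/"
    (e.g. "gs://some-bucket/"), there is no base name to build a sibling
    from, so use a caller-supplied root or fail loudly.
    """
    uid = uid or uuid.uuid4().hex
    parent, base = posixpath.split(final_prefix)
    if base:
        return posixpath.join(parent, f"temp-beam-{base}-{uid}")
    if fallback_root is None:
        raise ValueError(f"cannot create a sibling of {final_prefix!r}; "
                         "an explicit temp location is required")
    return posixpath.join(fallback_root, f"temp-beam-{uid}")


output_glob = "/path/to/foo*"            # what a follow-up job might read
current_temp = "/path/to/foo-temp-1234"  # current naming scheme
proposed_temp = temp_dir_for("/path/to/foo", uid="1234") + "/part-0"

print(proposed_temp)                                   # → /path/to/temp-beam-foo-1234/part-0
print(fnmatch.fnmatchcase(current_temp, output_glob))   # → True (leftovers get read)
print(fnmatch.fnmatchcase(proposed_temp, output_glob))  # → False
```

Keeping the sibling next to the output (rather than under an unrelated temp location) preserves the "same filesystem, logically close" requirement. The explicit fallback covers the case where no sibling can be created.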