Can this be prevented by moving temporary files individually (copy + delete
each one) at finalization, instead of copying all of them and then performing
a bulk delete? You can support task failures by ignoring renames when the
destination already exists. The Python SDK currently does this (and puts temp
files in a sub-directory).
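
To make the idea concrete, here is a rough sketch of a per-file move on a local filesystem. It is not the Python SDK's actual code; finalize_by_moving and its temp_to_final argument are hypothetical names, and a real sink would go through the filesystem abstraction rather than os/shutil.

```python
import os
import shutil


def finalize_by_moving(temp_to_final):
    """Move each temp file to its final name, one file at a time.

    temp_to_final is a hypothetical dict mapping temp path -> final path.
    """
    for temp_path, final_path in temp_to_final.items():
        if os.path.exists(final_path):
            # An earlier attempt already produced this output: ignore the
            # rename and only drop the leftover temp file, if there is one.
            if os.path.exists(temp_path):
                os.remove(temp_path)
            continue
        if not os.path.exists(temp_path):
            # Nothing to move for this entry.
            continue
        shutil.copyfile(temp_path, final_path)  # copy ...
        os.remove(temp_path)                    # ... then delete this one file
```

Since every temp file is deleted by its exact name, finalization never needs a match() on a glob pattern, and re-running it after a task failure leaves already-finalized outputs alone.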

Thanks,
Cham

On Wed, Oct 19, 2016 at 6:25 PM Eugene Kirpichov
<kirpic...@google.com.invalid> wrote:

Hello,

This is a continuation of the discussion on PR
https://github.com/apache/incubator-beam/pull/1050 which turned out more
complex than expected.

Short summary:
Currently FileBasedSink, when writing to /path/to/foo (in practice,
/path/to/foo-xxxxx-of-yyyyy where yyyyy is the total number of output
files), puts temporary files into /path/to/foo-temp-$uid, and when
finalizing the sink, it removes the temporary files by matching the pattern
/path/to/foo-temp-* and removing everything that matches.

There are a couple of issues with this:
- FileBasedSink uses IOChannelFactory, which currently supports local
filesystems and Google Cloud Storage (GCS). GCS's match() operation is
currently eventually consistent, so it may fail to return some of the temp
files, and those files won't be removed.
- If the Beam job is cancelled or fails midway, then the temp files won't
be deleted at all (that's subject to a separate discussion on cleanup API -
AFAIK there's no JIRA for it yet, I believe peihe@ was thinking about this
and was going to file one).
- If a follow-up data processing job is reading /path/to/foo, then because of
the way temp files are named, they will likely match the same glob pattern
(e.g. "/path/to/foo*") as the one intended to match the final output in
/path/to/foo, so if some temp files are left over, the follow-up job will
effectively read duplicate records (some from /path/to/foo, some from
/path/to/foo-temp-$blah).
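
To illustrate the last point, here is a small sketch using Python's fnmatch module. The shard names and the "abc123" uid are made up for the example, and fnmatch only approximates the glob semantics of a real filesystem.

```python
import fnmatch

# Final output shards, plus one temp file left over from a failed cleanup.
final_outputs = [
    "/path/to/foo-00000-of-00002",
    "/path/to/foo-00001-of-00002",
]
leftover_temp = ["/path/to/foo-temp-abc123"]  # hypothetical uid

# The glob a follow-up job might use to read the output.
pattern = "/path/to/foo*"

matched = [
    path
    for path in final_outputs + leftover_temp
    if fnmatch.fnmatchcase(path, pattern)
]

# The leftover temp file is picked up along with the real shards.
print(matched)
```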

I think, in the absence of a way to guarantee that all temp files will be
deleted (I think it'd be very difficult or impossible to provide a hard
guarantee of this, considering various possible failure conditions such as
zombie workers), the cleanest way to solve this is to put temp files in a
location that's unlikely to match the same glob pattern as one that matches
the final output.

Some options for what that could be:
1. A subdirectory that is a sibling of the final path, sufficiently unique,
and unlikely to match the same glob -
/path/to/temp-beam-foo-$uid/$another_uid (that's the approach the PR
currently takes)
2. A subdirectory under PipelineOptions.tempLocation - this might be flawed
because PipelineOptions.tempLocation might be on a different filesystem, or
have different ACLs, than the output of the FileBasedSink.
3. A subdirectory that the user *must* explicitly provide on their
FileBasedSink. This is a reduction in usability, but there may be cases
when this is necessary - e.g. if the final location of the FileBasedSink is
such that we can't create siblings to it (e.g. the root path in a GCS
bucket - gs://some-bucket/)
4. A subdirectory generated by a new IOChannelFactory call ("give me a temp
directory for the given final path") which would do one of the above -
reasonable, and simplifies FileBasedSink, but we still need to choose which
of #1-#3 this call should do.

There might be other things I missed. There might be radical restructurings
of FileBasedSink that work around this problem entirely, though I couldn't
think of any.

In general, the requirements on the solution are:
- It should be very unlikely that somebody reads the temp files in the same
glob pattern as the final output by mistake.
- It should continue to make sense as IOChannelFactory is extended with
support for other filesystems.
- It should ideally use the same filesystem as the final output, or perhaps
even a location logically "close" to the final output, so that it could
potentially take advantage of that filesystem's efficient bulk-copy or
bulk-rename operations if available.
- It should be easy to manually clean up the temp files if something went
wrong and they weren't cleaned up by the Beam job.

I'm personally in favor of #1 with fallback to #2 or #3, because I think a
sibling directory achieves all of these requirements unless a sibling
directory can't be created.
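
A rough sketch of what "#1 with fallback" could look like, using plain string handling. All function and parameter names here are hypothetical, the order of the fallbacks (#3 before #2) is only an assumption, and this is not the code in the PR.

```python
import fnmatch
import uuid


def sibling_temp_dir(final_prefix, uid=None):
    """Option 1: a temp dir that is a sibling of the final path.

    Returns None when no sibling can exist, e.g. for gs://some-bucket/.
    """
    uid = uid or uuid.uuid4().hex
    # Split off an optional "scheme://" so that "gs://bucket" is not
    # mistaken for a directory with a parent.
    scheme, sep, rest = final_prefix.rpartition("://")
    parent, slash, base = rest.rstrip("/").rpartition("/")
    if not slash or not base:
        return None
    return f"{scheme}{sep}{parent}/temp-beam-{base}-{uid}"


def choose_temp_dir(final_prefix, explicit_temp_dir=None,
                    pipeline_temp_location=None):
    """Try #1 first, then a user-provided dir (#3), then tempLocation (#2)."""
    candidates = (
        sibling_temp_dir(final_prefix),
        explicit_temp_dir,
        pipeline_temp_location,
    )
    for candidate in candidates:
        if candidate:
            return candidate
    raise ValueError("no usable temp directory for " + final_prefix)


temp_dir = sibling_temp_dir("/path/to/foo", uid="abc123")
# The sibling directory does not match the glob used to read the output.
collides = fnmatch.fnmatchcase(temp_dir + "/def456", "/path/to/foo*")
```

With these inputs temp_dir is /path/to/temp-beam-foo-abc123 and collides is False, while the bucket-root case falls through to whichever fallback is supplied.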

Thoughts?
