This thread is conflating many issues.

* Putting temp files where they will not match the glob for the desired
output files
* Dealing with eventually-consistent filesystems (s3, GCS, ...)
* Properly cleaning up all temp files

They all need to be solved, but for now I think we only need to solve the
first one.
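
To make the first issue concrete, here is a minimal sketch in Python. It uses
fnmatchcase from the standard library as a stand-in for whatever match() the
real filesystem provides, and the uids in the paths (1234, 5678) are made up
for illustration:

```python
from fnmatch import fnmatchcase

# Glob a follow-up job might use to read the final output (from the thread).
reader_glob = "/path/to/foo*"

candidates = [
    "/path/to/foo-00000-of-00003",       # final output shard
    "/path/to/foo-temp-1234",            # temp file, current naming scheme
    "/path/to/temp-beam-foo-1234/5678",  # temp file, naming proposed in the PR
]

# Keep only the paths the reader's glob would pick up.
matched = [p for p in candidates if fnmatchcase(p, reader_glob)]

for path in matched:
    print(path)
```

The leftover temp file under the current scheme is picked up together with
the real shard, while the temp-beam-foo-... path is not, because it no longer
starts with the prefix the reader globs on.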

Siblings fundamentally will not work. Consider the following
perfectly valid output path: s3://bucket/file-SSS-NNN.txt. A sibling would
have to be a new bucket, which is not guaranteed to exist.
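
A rough sketch of how one could detect that case, using only the Python
standard library. The helper name output_dir_is_bucket_root is made up, and I
am assuming "sibling" means a directory next to the one that holds the output
files:

```python
import posixpath
from urllib.parse import urlsplit


def output_dir_is_bucket_root(output_path):
    """Return True if the output files sit directly at the bucket root."""
    parts = urlsplit(output_path)  # netloc is the bucket, path is the object key
    return posixpath.dirname(parts.path) == "/"


print(output_dir_is_bucket_root("s3://bucket/file-SSS-NNN.txt"))
print(output_dir_is_bucket_root("s3://bucket/dir/file-SSS-NNN.txt"))
```

When the check is true, there is nothing "next to" the output directory inside
the same bucket, so a sibling-based scheme would need some fallback.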

On Thu, Oct 20, 2016 at 1:57 AM, Chamikara Jayalath <chamik...@apache.org>
wrote:

> Can this be prevented by moving temporary files (copy + delete
> individually) at finalization instead of copying all of them and performing
> a bulk delete? You can support task failures by ignoring renames when the
> destination exists. Python SDK currently does this (and puts temp files in
> a sub-directory).
>
> Thanks,
> Cham
>
> On Wed, Oct 19, 2016 at 6:25 PM Eugene Kirpichov
> <kirpic...@google.com.invalid> wrote:
>
> Hello,
>
> This is a continuation of the discussion on PR
> https://github.com/apache/incubator-beam/pull/1050 which turned out more
> complex than expected.
>
> Short summary:
> Currently FileBasedSink, when writing to /path/to/foo (in practice,
> /path/to/foo-xxxxx-of-yyyyy where yyyyy is the total number of output
> files), puts temporary files into /path/to/foo-temp-$uid, and when
> finalizing the sink, it removes the temporary files by matching the pattern
> /path/to/foo-temp-* and removing everything that matches.
>
> There are a couple of issues with this:
> - FileBasedSink uses IOChannelFactory, which currently supports local
> filesystems and Google Cloud Storage (GCS). GCS's match() operation is
> currently eventually consistent. So, it may fail to return some of the
> files, so we won't remove them.
> - If the Beam job is cancelled or fails midway, then the temp files won't
> be deleted at all (that's subject to a separate discussion on cleanup API -
> AFAIK there's no JIRA for it yet, I believe peihe@ was thinking about this
> and was going to file one).
> - If a follow-up data processing job is reading /path/to/foo, then the way
> temp files are named, they will likely match the same glob pattern (e.g.
> "/path/to/foo*") as the one intending to match the final output in
> /path/to/foo, so if some temp files are leftover, the follow-up job will
> effectively read duplicate records (some from /path/to/foo, some from
> /path/to/foo-temp-$blah).
>
> I think, in the absence of a way to guarantee that all temp files will be
> deleted (I think it'd be very difficult or impossible to provide a hard
> guarantee of this, considering various possible failure conditions such as
> zombie workers), the cleanest way to solve this is to put temp files in a
> location that's unlikely to match the same glob pattern as one that matches
> the final output.
>
> Some options for what that could be:
> 1. A subdirectory that is a sibling of the final path, sufficiently unique,
> and unlikely to match the same glob -
> /path/to/temp-beam-foo-$uid/$another_uid (that's the approach the PR
> currently takes)
> 2. A subdirectory under PipelineOptions.tempLocation - this might be flawed
> because PipelineOptions.tempLocation might be on a different filesystem, or
> have different ACLs, than the output of the FileBasedSink.
> 3. A subdirectory that the user *must* explicitly provide on their
> FileBasedSink. This is a reduction in usability, but there may be cases
> when this is necessary - e.g. if the final location of the FileBasedSink is
> such that we can't create siblings to it (e.g. the root path in a GCS
> bucket - gs://some-bucket/)
> 4. A subdirectory generated by a new IOChannelFactory call ("give me a temp
> directory for the given final path") which would do one of the above -
> reasonable, and simplifies FileBasedSink, but we still need to choose which
> of #1-#3 this call should do.
>
> There might be other things I missed. There might be radical restructurings
> of FileBasedSink that work around this problem entirely, though I couldn't
> think of any.
>
> In general, the requirements on the solution are:
> - It should be very unlikely that somebody reads the temp files in the same
> glob pattern as the final output by mistake.
> - It should continue to make sense as IOChannelFactory is extended with
> support for other filesystems.
> - It should ideally use the same filesystem as the final output, or perhaps
> even a location logically "close" to the final output, so that it could
> potentially take advantage of that filesystem's efficient bulk-copy or
> bulk-rename operations if available.
> - It should be easy to manually clean up the temp files if something went
> wrong and they weren't cleaned up by the Beam job.
>
> I'm personally in favor of #1 with fallback to #2 or #3, because I think a
> sibling directory achieves all of these requirements unless a sibling
> directory can't be created.
>
> Thoughts?
>
