I like the spirit of proposal #1 for addressing the critical duplication
problem, though, as Dan points out, the logic to choose a related but
collision-free name might be slightly more complex.

It is a nice bonus that the proposal also addresses the less critical
issues and improves usability for manual inspection and intervention.

The term "sibling" is being slightly misused here. I'd say #1 as proposed
is a "sibling of the parent", while today's behavior uses a true "sibling".
A root cause of multiple problems is that our sharded file format is "a
bunch of files next to each other" and the sibling is "other files in the
same directory", so it takes some care, and explicit file name tracking
instead of globbing, to work with it correctly.

AFAIK (corrections welcome) there is nothing special about
Write.to("s3://bucket/file") meaning write to
"s3://bucket/file-$shardnum-of-$totalshards". An alternative that seems
superior is to write to "s3://bucket/file/$shardnum-of-$totalshards" with
the convention that this prefix is fully owned by this file. Now the prefix
"s3://bucket/file/" _is_ the sharded file. It is conceptually simpler and
more glob- and UI-friendly. (Any separator other than "-" would work for
GCS and S3, but the "/" convention is better, considering the broader
world of filesystems.)

And bringing it back to this thread, the "sibling" is no longer "more files
in the same directory" but rather "s3://bucket/file-temp-$uid", which is on
the same filesystem with the same ACLs. It is also more UI-friendly, easier
to clean up, and does more to explicitly indicate that this is really one
sharded file. Perhaps there's a pitfall I am overlooking?
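
To make the contrast concrete, a minimal sketch (the helper names are
mine, purely illustrative):

    // Illustrative only: contrasting the current and proposed layouts.
    class ShardNaming {
      // Today: shards are "a bunch of files next to each other".
      static String currentShard(String dest, int shard, int total) {
        return String.format("%s-%05d-of-%05d", dest, shard, total);
        // e.g. s3://bucket/file-00000-of-00010
      }

      // Proposed: the prefix "dest/" _is_ the sharded file.
      static String proposedShard(String dest, int shard, int total) {
        return String.format("%s/%05d-of-%05d", dest, shard, total);
        // e.g. s3://bucket/file/00000-of-00010
      }

      // Temp files become a true sibling prefix on the same
      // filesystem, with the same ACLs.
      static String tempPrefix(String dest, String uid) {
        return dest + "-temp-" + uid; // e.g. s3://bucket/file-temp-$uid
      }
    }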

Also, since you mentioned local file support, FWIW the cleanup glob
"file-*" today breaks on Windows due to Java library vagaries, while
"file/*" would succeed.
On Thu, Oct 20, 2016, 09:14 Dan Halperin <dhalp...@google.com.invalid>
wrote:

This thread is conflating many issues.

* Putting temp files where they will not match the glob for the desired
output files
* Dealing with eventually-consistent filesystems (s3, GCS, ...)
* Properly cleaning up all temp files

They all need to get solved, but for now I think we only need to solve the
first one.

Siblings fundamentally will not work. Consider the following
perfectly valid output path: s3://bucket/file-SSS-NNN.txt. A sibling would
be a new bucket, which is not guaranteed to exist.

On Thu, Oct 20, 2016 at 1:57 AM, Chamikara Jayalath <chamik...@apache.org>
wrote:

> Can this be prevented by moving temporary files (copy + delete
> individually) at finalization instead of copying all of them and
> performing a bulk delete? You can support task failures by ignoring
> renames when the destination exists. Python SDK currently does this (and
> puts temp files in a sub-directory).
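
For what it's worth, a minimal sketch of that idempotent move; FileOps
here is a made-up stand-in for the real filesystem layer, not an actual
Beam interface:

    import java.io.IOException;

    // Hypothetical filesystem operations, standing in for the real layer.
    interface FileOps {
      boolean exists(String path) throws IOException;
      void copy(String src, String dst) throws IOException;
      void delete(String path) throws IOException; // assumed to ignore missing files
    }

    class Finalizer {
      // Move one temp file into place; safe to re-run after a task failure.
      static void finalizeShard(FileOps fs, String temp, String dest)
          throws IOException {
        if (!fs.exists(dest)) { // a retried task finds the destination already written
          fs.copy(temp, dest);
        }
        fs.delete(temp);        // per-file delete instead of a bulk glob delete
      }
    }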
>
> Thanks,
> Cham
>
> On Wed, Oct 19, 2016 at 6:25 PM Eugene Kirpichov
> <kirpic...@google.com.invalid> wrote:
>
> Hello,
>
> This is a continuation of the discussion on PR
> https://github.com/apache/incubator-beam/pull/1050, which turned out to
> be more complex than expected.
>
> Short summary:
> Currently FileBasedSink, when writing to /path/to/foo (in practice,
> /path/to/foo-xxxxx-of-yyyyy, where yyyyy is the total number of output
> files), puts temporary files into /path/to/foo-temp-$uid, and when
> finalizing the sink, it removes the temporary files by matching the
> pattern /path/to/foo-temp-* and removing everything that matches.
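
For concreteness, that flow is roughly the following; a from-memory
sketch, not the exact FileBasedSink code, and the remove() call in
particular is an approximation:

    import java.io.IOException;
    import java.util.Collection;
    import java.util.Collections;
    import org.apache.beam.sdk.util.IOChannelFactory;

    class TempCleanup {
      // Rough sketch of the glob-based cleanup described above.
      static void removeTempFiles(IOChannelFactory factory, String outputPath)
          throws IOException {
        // On GCS, match() is eventually consistent, so this listing
        // can silently omit recently written temp files.
        Collection<String> tempFiles = factory.match(outputPath + "-temp-*");
        for (String file : tempFiles) {
          factory.remove(Collections.singletonList(file)); // approximated removal call
        }
      }
    }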
>
> There are a couple of issues with this:
> - FileBasedSink uses IOChannelFactory, which currently supports local
> filesystems and Google Cloud Storage (GCS). GCS's match() operation is
> currently eventually consistent, so it may fail to return some of the
> files, and we won't remove them.
> - If the Beam job is cancelled or fails midway, then the temp files won't
> be deleted at all (that's subject to a separate discussion on cleanup API -
> AFAIK there's no JIRA for it yet, I believe peihe@ was thinking about this
> and was going to file one).
> - If a follow-up data processing job is reading /path/to/foo, then the way
> temp files are named, they will likely match the same glob pattern (e.g.
> "/path/to/foo*") as the one intended to match the final output in
> /path/to/foo, so if some temp files are left over, the follow-up job will
> effectively read duplicate records (some from /path/to/foo, some from
> /path/to/foo-temp-$blah), as illustrated below.
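
The overlap is easy to see with plain java.nio globbing (illustrative
paths only):

    import java.nio.file.FileSystems;
    import java.nio.file.PathMatcher;
    import java.nio.file.Paths;

    public class GlobOverlap {
      public static void main(String[] args) {
        PathMatcher glob =
            FileSystems.getDefault().getPathMatcher("glob:/path/to/foo*");
        // Both the real output and a leftover temp file match the same glob:
        System.out.println(glob.matches(Paths.get("/path/to/foo-00000-of-00005"))); // true
        System.out.println(glob.matches(Paths.get("/path/to/foo-temp-9f3c")));      // true
      }
    }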
>
> I think, in the absence of a way to guarantee that all temp files will be
> deleted (it'd be very difficult or impossible to provide a hard guarantee
> of this, considering various possible failure conditions such as zombie
> workers), the cleanest way to solve this is to put temp files in a
> location that's unlikely to match the same glob pattern as one that
> matches the final output.
>
> Some options for what that could be:
> 1. A subdirectory that is a sibling of the final path, sufficiently
> unique, and unlikely to match the same glob -
> /path/to/temp-beam-foo-$uid/$another_uid (that's the approach the PR
> currently takes; see the sketch after this list).
> 2. A subdirectory under PipelineOptions.tempLocation - this might be
> flawed because PipelineOptions.tempLocation might be on a different
> filesystem, or have different ACLs, than the output of the FileBasedSink.
> 3. A subdirectory that the user *must* explicitly provide on their
> FileBasedSink. This is a reduction in usability, but there may be cases
> when this is necessary - e.g. if the final location of the FileBasedSink
> is such that we can't create siblings to it (e.g. the root path in a GCS
> bucket - gs://some-bucket/).
> 4. A subdirectory generated by a new IOChannelFactory call ("give me a
> temp directory for the given final path") which would do one of the
> above - reasonable, and simplifies FileBasedSink, but we still need to
> choose which of #1-#3 this call should do.
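
A sketch of how #1 could derive that location (purely illustrative, not
the PR's actual code):

    import java.util.UUID;

    class TempPaths {
      // Option #1: a sufficiently unique sibling directory of the final path.
      static String tempShardPath(String finalPath) {
        int slash = finalPath.lastIndexOf('/');
        String dir = finalPath.substring(0, slash + 1); // "/path/to/"
        String name = finalPath.substring(slash + 1);   // "foo"
        // UUIDs keep concurrent jobs writing to the same destination apart.
        return dir + "temp-beam-" + name + "-" + UUID.randomUUID()
            + "/" + UUID.randomUUID();
        // e.g. /path/to/temp-beam-foo-$uid/$another_uid
      }
    }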
>
> There might be other things I missed. There might be radical
> restructurings of FileBasedSink that work around this problem entirely,
> though I couldn't think of any.
>
> In general, the requirements on the solution are:
> - It should be very unlikely that somebody reads the temp files via the
> same glob pattern as the final output by mistake.
> - It should continue to make sense as IOChannelFactory is extended with
> support for other filesystems.
> - It should ideally use the same filesystem as the final output, or
> perhaps even a location logically "close" to the final output, so that
> it could potentially take advantage of that filesystem's efficient
> bulk-copy or bulk-rename operations if available.
> - It should be easy to manually clean up the temp files if something went
> wrong and they weren't cleaned up by the Beam job.
>
> I'm personally in favor of #1, with a fallback to #2 or #3, because I
> think a sibling directory achieves all of these requirements except when
> such a directory can't be created.
>
> Thoughts?
>
