@Cham - this addresses temporary files that were written by successful
bundles, but not by failed bundles (and not the case when the entire
pipeline fails), so it's not sufficient.

@Dan - yes, there are situations when it's impossible to create a sibling.
In that case, we'd need a fallback - either something the user needs to
explicitly specify ("your path is such that we don't know where to place
temporary files, please specify withTempLocation or something"), or I like
Robert's option of using sibling but differently-named files in this case.

@Kenn - yeah, a directory-based format would be great
(/path/to/foo/xxxxx-of-yyyyy), but this would be a breaking change to the
expected behavior.

I actually really like the option of sibling-but-differently-named files
(/path/to/temp-beam-foo-$uid) which would be a very non-invasive change to
the current (/path/to/foo-temp-$uid) and indeed would not involve creating
new directories or needing new IOChannelFactory APIs. It will still match a
glob like /path/to/* though (which a user could conceivably specify in a
situation like gs://my-logs-bucket/*), but it might be good enough.


On Thu, Oct 20, 2016 at 10:14 AM Robert Bradshaw
<rober...@google.com.invalid> wrote:

> On Thu, Oct 20, 2016 at 9:58 AM, Kenneth Knowles <k...@google.com.invalid>
> wrote:
> > I like the spirit of proposal #1 for addressing the critical duplication
> > problem, though as Dan points out the logic to choose a related but
> > collision-free name might be slightly more complex.
> >
> > It is a nice bonus that it addresses the less critical issues and
> improves
> > usability for manual inspections and interventions.
> >
> > The term "sibling" is being slightly misused here. I'd say #1 as proposed
> > is a "sibling of the parent" while today's behavior is "sibling". I'd
> say a
> > root cause of multiple problems is that our sharded file format is "a
> bunch
> > of files next to each other" and the sibling is "other files in the same
> > directory" so it takes some care, and explicit file name tracking instead
> > of globbing, to work with it correctly.
> >
> >  AFAIK (corrections welcome) there is nothing special about
> > Write.to("s3://bucket/file") meaning write to
> > "s3://bucket/file-$shardnum-of-$totalshards". An alternative that seems
> > superior is to write to "s3://bucket/file/$shardnum-of-$totalshards" with
> > the convention that this prefix is fully owned by this file. Now the
> prefix
> > "s3://bucket/file/" _is_ the sharded file. It is conceptually simpler and
> > more glob and UI friendly. (any non "-" character would work for GCS and
> > S3, but the "/" convention is better, considering the broader world)
> >
> > And bringing it back to this thread, the "sibling" is no longer "more
> files
> > in the same directory" now "s3://bucket/file-temp-$uid" which is on the
> > same filesystem with the same ACLs. It is also more UI friendly, easier
> to
> > clean up, and does more to explicitly indicate that this is really one
> > sharded file. Perhaps there's a pitfall I am overlooking?
>
> Using directories rather than prefixes is a big change, and introduces
> complications like dealing with hidden dot files (some placed
> implicitly by the system or applications, and worrying about
> executable bits rather than just the rw ones and possibly more
> complicated permission inheritance).
>
> > Also since you mentioned local file support, FWIW the cleanup glob
> "file-*"
> > today breaks on Windows due to Java library vagaries, while "file/*"
> would
> > succeed.
> > On Thu, Oct 20, 2016, 09:14 Dan Halperin <dhalp...@google.com.invalid>
> > wrote:
> >
> > This thread is conflating many issues.
> >
> > * Putting temp files where they will not match the glob for the desired
> > output files
> > * Dealing with eventually-consistent filesystems (s3, GCS, ...)
> > * Properly cleaning up all temp files
> >
> > They all need to get solved, but for now I think we only need to solve
> the
> > first one.
> >
> > Siblings fundamentally will not work. Consider the following
> > perfectly-valid output path: s3://bucket/file-SSS-NNN.txt . A sibling
> would
> > be a new bucket, so not guaranteed to exist.
> >
> > On Thu, Oct 20, 2016 at 1:57 AM, Chamikara Jayalath <
> chamik...@apache.org>
> > wrote:
> >
> >> Can this be prevented by moving temporary files (copy + delete
> >> individually) at finalization instead of copying all of them and
> > performing
> >> a bulk delete ? You can support task failures by ignoring renames when
> the
> >> destination exists. Python SDK currently does this (and puts temp files
> in
> >> a sub-directory).
> >>
> >> Thanks,
> >> Cham
> >>
> >> On Wed, Oct 19, 2016 at 6:25 PM Eugene Kirpichov
> >> <kirpic...@google.com.invalid> wrote:
> >>
> >> Hello,
> >>
> >> This is a continuation of the discussion on PR
> >> https://github.com/apache/incubator-beam/pull/1050 which turned out
> more
> >> complex than expected.
> >>
> >> Short summary:
> >> Currently FileBasedSink, when writing to /path/to/foo (in practice,
> >> /path/to/foo-xxxxx-of-yyyyy where yyyyy is the total number of output
> >> files), puts temporary files into /path/to/foo-temp-$uid, and when
> >> finalizing the sink, it removes the temporary files by matching the
> > pattern
> >> /path/to/foo-temp-* and removing everything that matches.
> >>
> >> There are a couple of issues with this:
> >> - FileBasedSink uses IOChannelFactory, which currently supports local
> >> filesystems and Google Cloud Storage (GCS). GCS's match() operation is
> >> currently eventually consistent. So, it may fail to return some of the
> >> files, so we won't remove them.
> >> - If the Beam job is cancelled or fails midway, then the temp files
> won't
> >> be deleted at all (that's subject to a separate discussion on cleanup
> API
> > -
> >> AFAIK there's no JIRA for it yet, I believe peihe@ was thinking about
> this
> >> and was going to file one).
> >> - If a follow-up data processing job is reading /path/to/foo, then the
> way
> >> temp files are named, they will likely match the same glob pattern (e.g.
> >> "/path/to/foo*") as the one intending to match the final output in
> >> /path/to/foo, so if some temp files are leftover, the follow-up job will
> >> effectively read duplicate records (some from /path/to/foo, some from
> >> /path/to/foo-temp-$blah).
> >>
> >> I think, in the absence of a way to guarantee that all temp files will
> be
> >> deleted (I think it'd be very difficult or impossible to provide a hard
> >> guarantee of this, considering various possible failure conditions such
> as
> >> zombie workers), the cleanest way to solve this is put temp files in a
> >> location that's unlikely to match the same glob pattern as one that
> > matches
> >> the final output.
> >>
> >> Some options for what that could be:
> >> 1. A subdirectory that is a sibling of the final path, sufficiently
> > unique,
> >> and unlikely to match the same glob -
> >> /path/to/temp-beam-foo-$uid/$another_uid (that's the approach the PR
> >> currently takes)
> >> 2. A subdirectory under PipelineOptions.tempLocation - this might be
> > flawed
> >> because PipelineOptions.tempLocation might be on a different filesystem,
> > or
> >> have different ACLs, than the output of the FileBasedSink.
> >> 3. A subdirectory that the user *must* explicitly provide on their
> >> FileBasedSink. This is a reduction in usability, but there may be cases
> >> when this is necessary - e.g. if the final location of the FileBasedSink
> > is
> >> such that we can't create siblings to it (e.g. the root path in a GCS
> >> bucket - gs://some-bucket/)
> >> 4. A subdirectory generated by a new IOChannelFactory call ("give me a
> > temp
> >> directory for the given final path") which would do one of the above -
> >> reasonable, and simplifies FileBasedSink, but we still need to choose
> > which
> >> of #1-#3 this call should do.
> >>
> >> There might be other things I missed. There might be radical
> > restructurings
> >> of FileBasedSink that work around this problem entirely, though I
> couldn't
> >> think of any.
> >>
> >> In general, the requirements on the solution are:
> >> - It should be very unlikely that somebody reads the temp files in the
> > same
> >> glob pattern as the final output by mistake.
> >> - It should continue to make sense as IOChannelFactory is extended with
> >> support for other filesystems.
> >> - It should ideally use the same filesystem as the final output, or
> > perhaps
> >> even a location logically "close" to the final output, so that it could
> >> potentially take advantage of that filesystem's efficient bulk-copy or
> >> bulk-rename operations if available.
> >> - It should be easy to manually clean up the temp files if something
> went
> >> wrong and they weren't cleaned up by the Beam job.
> >>
> >> I'm personally in favor of #1 with fallback to #2 or #3, because I
> think a
> >> sibling directory achieves all of these requirements unless a sibling
> >> directory can't be created.
> >>
> >> Thoughts?
> >>
>

Reply via email to