On Thu, Oct 27, 2016 at 1:27 PM Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
> Getting back to this. I noticed that the original user's job mentioned in > > http://stackoverflow.com/questions/39822859/temp-files-remain-in-gcs-after-a-dataflow-job-succeeded > is > configured to write to /path/to/$date/foo-xxxxx-of-yyyyy and another job > then reads from /path/to/$date/*, so sibling files won't work - it's > necessary to put temp files either into a subdirectory, or in a location > completely outside /path/to/$date/. > I think, at least for GCS, glob pattern '/path/to/$date/*' will include files that are within any immediate sub-directory '/path/to/$date/uuid/'. So unless users use the pattern '/path/to/$date/foo*' they could run into the same issue. Thanks, Cham > > By the way, if we ever support recursive globs (e.g. /path/to/foo/**/*), > then a subdirectory won't help; and if a user has another job that reads > from, say, /path/to/**/* (without the "foo" component - e.g. if foo is a > date, and they have a job that reads all data for all dates), then a > sibling directory won't help either. > > I think these two cases are good motivation for allowing the user to > provide a specific temp directory, as a last resort. > > To sum up: > - in order to solve the user's problem, we need to use a directory > - in the future we'll need to allow users to configure the temp directory > on FileBasedSink. > > The current PR takes the "directory sibling to the write path" approach, > and I don't see a better option that would address the needs of most users > automatically. > > Dan - you mentioned on the PR that you would prefer a subdirectory to a > sibling directory, but this *is* a subdirectory (specified write path is > /path/to/$date/foo-xxxxx-of-yyyyy and the suggested temp path is > /path/to/$date/temp-beam-foo-$uid/ which is a subdirectory of the directory > to which the sink is writing). > > Any alternatives / objections to proceeding with the approach in the PR > as-is? > > On Thu, Oct 20, 2016 at 6:26 PM Kenneth Knowles <k...@google.com.invalid> > wrote: > > > @Eugene, we can make breaking changes. But if we really don't want to, we > > can add it under a new name easily. That particular inheritance hierarchy > > is not precious IMO. > > > > On Thu, Oct 20, 2016, 14:03 Eugene Kirpichov > <kirpic...@google.com.invalid > > > > > wrote: > > > > > @Cham - this addresses temporary files that were written by successful > > > bundles, but not by failed bundles (and not the case when the entire > > > pipeline fails), so it's not sufficient. > > > > > > @Dan - yes, there are situations when it's impossible to create a > > sibling. > > > In that case, we'd need a fallback - either something the user needs to > > > explicitly specify ("your path is such that we don't know where to > place > > > temporary files, please specify withTempLocation or something"), or I > > like > > > Robert's option of using sibling but differently-named files in this > > case. > > > > > > @Kenn - yeah, a directory-based format would be great > > > (/path/to/foo/xxxxx-of-yyyyy), but this would be a breaking change to > the > > > expected behavior. > > > > > > I actually really like the option of sibling-but-differently-named > files > > > (/path/to/temp-beam-foo-$uid) which would be a very non-invasive change > > to > > > the current (/path/to/foo-temp-$uid) and indeed would not involve > > creating > > > new directories or needing new IOChannelFactory APIs. It will still > > match a > > > glob like /path/to/* though (which a user could conceivably specify in > a > > > situation like gs://my-logs-bucket/*), but it might be good enough. > > > > > > > > > On Thu, Oct 20, 2016 at 10:14 AM Robert Bradshaw > > > <rober...@google.com.invalid> wrote: > > > > > > > On Thu, Oct 20, 2016 at 9:58 AM, Kenneth Knowles > > <k...@google.com.invalid > > > > > > > > wrote: > > > > > I like the spirit of proposal #1 for addressing the critical > > > duplication > > > > > problem, though as Dan points out the logic to choose a related but > > > > > collision-free name might be slightly more complex. > > > > > > > > > > It is a nice bonus that it addresses the less critical issues and > > > > improves > > > > > usability for manual inspections and interventions. > > > > > > > > > > The term "sibling" is being slightly misused here. I'd say #1 as > > > proposed > > > > > is a "sibling of the parent" while today's behavior is "sibling". > I'd > > > > say a > > > > > root cause of multiple problems is that our sharded file format is > "a > > > > bunch > > > > > of files next to each other" and the sibling is "other files in the > > > same > > > > > directory" so it takes some care, and explicit file name tracking > > > instead > > > > > of globbing, to work with it correctly. > > > > > > > > > > AFAIK (corrections welcome) there is nothing special about > > > > > Write.to("s3://bucket/file") meaning write to > > > > > "s3://bucket/file-$shardnum-of-$totalshards". An alternative that > > seems > > > > > superior is to write to > "s3://bucket/file/$shardnum-of-$totalshards" > > > with > > > > > the convention that this prefix is fully owned by this file. Now > the > > > > prefix > > > > > "s3://bucket/file/" _is_ the sharded file. It is conceptually > simpler > > > and > > > > > more glob and UI friendly. (any non "-" character would work for > GCS > > > and > > > > > S3, but the "/" convention is better, considering the broader > world) > > > > > > > > > > And bringing it back to this thread, the "sibling" is no longer > "more > > > > files > > > > > in the same directory" now "s3://bucket/file-temp-$uid" which is on > > the > > > > > same filesystem with the same ACLs. It is also more UI friendly, > > easier > > > > to > > > > > clean up, and does more to explicitly indicate that this is really > > one > > > > > sharded file. Perhaps there's a pitfall I am overlooking? > > > > > > > > Using directories rather than prefixes is a big change, and > introduces > > > > complications like dealing with hidden dot files (some placed > > > > implicitly by the system or applications, and worrying about > > > > executable bits rather than just the rw ones and possibly more > > > > complicated permission inheritance). > > > > > > > > > Also since you mentioned local file support, FWIW the cleanup glob > > > > "file-*" > > > > > today breaks on Windows due to Java library vagaries, while > "file/*" > > > > would > > > > > succeed. > > > > > On Thu, Oct 20, 2016, 09:14 Dan Halperin > <dhalp...@google.com.invalid > > > > > > > > wrote: > > > > > > > > > > This thread is conflating many issues. > > > > > > > > > > * Putting temp files where they will not match the glob for the > > desired > > > > > output files > > > > > * Dealing with eventually-consistent filesystems (s3, GCS, ...) > > > > > * Properly cleaning up all temp files > > > > > > > > > > They all need to get solved, but for now I think we only need to > > solve > > > > the > > > > > first one. > > > > > > > > > > Siblings fundamentally will not work. Consider the following > > > > > perfectly-valid output path: s3://bucket/file-SSS-NNN.txt . A > sibling > > > > would > > > > > be a new bucket, so not guaranteed to exist. > > > > > > > > > > On Thu, Oct 20, 2016 at 1:57 AM, Chamikara Jayalath < > > > > chamik...@apache.org> > > > > > wrote: > > > > > > > > > >> Can this be prevented by moving temporary files (copy + delete > > > > >> individually) at finalization instead of copying all of them and > > > > > performing > > > > >> a bulk delete ? You can support task failures by ignoring renames > > when > > > > the > > > > >> destination exists. Python SDK currently does this (and puts temp > > > files > > > > in > > > > >> a sub-directory). > > > > >> > > > > >> Thanks, > > > > >> Cham > > > > >> > > > > >> On Wed, Oct 19, 2016 at 6:25 PM Eugene Kirpichov > > > > >> <kirpic...@google.com.invalid> wrote: > > > > >> > > > > >> Hello, > > > > >> > > > > >> This is a continuation of the discussion on PR > > > > >> https://github.com/apache/incubator-beam/pull/1050 which turned > out > > > > more > > > > >> complex than expected. > > > > >> > > > > >> Short summary: > > > > >> Currently FileBasedSink, when writing to /path/to/foo (in > practice, > > > > >> /path/to/foo-xxxxx-of-yyyyy where yyyyy is the total number of > > output > > > > >> files), puts temporary files into /path/to/foo-temp-$uid, and when > > > > >> finalizing the sink, it removes the temporary files by matching > the > > > > > pattern > > > > >> /path/to/foo-temp-* and removing everything that matches. > > > > >> > > > > >> There are a couple of issues with this: > > > > >> - FileBasedSink uses IOChannelFactory, which currently supports > > local > > > > >> filesystems and Google Cloud Storage (GCS). GCS's match() > operation > > is > > > > >> currently eventually consistent. So, it may fail to return some of > > the > > > > >> files, so we won't remove them. > > > > >> - If the Beam job is cancelled or fails midway, then the temp > files > > > > won't > > > > >> be deleted at all (that's subject to a separate discussion on > > cleanup > > > > API > > > > > - > > > > >> AFAIK there's no JIRA for it yet, I believe peihe@ was thinking > > about > > > > this > > > > >> and was going to file one). > > > > >> - If a follow-up data processing job is reading /path/to/foo, then > > the > > > > way > > > > >> temp files are named, they will likely match the same glob pattern > > > (e.g. > > > > >> "/path/to/foo*") as the one intending to match the final output in > > > > >> /path/to/foo, so if some temp files are leftover, the follow-up > job > > > will > > > > >> effectively read duplicate records (some from /path/to/foo, some > > from > > > > >> /path/to/foo-temp-$blah). > > > > >> > > > > >> I think, in the absence of a way to guarantee that all temp files > > will > > > > be > > > > >> deleted (I think it'd be very difficult or impossible to provide a > > > hard > > > > >> guarantee of this, considering various possible failure conditions > > > such > > > > as > > > > >> zombie workers), the cleanest way to solve this is put temp files > > in a > > > > >> location that's unlikely to match the same glob pattern as one > that > > > > > matches > > > > >> the final output. > > > > >> > > > > >> Some options for what that could be: > > > > >> 1. A subdirectory that is a sibling of the final path, > sufficiently > > > > > unique, > > > > >> and unlikely to match the same glob - > > > > >> /path/to/temp-beam-foo-$uid/$another_uid (that's the approach the > PR > > > > >> currently takes) > > > > >> 2. A subdirectory under PipelineOptions.tempLocation - this might > be > > > > > flawed > > > > >> because PipelineOptions.tempLocation might be on a different > > > filesystem, > > > > > or > > > > >> have different ACLs, than the output of the FileBasedSink. > > > > >> 3. A subdirectory that the user *must* explicitly provide on their > > > > >> FileBasedSink. This is a reduction in usability, but there may be > > > cases > > > > >> when this is necessary - e.g. if the final location of the > > > FileBasedSink > > > > > is > > > > >> such that we can't create siblings to it (e.g. the root path in a > > GCS > > > > >> bucket - gs://some-bucket/) > > > > >> 4. A subdirectory generated by a new IOChannelFactory call ("give > > me a > > > > > temp > > > > >> directory for the given final path") which would do one of the > > above - > > > > >> reasonable, and simplifies FileBasedSink, but we still need to > > choose > > > > > which > > > > >> of #1-#3 this call should do. > > > > >> > > > > >> There might be other things I missed. There might be radical > > > > > restructurings > > > > >> of FileBasedSink that work around this problem entirely, though I > > > > couldn't > > > > >> think of any. > > > > >> > > > > >> In general, the requirements on the solution are: > > > > >> - It should be very unlikely that somebody reads the temp files in > > the > > > > > same > > > > >> glob pattern as the final output by mistake. > > > > >> - It should continue to make sense as IOChannelFactory is extended > > > with > > > > >> support for other filesystems. > > > > >> - It should ideally use the same filesystem as the final output, > or > > > > > perhaps > > > > >> even a location logically "close" to the final output, so that it > > > could > > > > >> potentially take advantage of that filesystem's efficient > bulk-copy > > or > > > > >> bulk-rename operations if available. > > > > >> - It should be easy to manually clean up the temp files if > something > > > > went > > > > >> wrong and they weren't cleaned up by the Beam job. > > > > >> > > > > >> I'm personally in favor of #1 with fallback to #2 or #3, because I > > > > think a > > > > >> sibling directory achieves all of these requirements unless a > > sibling > > > > >> directory can't be created. > > > > >> > > > > >> Thoughts? > > > > >> > > > > > > > > > >