@Cham - this addresses temporary files that were written by successful bundles, but not by failed bundles (and not the case when the entire pipeline fails), so it's not sufficient.
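To make the point about failed bundles concrete, here is a minimal sketch of the per-file "copy + delete, skip if the destination exists" finalization Cham describes. It is not the Python SDK's actual code: the `finalize` and `demo` helpers and the file names are hypothetical, and a local temp directory stands in for the real filesystem. The temp file left by a failed bundle never appears in the list of renames, so nothing removes it.

```python
import os
import shutil
import tempfile


def finalize(renames):
    """Move each temp file to its final path, one file at a time.

    `renames` is a hypothetical list of (temp_path, final_path) pairs
    reported by bundles that completed successfully. A destination that
    already exists is left alone, so retrying finalization is harmless.
    """
    for temp_path, final_path in renames:
        if not os.path.exists(final_path):
            shutil.copyfile(temp_path, final_path)
        if os.path.exists(temp_path):
            os.remove(temp_path)


def demo():
    root = tempfile.mkdtemp()
    ok_temp = os.path.join(root, "foo-temp-ok")          # successful bundle
    failed_temp = os.path.join(root, "foo-temp-failed")  # failed bundle
    final = os.path.join(root, "foo-00000-of-00001")
    for path in (ok_temp, failed_temp):
        with open(path, "w") as f:
            f.write("records\n")

    # Only the successful bundle reports its temp file to finalization.
    renames = [(ok_temp, final)]
    finalize(renames)
    finalize(renames)  # a retried finalization changes nothing

    leftover = sorted(os.listdir(root))
    shutil.rmtree(root)
    return leftover


print(demo())
```

The final shard is written once and the successful bundle's temp file is gone, but `foo-temp-failed` is still sitting next to the output.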
@Dan - yes, there are situations when it's impossible to create a sibling. In that case, we'd need a fallback - either something the user needs to explicitly specify ("your path is such that we don't know where to place temporary files, please specify withTempLocation or something"), or I like Robert's option of using sibling but differently-named files in this case.

@Kenn - yeah, a directory-based format would be great (/path/to/foo/xxxxx-of-yyyyy), but this would be a breaking change to the expected behavior.

I actually really like the option of sibling-but-differently-named files (/path/to/temp-beam-foo-$uid) which would be a very non-invasive change to the current (/path/to/foo-temp-$uid) and indeed would not involve creating new directories or needing new IOChannelFactory APIs. It will still match a glob like /path/to/* though (which a user could conceivably specify in a situation like gs://my-logs-bucket/*), but it might be good enough.

On Thu, Oct 20, 2016 at 10:14 AM Robert Bradshaw <rober...@google.com.invalid> wrote:

> On Thu, Oct 20, 2016 at 9:58 AM, Kenneth Knowles <k...@google.com.invalid>
> wrote:
> > I like the spirit of proposal #1 for addressing the critical duplication
> > problem, though as Dan points out the logic to choose a related but
> > collision-free name might be slightly more complex.
> >
> > It is a nice bonus that it addresses the less critical issues and
> > improves usability for manual inspections and interventions.
> >
> > The term "sibling" is being slightly misused here. I'd say #1 as proposed
> > is a "sibling of the parent" while today's behavior is "sibling". I'd say
> > a root cause of multiple problems is that our sharded file format is "a
> > bunch of files next to each other" and the sibling is "other files in the
> > same directory" so it takes some care, and explicit file name tracking
> > instead of globbing, to work with it correctly.
> >
> > AFAIK (corrections welcome) there is nothing special about
> > Write.to("s3://bucket/file") meaning write to
> > "s3://bucket/file-$shardnum-of-$totalshards". An alternative that seems
> > superior is to write to "s3://bucket/file/$shardnum-of-$totalshards" with
> > the convention that this prefix is fully owned by this file. Now the
> > prefix "s3://bucket/file/" _is_ the sharded file. It is conceptually
> > simpler and more glob and UI friendly. (any non "-" character would work
> > for GCS and S3, but the "/" convention is better, considering the broader
> > world)
> >
> > And bringing it back to this thread, the "sibling" is no longer "more
> > files in the same directory" now "s3://bucket/file-temp-$uid" which is on
> > the same filesystem with the same ACLs. It is also more UI friendly,
> > easier to clean up, and does more to explicitly indicate that this is
> > really one sharded file. Perhaps there's a pitfall I am overlooking?
>
> Using directories rather than prefixes is a big change, and introduces
> complications like dealing with hidden dot files (some placed
> implicitly by the system or applications, and worrying about
> executable bits rather than just the rw ones and possibly more
> complicated permission inheritance).
>
> > Also since you mentioned local file support, FWIW the cleanup glob
> > "file-*" today breaks on Windows due to Java library vagaries, while
> > "file/*" would succeed.
> >
> > On Thu, Oct 20, 2016, 09:14 Dan Halperin <dhalp...@google.com.invalid>
> > wrote:
> >
> > This thread is conflating many issues.
> >
> > * Putting temp files where they will not match the glob for the desired
> >   output files
> > * Dealing with eventually-consistent filesystems (s3, GCS, ...)
> > * Properly cleaning up all temp files
> >
> > They all need to get solved, but for now I think we only need to solve
> > the first one.
> >
> > Siblings fundamentally will not work.
> > Consider the following perfectly-valid output path:
> > s3://bucket/file-SSS-NNN.txt . A sibling would be a new bucket, so not
> > guaranteed to exist.
> >
> > On Thu, Oct 20, 2016 at 1:57 AM, Chamikara Jayalath <chamik...@apache.org>
> > wrote:
> >
> >> Can this be prevented by moving temporary files (copy + delete
> >> individually) at finalization instead of copying all of them and
> >> performing a bulk delete? You can support task failures by ignoring
> >> renames when the destination exists. Python SDK currently does this (and
> >> puts temp files in a sub-directory).
> >>
> >> Thanks,
> >> Cham
> >>
> >> On Wed, Oct 19, 2016 at 6:25 PM Eugene Kirpichov
> >> <kirpic...@google.com.invalid> wrote:
> >>
> >> Hello,
> >>
> >> This is a continuation of the discussion on PR
> >> https://github.com/apache/incubator-beam/pull/1050 which turned out more
> >> complex than expected.
> >>
> >> Short summary:
> >> Currently FileBasedSink, when writing to /path/to/foo (in practice,
> >> /path/to/foo-xxxxx-of-yyyyy where yyyyy is the total number of output
> >> files), puts temporary files into /path/to/foo-temp-$uid, and when
> >> finalizing the sink, it removes the temporary files by matching the
> >> pattern /path/to/foo-temp-* and removing everything that matches.
> >>
> >> There are a couple of issues with this:
> >> - FileBasedSink uses IOChannelFactory, which currently supports local
> >>   filesystems and Google Cloud Storage (GCS). GCS's match() operation is
> >>   currently eventually consistent. So, it may fail to return some of the
> >>   files, so we won't remove them.
> >> - If the Beam job is cancelled or fails midway, then the temp files won't
> >>   be deleted at all (that's subject to a separate discussion on cleanup
> >>   API - AFAIK there's no JIRA for it yet, I believe peihe@ was thinking
> >>   about this and was going to file one).
> >> - If a follow-up data processing job is reading /path/to/foo, then the
> >>   way temp files are named, they will likely match the same glob pattern
> >>   (e.g. "/path/to/foo*") as the one intending to match the final output
> >>   in /path/to/foo, so if some temp files are leftover, the follow-up job
> >>   will effectively read duplicate records (some from /path/to/foo, some
> >>   from /path/to/foo-temp-$blah).
> >>
> >> I think, in the absence of a way to guarantee that all temp files will be
> >> deleted (I think it'd be very difficult or impossible to provide a hard
> >> guarantee of this, considering various possible failure conditions such
> >> as zombie workers), the cleanest way to solve this is put temp files in a
> >> location that's unlikely to match the same glob pattern as one that
> >> matches the final output.
> >>
> >> Some options for what that could be:
> >> 1. A subdirectory that is a sibling of the final path, sufficiently
> >>    unique, and unlikely to match the same glob -
> >>    /path/to/temp-beam-foo-$uid/$another_uid (that's the approach the PR
> >>    currently takes)
> >> 2. A subdirectory under PipelineOptions.tempLocation - this might be
> >>    flawed because PipelineOptions.tempLocation might be on a different
> >>    filesystem, or have different ACLs, than the output of the
> >>    FileBasedSink.
> >> 3. A subdirectory that the user *must* explicitly provide on their
> >>    FileBasedSink. This is a reduction in usability, but there may be
> >>    cases when this is necessary - e.g. if the final location of the
> >>    FileBasedSink is such that we can't create siblings to it (e.g. the
> >>    root path in a GCS bucket - gs://some-bucket/)
> >> 4. A subdirectory generated by a new IOChannelFactory call ("give me a
> >>    temp directory for the given final path") which would do one of the
> >>    above - reasonable, and simplifies FileBasedSink, but we still need
> >>    to choose which of #1-#3 this call should do.
> >>
> >> There might be other things I missed. There might be radical
> >> restructurings of FileBasedSink that work around this problem entirely,
> >> though I couldn't think of any.
> >>
> >> In general, the requirements on the solution are:
> >> - It should be very unlikely that somebody reads the temp files in the
> >>   same glob pattern as the final output by mistake.
> >> - It should continue to make sense as IOChannelFactory is extended with
> >>   support for other filesystems.
> >> - It should ideally use the same filesystem as the final output, or
> >>   perhaps even a location logically "close" to the final output, so that
> >>   it could potentially take advantage of that filesystem's efficient
> >>   bulk-copy or bulk-rename operations if available.
> >> - It should be easy to manually clean up the temp files if something went
> >>   wrong and they weren't cleaned up by the Beam job.
> >>
> >> I'm personally in favor of #1 with fallback to #2 or #3, because I think
> >> a sibling directory achieves all of these requirements unless a sibling
> >> directory can't be created.
> >>
> >> Thoughts?
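
For reference, a small sketch of how the temp naming schemes discussed in this thread interact with the globs a downstream reader might use. It is an illustration under stated assumptions, not Beam code: the `matches` helper, the `$uid` values and the shard names are made up, and `*` is assumed to match only within a single path component, which may differ from what a particular filesystem's match() does.

```python
import posixpath
from fnmatch import fnmatchcase

# Hypothetical names modelled on the paths used in this thread.
FINAL_SHARDS = ["/path/to/foo-00000-of-00002", "/path/to/foo-00001-of-00002"]
TEMP_CURRENT = "/path/to/foo-temp-abc123"                  # today's scheme
TEMP_SIBLING_FILE = "/path/to/temp-beam-foo-abc123"        # renamed sibling file
TEMP_SIBLING_DIR = "/path/to/temp-beam-foo-abc123/def456"  # option 1 (subdirectory)

ALL_NAMES = FINAL_SHARDS + [TEMP_CURRENT, TEMP_SIBLING_FILE, TEMP_SIBLING_DIR]


def matches(pattern, name):
    """Single-level glob: '*' is assumed not to cross a '/' boundary."""
    pattern_dir, pattern_base = posixpath.split(pattern)
    name_dir, name_base = posixpath.split(name)
    return pattern_dir == name_dir and fnmatchcase(name_base, pattern_base)


def matched(pattern):
    return [name for name in ALL_NAMES if matches(pattern, name)]


# A follow-up job reading the output with "/path/to/foo*".
reader_glob = matched("/path/to/foo*")
# A broader glob over the whole directory, like gs://my-logs-bucket/*.
broad_glob = matched("/path/to/*")

for name in ALL_NAMES:
    print(name, name in reader_glob, name in broad_glob)
```

Under these assumptions, only today's `foo-temp-$uid` name is picked up by `/path/to/foo*`. The renamed sibling file is still picked up by `/path/to/*`, and the subdirectory variant is picked up by neither.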