Re: Placement of temporary files by FileBasedSink
Yeah, you are right. I was testing using 'gsutil' which behaves differently.

Thanks,
Cham

On Thu, Oct 27, 2016 at 2:06 PM Eugene Kirpichov wrote:

> Indeed IOChannelFactory uses GcsUtil for GCS, and GcsUtil in fact does not
> recurse into subdirectories inside a "*" pattern (see
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java#L598),
> and it does not support "**" patterns. However, this is not unit-tested
> by GcsUtilTest, which is a separate issue.
Re: Placement of temporary files by FileBasedSink
Indeed IOChannelFactory uses GcsUtil for GCS, and GcsUtil in fact does not
recurse into subdirectories inside a "*" pattern (see
https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java#L598),
and it does not support "**" patterns. However, this is not unit-tested by
GcsUtilTest, which is a separate issue.

On Thu, Oct 27, 2016 at 1:56 PM Eugene Kirpichov wrote:

> I don't think your assessment of the behavior of glob patterns is correct, per
> https://cloud.google.com/storage/docs/gsutil/addlhelp/WildcardNames#directory-by-directory-vs-recursive-wildcards
> .
> I believe (and hope) that the behavior of IOChannelFactory.match() matches
> the behavior of gsutil.
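The "*" behavior described at the top of this message (a single "*" stays
within one path component and there is no "**") can be illustrated with a
small Python sketch. This is not GcsUtil's code; the function names and the
sample object names are made up for the illustration, and the only assumption
taken from the thread is that "*" does not cross a "/".

```python
import re


def glob_to_regex(pattern):
    """Translate a glob into a regex where '*' never matches a '/'."""
    parts = []
    for ch in pattern:
        if ch == "*":
            parts.append("[^/]*")   # stay inside one path component
        elif ch == "?":
            parts.append("[^/]")
        else:
            parts.append(re.escape(ch))
    return re.compile("".join(parts) + r"\Z")


def match(pattern, names):
    """Return the names matched by the glob, in their original order."""
    rx = glob_to_regex(pattern)
    return [n for n in names if rx.match(n)]


# Hypothetical object names: two final shards and one temp file that lives
# in a temp subdirectory next to them.
objects = [
    "path/to/2016-10-27/foo-00000-of-00002",
    "path/to/2016-10-27/foo-00001-of-00002",
    "path/to/2016-10-27/temp-beam-foo-1234/shard-0",
]

final_shards = match("path/to/2016-10-27/*", objects)
```

Under that assumption, a reader job using path/to/2016-10-27/* picks up only
the two final shards, and the file inside the temp subdirectory is matched
only by a pattern that names the subdirectory explicitly.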
Re: Placement of temporary files by FileBasedSink
I don't think your assessment of the behavior of glob patterns is correct, per
https://cloud.google.com/storage/docs/gsutil/addlhelp/WildcardNames#directory-by-directory-vs-recursive-wildcards
. I believe (and hope) that the behavior of IOChannelFactory.match() matches
the behavior of gsutil.

On Thu, Oct 27, 2016 at 1:48 PM Chamikara Jayalath wrote:

> BTW I'm in favor of using a sub-directory and possibly asking users to
> update their glob pattern while also allowing users to optionally specify a
> temporary path in the future, as you propose.
>
> Thanks,
> Cham
Re: Placement of temporary files by FileBasedSink
BTW I'm in favor of using a sub-directory and possibly asking users to update
their glob pattern while also allowing users to optionally specify a temporary
path in the future, as you propose.

Thanks,
Cham

On Thu, Oct 27, 2016 at 1:45 PM Chamikara Jayalath wrote:

> On Thu, Oct 27, 2016 at 1:27 PM Eugene Kirpichov wrote:
>
> > Getting back to this. I noticed that the original user's job mentioned in
> > http://stackoverflow.com/questions/39822859/temp-files-remain-in-gcs-after-a-dataflow-job-succeeded
> > is configured to write to /path/to/$date/foo-x-of-y and another job
> > then reads from /path/to/$date/*, so sibling files won't work - it's
> > necessary to put temp files either into a subdirectory, or in a location
> > completely outside /path/to/$date/.
>
> I think, at least for GCS, glob pattern '/path/to/$date/*' will include
> files that are within any immediate sub-directory '/path/to/$date/uuid/'.
> So unless users use the pattern '/path/to/$date/foo*' they could run into
> the same issue.
>
> Thanks,
> Cham
Re: Placement of temporary files by FileBasedSink
On Thu, Oct 27, 2016 at 1:27 PM Eugene Kirpichov wrote:

> Getting back to this. I noticed that the original user's job mentioned in
> http://stackoverflow.com/questions/39822859/temp-files-remain-in-gcs-after-a-dataflow-job-succeeded
> is configured to write to /path/to/$date/foo-x-of-y and another job
> then reads from /path/to/$date/*, so sibling files won't work - it's
> necessary to put temp files either into a subdirectory, or in a location
> completely outside /path/to/$date/.

I think, at least for GCS, glob pattern '/path/to/$date/*' will include files
that are within any immediate sub-directory '/path/to/$date/uuid/'. So unless
users use the pattern '/path/to/$date/foo*' they could run into the same
issue.

Thanks,
Cham
Re: Placement of temporary files by FileBasedSink
Getting back to this. I noticed that the original user's job mentioned in
http://stackoverflow.com/questions/39822859/temp-files-remain-in-gcs-after-a-dataflow-job-succeeded
is configured to write to /path/to/$date/foo-x-of-y and another job then reads
from /path/to/$date/*, so sibling files won't work - it's necessary to put
temp files either into a subdirectory, or in a location completely outside
/path/to/$date/.

By the way, if we ever support recursive globs (e.g. /path/to/foo/**/*), then
a subdirectory won't help; and if a user has another job that reads from, say,
/path/to/**/* (without the "foo" component - e.g. if foo is a date, and they
have a job that reads all data for all dates), then a sibling directory won't
help either.

I think these two cases are good motivation for allowing the user to provide a
specific temp directory, as a last resort.

To sum up:
- in order to solve the user's problem, we need to use a directory
- in the future we'll need to allow users to configure the temp directory on
  FileBasedSink.

The current PR takes the "directory sibling to the write path" approach, and I
don't see a better option that would address the needs of most users
automatically.

Dan - you mentioned on the PR that you would prefer a subdirectory to a
sibling directory, but this *is* a subdirectory (specified write path is
/path/to/$date/foo-x-of-y and the suggested temp path is
/path/to/$date/temp-beam-foo-$uid/ which is a subdirectory of the directory to
which the sink is writing).

Any alternatives / objections to proceeding with the approach in the PR as-is?

On Thu, Oct 20, 2016 at 6:26 PM Kenneth Knowles wrote:

> @Eugene, we can make breaking changes. But if we really don't want to, we
> can add it under a new name easily. That particular inheritance hierarchy
> is not precious IMO.
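The temp path layout described in this message (write path
/path/to/$date/foo-x-of-y, temp path /path/to/$date/temp-beam-foo-$uid/) can
be sketched in Python as follows. This is only an illustration of the naming
scheme, not the code in the PR: the function name temp_directory_for and the
use of a random UUID for $uid are assumptions.

```python
import posixpath
import uuid


def temp_directory_for(output_prefix, uid=None):
    """Derive a temp directory next to the shards written under output_prefix.

    output_prefix is the user-specified write path without the shard suffix,
    e.g. "/path/to/2016-10-27/foo". The uid default (a random UUID) is an
    assumption made for this sketch.
    """
    if uid is None:
        uid = uuid.uuid4().hex
    parent, base = posixpath.split(output_prefix)
    # The temp directory is a child of the directory the sink writes into.
    return posixpath.join(parent, "temp-beam-%s-%s" % (base, uid)) + "/"


example = temp_directory_for("/path/to/2016-10-27/foo", uid="1234")
```

With a non-recursive "*", files under the returned directory are not matched
by /path/to/2016-10-27/*, which is the property the message relies on. A
user-configurable temp directory would simply bypass this derivation.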
Re: Placement of temporary files by FileBasedSink
@Eugene, we can make breaking changes. But if we really don't want to, we can add it under a new name easily. That particular inheritance hierarchy is not precious IMO.
Re: Placement of temporary files by FileBasedSink
@Cham - this addresses temporary files that were written by successful bundles, but not by failed bundles (and not the case when the entire pipeline fails), so it's not sufficient.

@Dan - yes, there are situations when it's impossible to create a sibling. In that case, we'd need a fallback - either something the user needs to explicitly specify ("your path is such that we don't know where to place temporary files, please specify withTempLocation or something"), or I like Robert's option of using sibling but differently-named files in this case.

@Kenn - yeah, a directory-based format would be great (/path/to/foo/x-of-y), but this would be a breaking change to the expected behavior.

I actually really like the option of sibling-but-differently-named files (/path/to/temp-beam-foo-$uid) which would be a very non-invasive change to the current (/path/to/foo-temp-$uid) and indeed would not involve creating new directories or needing new IOChannelFactory APIs. It will still match a glob like /path/to/* though (which a user could conceivably specify in a situation like gs://my-logs-bucket/*), but it might be good enough.
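For reference, the per-file move that the @Cham point above responds to could be sketched roughly as below. This is only an illustration on a local filesystem, not the SDK's code: `finalize` and the file names are hypothetical, and it assumes a copy either completes or leaves no destination behind. As noted above, it only deals with temp files that finalization knows about, not ones left by failed bundles.

```python
import os
import shutil
import tempfile
from typing import Dict


def finalize(renames: Dict[str, str]) -> None:
    """Move each temp file to its destination, one file at a time.

    Hypothetical helper: it can be re-run after a failed attempt, because a
    rename whose destination already exists is skipped rather than redone.
    """
    for src, dst in renames.items():
        if not os.path.exists(dst):
            shutil.copyfile(src, dst)  # copy ...
        if os.path.exists(src):
            os.remove(src)  # ... then delete this one temp file


# Set up two temp shards in a scratch directory (names are made up).
workdir = tempfile.mkdtemp()
temp_dir = os.path.join(workdir, "temp-beam-foo-1234")
os.mkdir(temp_dir)

renames = {}
for shard in range(2):
    src = os.path.join(temp_dir, f"shard-{shard}")
    with open(src, "w") as f:
        f.write(f"records for shard {shard}\n")
    renames[src] = os.path.join(workdir, f"foo-{shard}-of-2")

finalize(renames)
finalize(renames)  # a retried finalization finds nothing left to do

final_files = sorted(os.listdir(workdir))
leftover_temp = os.listdir(temp_dir)
print(final_files)
print(leftover_temp)
```

Because each file is handled on its own, no glob over the temp location is needed at cleanup time, which sidesteps the eventually consistent match() problem for the files that finalization does see.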
Re: Placement of temporary files by FileBasedSink
On Thu, Oct 20, 2016 at 9:58 AM, Kenneth Knowles wrote:
> AFAIK (corrections welcome) there is nothing special about Write.to("s3://bucket/file") meaning write to "s3://bucket/file-$shardnum-of-$totalshards". An alternative that seems superior is to write to "s3://bucket/file/$shardnum-of-$totalshards" with the convention that this prefix is fully owned by this file. Now the prefix "s3://bucket/file/" _is_ the sharded file. It is conceptually simpler and more glob and UI friendly. (any non "-" character would work for GCS and S3, but the "/" convention is better, considering the broader world)
>
> And bringing it back to this thread, the "sibling" is no longer "more files in the same directory" now "s3://bucket/file-temp-$uid" which is on the same filesystem with the same ACLs. It is also more UI friendly, easier to clean up, and does more to explicitly indicate that this is really one sharded file. Perhaps there's a pitfall I am overlooking?

Using directories rather than prefixes is a big change, and introduces complications like dealing with hidden dot files (some placed implicitly by the system or applications), worrying about executable bits rather than just the rw ones, and possibly more complicated permission inheritance.
Re: Placement of temporary files by FileBasedSink
I like the spirit of proposal #1 for addressing the critical duplication problem, though as Dan points out the logic to choose a related but collision-free name might be slightly more complex.

It is a nice bonus that it addresses the less critical issues and improves usability for manual inspections and interventions.

The term "sibling" is being slightly misused here. I'd say #1 as proposed is a "sibling of the parent" while today's behavior is "sibling". I'd say a root cause of multiple problems is that our sharded file format is "a bunch of files next to each other" and the sibling is "other files in the same directory" so it takes some care, and explicit file name tracking instead of globbing, to work with it correctly.

AFAIK (corrections welcome) there is nothing special about Write.to("s3://bucket/file") meaning write to "s3://bucket/file-$shardnum-of-$totalshards". An alternative that seems superior is to write to "s3://bucket/file/$shardnum-of-$totalshards" with the convention that this prefix is fully owned by this file. Now the prefix "s3://bucket/file/" _is_ the sharded file. It is conceptually simpler and more glob and UI friendly. (any non "-" character would work for GCS and S3, but the "/" convention is better, considering the broader world)

And bringing it back to this thread, the "sibling" is no longer "more files in the same directory" now "s3://bucket/file-temp-$uid" which is on the same filesystem with the same ACLs. It is also more UI friendly, easier to clean up, and does more to explicitly indicate that this is really one sharded file. Perhaps there's a pitfall I am overlooking?

Also since you mentioned local file support, FWIW the cleanup glob "file-*" today breaks on Windows due to Java library vagaries, while "file/*" would succeed.
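To make the two layouts in this message concrete, here is a small sketch comparing the flat "file-$shardnum-of-$totalshards" naming with the nested "file/$shardnum-of-$totalshards" one. The helpers `shard_names` and `list_prefix` are hypothetical, the five-digit zero padding is an assumption, and listing by string prefix is only a stand-in for how an object store might enumerate keys.

```python
from typing import List


def shard_names(prefix: str, total: int, separator: str) -> List[str]:
    """Names of all shards of one logical file (padding width is assumed)."""
    return [f"{prefix}{separator}{i:05d}-of-{total:05d}" for i in range(total)]


def list_prefix(objects: List[str], prefix: str) -> List[str]:
    """Stand-in for an object-store listing: every key that starts with prefix."""
    return sorted(o for o in objects if o.startswith(prefix))


temp = "s3://bucket/file-temp-abc123"  # a leftover temp object

flat = shard_names("s3://bucket/file", 2, "-")
nested = shard_names("s3://bucket/file", 2, "/")

# Flat layout: listing "file-" also picks up the leftover temp object.
flat_listing = list_prefix(flat + [temp], "s3://bucket/file-")

# Nested layout: the prefix "file/" is owned by the sharded file alone.
nested_listing = list_prefix(nested + [temp], "s3://bucket/file/")

print(flat_listing)
print(nested_listing)
```

Under these assumptions the nested listing returns exactly the shards, while the flat listing needs explicit file name tracking to tell shards from temp files.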
Re: Placement of temporary files by FileBasedSink
Another option would be to just use /path/to/temp-foo-$uid to avoid matching /path/to/foo-* (hoping of course the temp- or whatever prefix doesn't match anything).

I see #2 causing all sorts of issues, and #3 would be a significant reduction in usability. I would lean towards doing /path/to/temp-beam-foo-$uid/$another_uid when possible, and /path/to/temp-beam-foo-$uid-$another_uid otherwise (note the dash instead of the slash). The logic of determining "when possible" seems like it belongs in IOChannelFactory not FileBasedSource.
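The slash-versus-dash fallback described in this message could look something like the sketch below. `temp_path` and its `can_create_dirs` flag are hypothetical; the flag stands in for whatever "when possible" check the filesystem layer would provide.

```python
import posixpath


def temp_path(final_prefix: str, uid: str, another_uid: str,
              can_create_dirs: bool) -> str:
    """Build a temp location next to final_prefix.

    Uses a temp directory when the filesystem allows one, and a
    differently-named sibling file (dash instead of slash) otherwise.
    """
    parent, name = posixpath.split(final_prefix)
    base = posixpath.join(parent, f"temp-beam-{name}-{uid}")
    separator = "/" if can_create_dirs else "-"
    return f"{base}{separator}{another_uid}"


with_dir = temp_path("/path/to/foo", "u1", "u2", can_create_dirs=True)
without_dir = temp_path("/path/to/foo", "u1", "u2", can_create_dirs=False)
print(with_dir)
print(without_dir)
```

Either result starts with "temp-beam-" rather than "foo", so neither is picked up by a reader globbing /path/to/foo-*.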
Re: Placement of temporary files by FileBasedSink
The issue manifests when a completely different pipeline uses the output of the last pipeline as input to the new pipeline and then these temporary files are matched in the glob expression.

This happens because FileBasedSource is responsible for creating the temporary paths which occurs while processing a bundle. If that bundle processing fails, there is no way to guarantee for the runner to even know that it existed in our current execution model.

I think there are other potential solutions which require support from the runner that aren't being considered since this would all fall under a general cleanup API which Eugene referred to. The question for now is the solution good enough?

I'm in favor of #1 as well.

I'm against #4 since FileBasedSource could do a pretty good job for all filesystems and once there is support for cleanup, FileBasedSource could migrate to use it without any changes to the various IOChannelFactory's. This prevents us from getting to the place where Hadoop filesystem implementation has many many methods.
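As a rough illustration of how leftover temp files end up matched by a downstream job's glob, the sketch below checks the current and proposed temp names against two patterns from this thread. The `matches` helper is hypothetical and assumes "*" matches any run of characters except "/", i.e. it does not recurse into subdirectories.

```python
import re


def matches(pattern: str, path: str) -> bool:
    """Glob match where '*' stands for any characters other than '/'."""
    regex = "".join("[^/]*" if ch == "*" else re.escape(ch) for ch in pattern)
    return re.match(regex + r"\Z", path) is not None


old_temp = "/path/to/foo-temp-1234"          # current naming
new_temp = "/path/to/temp-beam-foo-1234"     # sibling-but-differently-named
in_subdir = "/path/to/temp-beam-foo-1234/5678"  # file inside a temp directory

results = {
    "old temp vs /path/to/foo*": matches("/path/to/foo*", old_temp),
    "new temp vs /path/to/foo*": matches("/path/to/foo*", new_temp),
    "new temp vs /path/to/*": matches("/path/to/*", new_temp),
    "temp subdir file vs /path/to/*": matches("/path/to/*", in_subdir),
}
for label, matched in results.items():
    print(label, matched)
```

With this non-recursive reading of "*", only the current naming collides with /path/to/foo*, a renamed sibling file still collides with /path/to/*, and a file inside a temp directory collides with neither.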
Re: Placement of temporary files by FileBasedSink
This thread is conflating many issues.

* Putting temp files where they will not match the glob for the desired output files
* Dealing with eventually-consistent filesystems (s3, GCS, ...)
* Properly cleaning up all temp files

They all need to get solved, but for now I think we only need to solve the first one.

Siblings fundamentally will not work. Consider the following perfectly-valid output path: s3://bucket/file-SSS-NNN.txt . A sibling would be a new bucket, so not guaranteed to exist.

On Thu, Oct 20, 2016 at 1:57 AM, Chamikara Jayalath wrote:
> Can this be prevented by moving temporary files (copy + delete individually) at finalization instead of copying all of them and performing a bulk delete? You can support task failures by ignoring renames when the destination exists. Python SDK currently does this (and puts temp files in a sub-directory).
>
> Thanks,
> Cham
>
> On Wed, Oct 19, 2016 at 6:25 PM Eugene Kirpichov wrote:
>
> Hello,
>
> This is a continuation of the discussion on PR https://github.com/apache/incubator-beam/pull/1050 which turned out more complex than expected.
>
> Short summary:
> Currently FileBasedSink, when writing to /path/to/foo (in practice, /path/to/foo-x-of-y where y is the total number of output files), puts temporary files into /path/to/foo-temp-$uid, and when finalizing the sink, it removes the temporary files by matching the pattern /path/to/foo-temp-* and removing everything that matches.
>
> There are a couple of issues with this:
> - FileBasedSink uses IOChannelFactory, which currently supports local filesystems and Google Cloud Storage (GCS). GCS's match() operation is currently eventually consistent. So, it may fail to return some of the files, so we won't remove them.
> - If the Beam job is cancelled or fails midway, then the temp files won't be deleted at all (that's subject to a separate discussion on cleanup API - AFAIK there's no JIRA for it yet, I believe peihe@ was thinking about this and was going to file one).
> - If a follow-up data processing job is reading /path/to/foo, then the way temp files are named, they will likely match the same glob pattern (e.g. "/path/to/foo*") as the one intending to match the final output in /path/to/foo, so if some temp files are leftover, the follow-up job will effectively read duplicate records (some from /path/to/foo, some from /path/to/foo-temp-$blah).
>
> I think, in the absence of a way to guarantee that all temp files will be deleted (I think it'd be very difficult or impossible to provide a hard guarantee of this, considering various possible failure conditions such as zombie workers), the cleanest way to solve this is put temp files in a location that's unlikely to match the same glob pattern as one that matches the final output.
>
> Some options for what that could be:
> 1. A subdirectory that is a sibling of the final path, sufficiently unique, and unlikely to match the same glob - /path/to/temp-beam-foo-$uid/$another_uid (that's the approach the PR currently takes)
> 2. A subdirectory under PipelineOptions.tempLocation - this might be flawed because PipelineOptions.tempLocation might be on a different filesystem, or have different ACLs, than the output of the FileBasedSink.
> 3. A subdirectory that the user *must* explicitly provide on their FileBasedSink. This is a reduction in usability, but there may be cases when this is necessary - e.g. if the final location of the FileBasedSink is such that we can't create siblings to it (e.g. the root path in a GCS bucket - gs://some-bucket/)
> 4. A subdirectory generated by a new IOChannelFactory call ("give me a temp directory for the given final path") which would do one of the above - reasonable, and simplifies FileBasedSink, but we still need to choose which of #1-#3 this call should do.
>
> There might be other things I missed. There might be radical restructurings of FileBasedSink that work around this problem entirely, though I couldn't think of any.
>
> In general, the requirements on the solution are:
> - It should be very unlikely that somebody reads the temp files in the same glob pattern as the final output by mistake.
> - It should continue to make sense as IOChannelFactory is extended with support for other filesystems.
> - It should ideally use the same filesystem as the final output, or perhaps even a location logically "close" to the final output, so that it could potentially take advantage of that filesystem's efficient bulk-copy or bulk-rename operations if available.
> - It should be easy to manually clean up the temp files if something went wrong and they weren't cleaned up by the Beam job.
>
> I'm personally in favor of #1 with fallback to #2 or #3, because I think a sibling directory achieves all of these requirements unless a sibling directory can't be created.
>
> Thoughts?
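To illustrate where option #1 has nowhere to put its directory, here is a minimal sketch. It assumes the reading of #1 in which the temp directory sits next to the output files inside the same parent, and it treats a path with no file name component (such as a bare bucket root) as the case that needs a fallback. `sibling_temp_dir` is a hypothetical name, not an existing API.

```python
import posixpath
from urllib.parse import urlsplit


def sibling_temp_dir(final_prefix: str, uid: str) -> str:
    """Temp directory next to final_prefix, or ValueError if there is no room."""
    parts = urlsplit(final_prefix)
    parent, name = posixpath.split(parts.path)
    if not name:
        # e.g. gs://some-bucket/ : nothing to derive a sibling name from
        raise ValueError(
            f"cannot place temp files next to {final_prefix!r}; "
            "an explicit temp location would be needed")
    root = f"{parts.scheme}://{parts.netloc}" if parts.scheme else ""
    return root + posixpath.join(parent, f"temp-beam-{name}-{uid}") + "/"


local = sibling_temp_dir("/path/to/foo", "u1")
in_bucket = sibling_temp_dir("s3://bucket/file", "u1")
print(local)
print(in_bucket)

try:
    sibling_temp_dir("gs://some-bucket/", "u1")
    needs_fallback = False
except ValueError:
    needs_fallback = True
print(needs_fallback)
```

Where the error is raised is the point at which a fallback to #2 or #3 would come in, under the assumptions above.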
Re: Placement of temporary files by FileBasedSink
Can this be prevented by moving temporary files (copy + delete individually) at finalization instead of copying all of them and performing a bulk delete? You can support task failures by ignoring renames when the destination exists. The Python SDK currently does this (and puts temp files in a sub-directory).

Thanks,
Cham

On Wed, Oct 19, 2016 at 6:25 PM Eugene Kirpichov wrote:

> Hello,
>
> This is a continuation of the discussion on PR
> https://github.com/apache/incubator-beam/pull/1050 which turned out more
> complex than expected.
>
> Short summary:
> Currently FileBasedSink, when writing to /path/to/foo (in practice,
> /path/to/foo-x-of-y where y is the total number of output files), puts
> temporary files into /path/to/foo-temp-$uid, and when finalizing the sink,
> it removes the temporary files by matching the pattern /path/to/foo-temp-*
> and removing everything that matches.
>
> There are a couple of issues with this:
> - FileBasedSink uses IOChannelFactory, which currently supports local
> filesystems and Google Cloud Storage (GCS). GCS's match() operation is
> currently eventually consistent. So, it may fail to return some of the
> files, so we won't remove them.
> - If the Beam job is cancelled or fails midway, then the temp files won't
> be deleted at all (that's subject to a separate discussion on cleanup API -
> AFAIK there's no JIRA for it yet, I believe peihe@ was thinking about this
> and was going to file one).
> - If a follow-up data processing job is reading /path/to/foo, then the way
> temp files are named, they will likely match the same glob pattern (e.g.
> "/path/to/foo*") as the one intending to match the final output in
> /path/to/foo, so if some temp files are leftover, the follow-up job will
> effectively read duplicate records (some from /path/to/foo, some from
> /path/to/foo-temp-$blah).
>
> I think, in the absence of a way to guarantee that all temp files will be
> deleted (I think it'd be very difficult or impossible to provide a hard
> guarantee of this, considering various possible failure conditions such as
> zombie workers), the cleanest way to solve this is to put temp files in a
> location that's unlikely to match the same glob pattern as one that matches
> the final output.
>
> Some options for what that could be:
> 1. A subdirectory that is a sibling of the final path, sufficiently unique,
> and unlikely to match the same glob -
> /path/to/temp-beam-foo-$uid/$another_uid (that's the approach the PR
> currently takes)
> 2. A subdirectory under PipelineOptions.tempLocation - this might be flawed
> because PipelineOptions.tempLocation might be on a different filesystem, or
> have different ACLs, than the output of the FileBasedSink.
> 3. A subdirectory that the user *must* explicitly provide on their
> FileBasedSink. This is a reduction in usability, but there may be cases
> when this is necessary - e.g. if the final location of the FileBasedSink is
> such that we can't create siblings to it (e.g. the root path in a GCS
> bucket - gs://some-bucket/)
> 4. A subdirectory generated by a new IOChannelFactory call ("give me a temp
> directory for the given final path") which would do one of the above -
> reasonable, and simplifies FileBasedSink, but we still need to choose which
> of #1-#3 this call should do.
>
> There might be other things I missed. There might be radical restructurings
> of FileBasedSink that work around this problem entirely, though I couldn't
> think of any.
>
> In general, the requirements on the solution are:
> - It should be very unlikely that somebody reads the temp files in the same
> glob pattern as the final output by mistake.
> - It should continue to make sense as IOChannelFactory is extended with
> support for other filesystems.
> - It should ideally use the same filesystem as the final output, or perhaps
> even a location logically "close" to the final output, so that it could
> potentially take advantage of that filesystem's efficient bulk-copy or
> bulk-rename operations if available.
> - It should be easy to manually clean up the temp files if something went
> wrong and they weren't cleaned up by the Beam job.
>
> I'm personally in favor of #1 with fallback to #2 or #3, because I think a
> sibling directory achieves all of these requirements unless a sibling
> directory can't be created.
>
> Thoughts?
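
For illustration, the per-file move suggested at the top of this reply (copy, then delete, and skip the copy when the destination already exists) might look roughly like the sketch below. It is written against the local filesystem only, and the names move_if_absent and finalize are hypothetical; this is not the Python SDK's actual code. It also assumes that an existing destination is a complete copy, which a real implementation would need to ensure.

```python
import os
import shutil


def move_if_absent(src: str, dst: str) -> bool:
    """Move src to dst as copy + delete, tolerating retries.

    Returns True if this call performed the copy, False if dst already
    existed (for example, a previous attempt copied it before failing).
    Assumption: an existing dst is a complete copy of src.
    """
    copied = False
    if not os.path.exists(dst):
        # Raises FileNotFoundError if src is missing as well, which is a
        # genuine error rather than a harmless retry.
        shutil.copyfile(src, dst)
        copied = True
    if os.path.exists(src):
        # Delete each temp file individually instead of matching a glob later.
        os.remove(src)
    return copied


def finalize(temp_to_final: dict) -> int:
    """Move every temp file to its final name; return how many were copied."""
    return sum(move_if_absent(src, dst) for src, dst in temp_to_final.items())
```

Because each temp file is deleted by name right after its own copy, finalization does not depend on a match() call returning every temp file, and re-running finalize after a partial failure only processes what is still pending.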
Placement of temporary files by FileBasedSink
Hello,

This is a continuation of the discussion on PR https://github.com/apache/incubator-beam/pull/1050 which turned out more complex than expected.

Short summary:
Currently FileBasedSink, when writing to /path/to/foo (in practice, /path/to/foo-x-of-y where y is the total number of output files), puts temporary files into /path/to/foo-temp-$uid, and when finalizing the sink, it removes the temporary files by matching the pattern /path/to/foo-temp-* and removing everything that matches.

There are a couple of issues with this:
- FileBasedSink uses IOChannelFactory, which currently supports local filesystems and Google Cloud Storage (GCS). GCS's match() operation is currently eventually consistent. So, it may fail to return some of the files, so we won't remove them.
- If the Beam job is cancelled or fails midway, then the temp files won't be deleted at all (that's subject to a separate discussion on cleanup API - AFAIK there's no JIRA for it yet, I believe peihe@ was thinking about this and was going to file one).
- If a follow-up data processing job is reading /path/to/foo, then the way temp files are named, they will likely match the same glob pattern (e.g. "/path/to/foo*") as the one intending to match the final output in /path/to/foo, so if some temp files are leftover, the follow-up job will effectively read duplicate records (some from /path/to/foo, some from /path/to/foo-temp-$blah).

I think, in the absence of a way to guarantee that all temp files will be deleted (I think it'd be very difficult or impossible to provide a hard guarantee of this, considering various possible failure conditions such as zombie workers), the cleanest way to solve this is to put temp files in a location that's unlikely to match the same glob pattern as one that matches the final output.

Some options for what that could be:
1. A subdirectory that is a sibling of the final path, sufficiently unique, and unlikely to match the same glob - /path/to/temp-beam-foo-$uid/$another_uid (that's the approach the PR currently takes)
2. A subdirectory under PipelineOptions.tempLocation - this might be flawed because PipelineOptions.tempLocation might be on a different filesystem, or have different ACLs, than the output of the FileBasedSink.
3. A subdirectory that the user *must* explicitly provide on their FileBasedSink. This is a reduction in usability, but there may be cases when this is necessary - e.g. if the final location of the FileBasedSink is such that we can't create siblings to it (e.g. the root path in a GCS bucket - gs://some-bucket/)
4. A subdirectory generated by a new IOChannelFactory call ("give me a temp directory for the given final path") which would do one of the above - reasonable, and simplifies FileBasedSink, but we still need to choose which of #1-#3 this call should do.

There might be other things I missed. There might be radical restructurings of FileBasedSink that work around this problem entirely, though I couldn't think of any.

In general, the requirements on the solution are:
- It should be very unlikely that somebody reads the temp files in the same glob pattern as the final output by mistake.
- It should continue to make sense as IOChannelFactory is extended with support for other filesystems.
- It should ideally use the same filesystem as the final output, or perhaps even a location logically "close" to the final output, so that it could potentially take advantage of that filesystem's efficient bulk-copy or bulk-rename operations if available.
- It should be easy to manually clean up the temp files if something went wrong and they weren't cleaned up by the Beam job.

I'm personally in favor of #1 with fallback to #2 or #3, because I think a sibling directory achieves all of these requirements unless a sibling directory can't be created.

Thoughts?
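
To make the glob concern and option #1 concrete, here is a rough Python sketch. It assumes a glob dialect in which "*" matches within a single path component only (the behavior attributed to GcsUtil elsewhere in this thread); the helpers glob_to_regex, matches and sibling_temp_dir are hypothetical names for illustration and are not part of Beam or IOChannelFactory.

```python
import posixpath
import re


def glob_to_regex(pattern: str) -> re.Pattern:
    """Translate a glob into a regex; '*' does not cross '/' (assumed dialect)."""
    parts = ("[^/]*" if ch == "*" else re.escape(ch) for ch in pattern)
    return re.compile("".join(parts) + r"\Z")


def matches(pattern: str, path: str) -> bool:
    """Return True if the whole path matches the glob pattern."""
    return glob_to_regex(pattern).match(path) is not None


def sibling_temp_dir(final_prefix: str, uid: str) -> str:
    """Option #1: a temp directory next to the final output prefix."""
    directory, base = posixpath.split(final_prefix)
    return posixpath.join(directory, f"temp-beam-{base}-{uid}")


if __name__ == "__main__":
    reader_glob = "/path/to/foo*"
    # Current naming scheme: the leftover temp file is picked up by the reader.
    print(matches(reader_glob, "/path/to/foo-temp-123"))
    # Option #1: the temp file lives under a differently named directory.
    temp_file = posixpath.join(sibling_temp_dir("/path/to/foo", "123"), "456")
    print(temp_file, matches(reader_glob, temp_file))
```

Under that assumption, a reader using /path/to/foo* (or even /path/to/*) would not pick up files inside /path/to/temp-beam-foo-$uid/, whereas a leftover /path/to/foo-temp-$uid file would be read as if it were output.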