Re: Placement of temporary files by FileBasedSink

2016-10-27 Thread Chamikara Jayalath
Yeah, you are right. I was testing using 'gsutil' which behaves differently.

Thanks,
Cham

On Thu, Oct 27, 2016 at 2:06 PM Eugene Kirpichov wrote:

> Indeed IOChannelFactory uses GcsUtil for GCS, and GcsUtil in fact does not
> recurse into subdirectories inside a "*" pattern (see
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java#L598
> ), and it does not support "**" patterns. However, this is not unit-tested
> by GcsUtilTest, which is a separate issue.

Re: Placement of temporary files by FileBasedSink

2016-10-27 Thread Eugene Kirpichov
I don't think your assessment of the behavior of glob patterns is correct, per
https://cloud.google.com/storage/docs/gsutil/addlhelp/WildcardNames#directory-by-directory-vs-recursive-wildcards .
I believe (and hope) that the behavior of IOChannelFactory.match() matches the
behavior of gsutil.
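
To make the distinction concrete, here is a minimal sketch of the
"directory-by-directory" reading of "*" versus a recursive "**". It is not
GcsUtil's or gsutil's code; glob_to_regex and glob_matches are hypothetical
helpers, and the semantics in the docstring are assumptions chosen to mirror
the description in the linked page.

```python
import re


def glob_to_regex(pattern: str) -> "re.Pattern[str]":
    """Translate a glob into a regex where '*' stays inside one path component.

    Assumed semantics (illustrative only): '*' matches any run of characters
    except '/', '**' matches any run including '/', and '?' matches a single
    non-'/' character. Everything else is taken literally.
    """
    parts = []
    i = 0
    while i < len(pattern):
        if pattern.startswith("**", i):
            parts.append(".*")
            i += 2
        elif pattern[i] == "*":
            parts.append("[^/]*")
            i += 1
        elif pattern[i] == "?":
            parts.append("[^/]")
            i += 1
        else:
            parts.append(re.escape(pattern[i]))
            i += 1
    return re.compile("".join(parts))


def glob_matches(pattern: str, path: str) -> bool:
    # The whole path must match, not just a prefix of it.
    return glob_to_regex(pattern).fullmatch(path) is not None


final_output = "/path/to/2016-10-27/foo-00000-of-00003"
temp_in_subdir = "/path/to/2016-10-27/temp-beam-foo-1234/5678"

print(glob_matches("/path/to/2016-10-27/*", final_output))     # True
print(glob_matches("/path/to/2016-10-27/*", temp_in_subdir))   # False
print(glob_matches("/path/to/2016-10-27/**", temp_in_subdir))  # True
```

Under these assumptions a temp file inside a subdirectory is invisible to
/path/to/$date/*, and only a recursive pattern would pick it up.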

Re: Placement of temporary files by FileBasedSink

2016-10-27 Thread Chamikara Jayalath
BTW I'm in favor of using a sub-directory and possibly asking users to
update their glob pattern while also allowing users to optionally specify a
temporary path in the future, as you propose.

Thanks,
Cham
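
A rough sketch of how "sub-directory by default, user-specified temporary path
as an option" could be resolved. resolve_temp_dir and its parameters are
hypothetical names for illustration only; nothing here is an existing
FileBasedSink API.

```python
import posixpath
from typing import Optional


def resolve_temp_dir(output_prefix: str, uid: str,
                     user_temp_dir: Optional[str] = None) -> str:
    """Pick the directory that will hold temp files for one write.

    Hypothetical helper: an explicit user_temp_dir wins; otherwise fall back
    to a 'temp-beam-<name>-<uid>' subdirectory of the directory that the
    output files are written to.
    """
    if user_temp_dir is not None:
        return user_temp_dir
    parent, name = posixpath.split(output_prefix)
    return posixpath.join(parent, f"temp-beam-{name}-{uid}")


print(resolve_temp_dir("/path/to/2016-10-27/foo", "1234"))
# /path/to/2016-10-27/temp-beam-foo-1234
print(resolve_temp_dir("/path/to/2016-10-27/foo", "1234", "/scratch/beam-tmp"))
# /scratch/beam-tmp
```

The override is the escape hatch for readers that use recursive globs, where no
automatically chosen location near the output is safe.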

Re: Placement of temporary files by FileBasedSink

2016-10-27 Thread Chamikara Jayalath
On Thu, Oct 27, 2016 at 1:27 PM Eugene Kirpichov wrote:

> Getting back to this. I noticed that the original user's job mentioned in
> http://stackoverflow.com/questions/39822859/temp-files-remain-in-gcs-after-a-dataflow-job-succeeded
> is configured to write to /path/to/$date/foo-x-of-y and another job
> then reads from /path/to/$date/*, so sibling files won't work - it's
> necessary to put temp files either into a subdirectory, or in a location
> completely outside /path/to/$date/.
>

I think, at least for GCS, glob pattern '/path/to/$date/*' will include
files that are within any immediate sub-directory '/path/to/$date/uuid/'.
So unless users use the pattern '/path/to/$date/foo*' they could run into
the same issue.

Thanks,
Cham


>
> By the way, if we ever support recursive globs (e.g. /path/to/foo/**/*),
> then a subdirectory won't help; and if a user has another job that reads
> from, say, /path/to/**/* (without the "foo" component - e.g. if foo is a
> date, and they have a job that reads all data for all dates), then a
> sibling directory won't help either.
>
> I think these two cases are good motivation for allowing the user to
> provide a specific temp directory, as a last resort.
>
> To sum up:
> - in order to solve the user's problem, we need to use a directory
> - in the future we'll need to allow users to configure the temp directory
> on FileBasedSink.
>
> The current PR takes the "directory sibling to the write path" approach,
> and I don't see a better option that would address the needs of most users
> automatically.
>
> Dan - you mentioned on the PR that you would prefer a subdirectory to a
> sibling directory, but this *is* a subdirectory (specified write path is
> /path/to/$date/foo-x-of-y and the suggested temp path is
> /path/to/$date/temp-beam-foo-$uid/ which is a subdirectory of the directory
> to which the sink is writing).
>
> Any alternatives / objections to proceeding with the approach in the PR
> as-is?

Re: Placement of temporary files by FileBasedSink

2016-10-20 Thread Kenneth Knowles
@Eugene, we can make breaking changes. But if we really don't want to, we
can add it under a new name easily. That particular inheritance hierarchy
is not precious IMO.

Re: Placement of temporary files by FileBasedSink

2016-10-20 Thread Eugene Kirpichov
@Cham - this addresses temporary files that were written by successful
bundles, but not by failed bundles (and not the case when the entire
pipeline fails), so it's not sufficient.
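
A small sketch of the limitation, assuming the per-file "copy, then delete,
skipping the copy when the destination already exists" finalization that Cham
suggested. finalize and demo are hypothetical names and use the local
filesystem only; this is not the Python SDK's code.

```python
import os
import shutil
import tempfile


def finalize(temp_to_final: dict) -> None:
    """Move each known temp file to its destination, one at a time.

    If the destination already exists (for example, an earlier attempt copied
    it), skip the copy and only remove the temp file, so a retry is harmless.
    """
    for temp_path, final_path in temp_to_final.items():
        if os.path.exists(temp_path) and not os.path.exists(final_path):
            shutil.copyfile(temp_path, final_path)
        if os.path.exists(temp_path):
            os.remove(temp_path)


def demo() -> list:
    with tempfile.TemporaryDirectory() as root:
        ok = os.path.join(root, "foo-temp-ok")        # from a successful bundle
        orphan = os.path.join(root, "foo-temp-dead")  # from a failed bundle
        for path in (ok, orphan):
            with open(path, "w") as f:
                f.write("records\n")
        # Only successful bundles report their temp files to finalization.
        mapping = {ok: os.path.join(root, "foo-00000-of-00001")}
        finalize(mapping)
        finalize(mapping)  # a retried finalization changes nothing further
        return sorted(os.listdir(root))


print(demo())  # ['foo-00000-of-00001', 'foo-temp-dead']
```

The retry is safe, but the file written by the failed bundle is never seen by
finalization and stays behind.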

@Dan - yes, there are situations when it's impossible to create a sibling.
In that case, we'd need a fallback - either something the user needs to
explicitly specify ("your path is such that we don't know where to place
temporary files, please specify withTempLocation or something"), or I like
Robert's option of using sibling but differently-named files in this case.

@Kenn - yeah, a directory-based format would be great
(/path/to/foo/x-of-y), but this would be a breaking change to the
expected behavior.

I actually really like the option of sibling-but-differently-named files
(/path/to/temp-beam-foo-$uid) which would be a very non-invasive change to
the current (/path/to/foo-temp-$uid) and indeed would not involve creating
new directories or needing new IOChannelFactory APIs. It will still match a
glob like /path/to/* though (which a user could conceivably specify in a
situation like gs://my-logs-bucket/*), but it might be good enough.
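
As a quick check of that trade-off, the sketch below compares the current and
the proposed temp names against a few reader globs. It uses Python's
fnmatchcase purely for illustration; the uid value and the matched_by helper
are made up for the example.

```python
from fnmatch import fnmatchcase

current_temp = "/path/to/foo-temp-1234"        # today's naming
proposed_temp = "/path/to/temp-beam-foo-1234"  # sibling but differently named


def matched_by(glob: str, path: str) -> bool:
    # fnmatchcase's '*' also matches '/', which is harmless here because
    # both candidate files sit directly in /path/to/.
    return fnmatchcase(path, glob)


for glob in ("/path/to/foo-*", "/path/to/foo*", "/path/to/*"):
    print(glob, matched_by(glob, current_temp), matched_by(glob, proposed_temp))
# /path/to/foo-* True False
# /path/to/foo* True False
# /path/to/* True True
```

The renamed temp file escapes prefix-based globs but is still caught by a bare
"*" in the same directory.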


On Thu, Oct 20, 2016 at 10:14 AM Robert Bradshaw wrote:

> On Thu, Oct 20, 2016 at 9:58 AM, Kenneth Knowles wrote:
> > I like the spirit of proposal #1 for addressing the critical duplication
> > problem, though as Dan points out the logic to choose a related but
> > collision-free name might be slightly more complex.
> >
> > It is a nice bonus that it addresses the less critical issues and
> > improves usability for manual inspections and interventions.
> >
> > The term "sibling" is being slightly misused here. I'd say #1 as proposed
> > is a "sibling of the parent" while today's behavior is "sibling". I'd
> > say a root cause of multiple problems is that our sharded file format is
> > "a bunch of files next to each other" and the sibling is "other files in
> > the same directory" so it takes some care, and explicit file name
> > tracking instead of globbing, to work with it correctly.
> >
> > AFAIK (corrections welcome) there is nothing special about
> > Write.to("s3://bucket/file") meaning write to
> > "s3://bucket/file-$shardnum-of-$totalshards". An alternative that seems
> > superior is to write to "s3://bucket/file/$shardnum-of-$totalshards" with
> > the convention that this prefix is fully owned by this file. Now the
> > prefix "s3://bucket/file/" _is_ the sharded file. It is conceptually
> > simpler and more glob and UI friendly. (any non "-" character would work
> > for GCS and S3, but the "/" convention is better, considering the
> > broader world)
> >
> > And bringing it back to this thread, the "sibling" is no longer "more
> > files in the same directory" but now "s3://bucket/file-temp-$uid" which
> > is on the same filesystem with the same ACLs. It is also more UI
> > friendly, easier to clean up, and does more to explicitly indicate that
> > this is really one sharded file. Perhaps there's a pitfall I am
> > overlooking?
>
> Using directories rather than prefixes is a big change, and introduces
> complications like dealing with hidden dot files (some placed
> implicitly by the system or applications), and worrying about
> executable bits rather than just the rw ones and possibly more
> complicated permission inheritance.
>
> > Also since you mentioned local file support, FWIW the cleanup glob
> > "file-*" today breaks on Windows due to Java library vagaries, while
> > "file/*" would succeed.
> >
> > On Thu, Oct 20, 2016, 09:14 Dan Halperin wrote:
> >
> > This thread is conflating many issues.
> >
> > * Putting temp files where they will not match the glob for the desired
> > output files
> > * Dealing with eventually-consistent filesystems (s3, GCS, ...)
> > * Properly cleaning up all temp files
> >
> > They all need to get solved, but for now I think we only need to solve
> > the first one.
> >
> > Siblings fundamentally will not work. Consider the following
> > perfectly-valid output path: s3://bucket/file-SSS-NNN.txt . A sibling
> > would be a new bucket, so not guaranteed to exist.

Re: Placement of temporary files by FileBasedSink

2016-10-20 Thread Robert Bradshaw
Another option would be to just use /path/to/temp-foo-$uid to avoid
matching /path/to/foo-* (hoping of course the temp- or whatever prefix
doesn't match anything).

I see #2 causing all sorts of issues, and #3 would be a significant
reduction in usability. I would lean towards doing
/path/to/temp-beam-foo-$uid/$another_uid when possible, and
/path/to/temp-beam-foo-$uid-$another_uid otherwise (note the dash
instead of the slash). The logic of determining "when possible" seems
like it belongs in IOChannelFactory, not FileBasedSink.
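
The two naming forms can be sketched as follows. temp_file_path is a
hypothetical helper, and the can_create_dirs flag merely stands in for the
"when possible" decision, which in practice would have to come from the
filesystem layer.

```python
import posixpath


def temp_file_path(output_prefix: str, uid: str, another_uid: str,
                   can_create_dirs: bool) -> str:
    """Build a temp path in the style suggested above.

    With directory support the second id becomes a file inside a temp
    directory; without it, the same pieces are joined with a dash so the
    result is a plain sibling file.
    """
    parent, name = posixpath.split(output_prefix)
    base = posixpath.join(parent, f"temp-beam-{name}-{uid}")
    separator = "/" if can_create_dirs else "-"
    return f"{base}{separator}{another_uid}"


print(temp_file_path("/path/to/foo", "1234", "5678", can_create_dirs=True))
# /path/to/temp-beam-foo-1234/5678
print(temp_file_path("/path/to/foo", "1234", "5678", can_create_dirs=False))
# /path/to/temp-beam-foo-1234-5678
```

Either form avoids the /path/to/foo-* glob, since both start with the temp-beam-
prefix rather than with foo.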


On Thu, Oct 20, 2016 at 9:21 AM, Lukasz Cwik wrote:
> The issue manifests when a completely different pipeline uses the output of
> the last pipeline as input to the new pipeline and then these temporary
> files are matched in the glob expression.
>
> This happens because FileBasedSink is responsible for creating the
> temporary paths, which occurs while processing a bundle. If that bundle
> processing fails, there is no way to guarantee that the runner even knows
> that it existed in our current execution model.
>
> I think there are other potential solutions which require support from the
> runner that aren't being considered since this would all fall under a
> general cleanup API which Eugene referred to. The question for now is
> whether the solution is good enough.
>
> I'm in favor of #1 as well.
>
> I'm against #4 since FileBasedSink could do a pretty good job for all
> filesystems and once there is support for cleanup, FileBasedSink could
> migrate to use it without any changes to the various IOChannelFactory
> implementations. This prevents us from getting to the place where the
> Hadoop filesystem implementation has many, many methods.
>
>
> On Thu, Oct 20, 2016 at 1:57 AM, Chamikara Jayalath wrote:
>
>> Can this be prevented by moving temporary files (copy + delete
>> individually) at finalization instead of copying all of them and performing
>> a bulk delete? You can support task failures by ignoring renames when the
>> destination exists. Python SDK currently does this (and puts temp files in
>> a sub-directory).
>>
>> Thanks,
>> Cham
>>
>> On Wed, Oct 19, 2016 at 6:25 PM Eugene Kirpichov wrote:
>>
>> Hello,
>>
>> This is a continuation of the discussion on PR
>> https://github.com/apache/incubator-beam/pull/1050 which turned out more
>> complex than expected.
>>
>> Short summary:
>> Currently FileBasedSink, when writing to /path/to/foo (in practice,
>> /path/to/foo-x-of-y where y is the total number of output
>> files), puts temporary files into /path/to/foo-temp-$uid, and when
>> finalizing the sink, it removes the temporary files by matching the pattern
>> /path/to/foo-temp-* and removing everything that matches.
>>
>> There are a couple of issues with this:
>> - FileBasedSink uses IOChannelFactory, which currently supports local
>> filesystems and Google Cloud Storage (GCS). GCS's match() operation is
>> currently eventually consistent. So, it may fail to return some of the
>> files, so we won't remove them.
>> - If the Beam job is cancelled or fails midway, then the temp files won't
>> be deleted at all (that's subject to a separate discussion on cleanup API -
>> AFAIK there's no JIRA for it yet, I believe peihe@ was thinking about this
>> and was going to file one).
>> - If a follow-up data processing job is reading /path/to/foo, then the way
>> temp files are named, they will likely match the same glob pattern (e.g.
>> "/path/to/foo*") as the one intending to match the final output in
>> /path/to/foo, so if some temp files are leftover, the follow-up job will
>> effectively read duplicate records (some from /path/to/foo, some from
>> /path/to/foo-temp-$blah).
>>
>> I think, in the absence of a way to guarantee that all temp files will be
>> deleted (I think it'd be very difficult or impossible to provide a hard
>> guarantee of this, considering various possible failure conditions such as
>> zombie workers), the cleanest way to solve this is put temp files in a
>> location that's unlikely to match the same glob pattern as one that matches
>> the final output.
>>
>> Some options for what that could be:
>> 1. A subdirectory that is a sibling of the final path, sufficiently unique,
>> and unlikely to match the same glob -
>> /path/to/temp-beam-foo-$uid/$another_uid (that's the approach the PR
>> currently takes)
>> 2. A subdirectory under PipelineOptions.tempLocation - this might be flawed
>> because PipelineOptions.tempLocation might be on a different filesystem, or
>> have different ACLs, than the output of the FileBasedSink.
>> 3. A subdirectory that the user *must* explicitly provide on their
>> FileBasedSink. This is a reduction in usability, but there may be cases
>> when this is necessary - e.g. if the final location of the FileBasedSink is
>> such that we can't create siblings to it (e.g. the root path in a GCS
>> bucket - gs://some-bucket/)
>> 4. A subdirectory generated by a new 

Re: Placement of temporary files by FileBasedSink

2016-10-20 Thread Lukasz Cwik
The issue manifests when a completely different pipeline uses the output of
the previous pipeline as its input and these temporary files are matched by
its glob expression.

This happens because FileBasedSource is responsible for creating the
temporary paths, which occurs while processing a bundle. If processing of
that bundle fails, our current execution model gives no guarantee that the
runner even knows that the temporary paths existed.

I think there are other potential solutions that require support from the
runner; they aren't being considered because they would all fall under the
general cleanup API that Eugene referred to. The question for now is: is
the solution good enough?

I'm in favor of #1 as well.

I'm against #4, since FileBasedSource could do a pretty good job for all
filesystems, and once there is support for cleanup, FileBasedSource could
migrate to it without any changes to the various IOChannelFactory
implementations. This prevents us from getting to the place where the
Hadoop filesystem implementation is, with its many, many methods.



Re: Placement of temporary files by FileBasedSink

2016-10-20 Thread Dan Halperin
This thread is conflating many issues.

* Putting temp files where they will not match the glob for the desired
output files
* Dealing with eventually-consistent filesystems (s3, GCS, ...)
* Properly cleaning up all temp files

They all need to be solved, but for now I think we only need to solve the
first one.

Siblings fundamentally will not work. Consider the following
perfectly-valid output path: s3://bucket/file-SSS-NNN.txt . A sibling would
be a new bucket, so not guaranteed to exist.



Re: Placement of temporary files by FileBasedSink

2016-10-20 Thread Chamikara Jayalath
Can this be prevented by moving temporary files (copy + delete
individually) at finalization instead of copying all of them and performing
a bulk delete? You can support task failures by ignoring renames when the
destination exists. The Python SDK currently does this (and puts temp files
in a sub-directory).
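
The per-file move described above can be sketched roughly as follows. This
is only an illustration on the local filesystem, not the Python SDK's actual
code: the function name `finalize`, the file names, and the directory layout
are all made up, and the sketch assumes a copy either fully succeeds or
leaves no destination file behind.

```python
import os
import shutil
import tempfile


def finalize(moves):
    """Move each temp file to its destination, one file at a time.

    Safe to re-run after a failure: if the destination already exists the
    copy is skipped, and a temp file that is already gone is ignored.
    """
    for src, dst in moves:
        if not os.path.exists(dst):
            shutil.copyfile(src, dst)
        if os.path.exists(src):
            os.remove(src)


# Hypothetical layout: temp shards live in a sub-directory of the output dir.
workdir = tempfile.mkdtemp()
temp_dir = os.path.join(workdir, "temp")
os.mkdir(temp_dir)

moves = []
for i in range(2):
    src = os.path.join(temp_dir, "shard-%d" % i)
    with open(src, "w") as f:
        f.write("record %d\n" % i)
    moves.append((src, os.path.join(workdir, "foo-%d-of-2" % i)))

finalize(moves)
finalize(moves)  # simulated retry: nothing left to copy or delete
print(sorted(os.listdir(workdir)))
```

Because each file is handled on its own, a retried finalization only redoes
the moves that had not completed, and no glob match is needed to find the
temp files to delete.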

Thanks,
Cham



Placement of temporary files by FileBasedSink

2016-10-19 Thread Eugene Kirpichov
Hello,

This is a continuation of the discussion on PR
https://github.com/apache/incubator-beam/pull/1050 which turned out to be
more complex than expected.

Short summary:
Currently FileBasedSink, when writing to /path/to/foo (in practice,
/path/to/foo-x-of-y where y is the total number of output
files), puts temporary files into /path/to/foo-temp-$uid, and when
finalizing the sink, it removes the temporary files by matching the pattern
/path/to/foo-temp-* and removing everything that matches.

There are a few issues with this:
- FileBasedSink uses IOChannelFactory, which currently supports local
filesystems and Google Cloud Storage (GCS). GCS's match() operation is
currently eventually consistent, so it may fail to return some of the
files, and those files won't be removed.
- If the Beam job is cancelled or fails midway, then the temp files won't
be deleted at all (that's subject to a separate discussion on cleanup API -
AFAIK there's no JIRA for it yet, I believe peihe@ was thinking about this
and was going to file one).
- If a follow-up data processing job is reading /path/to/foo, then, because
of the way temp files are named, they will likely match the same glob
pattern (e.g. "/path/to/foo*") that is intended to match the final output
in /path/to/foo. So if some temp files are left over, the follow-up job
will effectively read duplicate records (some from /path/to/foo, some from
/path/to/foo-temp-$blah).
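
The third issue can be reproduced with a small sketch. It uses Python's
standard `fnmatch` module as a stand-in for the glob matching a follow-up
job would perform. The file names and the uid `ab12cd` are made up for
illustration, and the matching done by IOChannelFactory may differ in
details.

```python
from fnmatch import fnmatchcase

# Hypothetical listing of /path/to/ after a job left one temp file behind.
listing = [
    "/path/to/foo-00000-of-00002",
    "/path/to/foo-00001-of-00002",
    "/path/to/foo-temp-ab12cd",  # leftover temp file (uid is made up)
]


def match(pattern, paths):
    """Return the paths that match a shell-style glob pattern."""
    return [p for p in paths if fnmatchcase(p, pattern)]


# The follow-up job's input pattern picks up the final shards and the
# leftover temp file alike.
matched = match("/path/to/foo*", listing)
leftovers = [p for p in matched if "-temp-" in p]
print(matched)
print(leftovers)
```

All three paths match, so the reader has no way to tell the leftover temp
file from the final shards by pattern alone.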

I think that, in the absence of a way to guarantee that all temp files will
be deleted (it would be very difficult or impossible to provide a hard
guarantee of this, considering various possible failure conditions such as
zombie workers), the cleanest way to solve this is to put temp files in a
location that's unlikely to match the same glob pattern as the one that
matches the final output.

Some options for what that could be:
1. A subdirectory that is a sibling of the final path, sufficiently unique,
and unlikely to match the same glob -
/path/to/temp-beam-foo-$uid/$another_uid (that's the approach the PR
currently takes)
2. A subdirectory under PipelineOptions.tempLocation - this might be flawed
because PipelineOptions.tempLocation might be on a different filesystem, or
have different ACLs, than the output of the FileBasedSink.
3. A subdirectory that the user *must* explicitly provide on their
FileBasedSink. This is a reduction in usability, but there may be cases
when this is necessary - e.g. if the final location of the FileBasedSink is
such that we can't create siblings to it (e.g. the root path in a GCS
bucket - gs://some-bucket/)
4. A subdirectory generated by a new IOChannelFactory call ("give me a temp
directory for the given final path") which would do one of the above -
reasonable, and simplifies FileBasedSink, but we still need to choose which
of #1-#3 this call should do.
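
As a rough illustration of option #1, the sibling temp directory could be
derived from the final output prefix along these lines. The helper names
`sibling_temp_dir` and `temp_file` and the uids are hypothetical, not part
of FileBasedSink or the PR; `fnmatch` again stands in for the glob matching
of a follow-up job.

```python
import posixpath
import uuid
from fnmatch import fnmatchcase


def sibling_temp_dir(final_prefix, uid=None):
    """Build <parent>/temp-beam-<name>-<uid> next to the final output prefix."""
    parent, name = posixpath.split(final_prefix)
    uid = uid or uuid.uuid4().hex
    return posixpath.join(parent, "temp-beam-%s-%s" % (name, uid))


def temp_file(temp_dir, uid=None):
    """Build the path of one temp file inside the temp directory."""
    return posixpath.join(temp_dir, uid or uuid.uuid4().hex)


temp_dir = sibling_temp_dir("/path/to/foo", uid="1234")
path = temp_file(temp_dir, uid="abcd")
print(path)
# The temp file no longer shares the "foo" prefix of the final output.
print(fnmatchcase(path, "/path/to/foo*"))
```

The temp path no longer matches "/path/to/foo*". Whether a broader pattern
such as "/path/to/*" also skips the subdirectory depends on whether the
filesystem's glob lets "*" cross directory separators.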

There might be other things I missed. There might be radical restructurings
of FileBasedSink that work around this problem entirely, though I couldn't
think of any.

In general, the requirements on the solution are:
- It should be very unlikely that somebody reads the temp files in the same
glob pattern as the final output by mistake.
- It should continue to make sense as IOChannelFactory is extended with
support for other filesystems.
- It should ideally use the same filesystem as the final output, or perhaps
even a location logically "close" to the final output, so that it could
potentially take advantage of that filesystem's efficient bulk-copy or
bulk-rename operations if available.
- It should be easy to manually clean up the temp files if something went
wrong and they weren't cleaned up by the Beam job.

I'm personally in favor of #1 with fallback to #2 or #3, because I think a
sibling directory achieves all of these requirements unless a sibling
directory can't be created.

Thoughts?