Re: Proposal: file-based IOs should support readAllMatches()

2017-08-18 Thread Chamikara Jayalath
+1 for this. Also, it looks like IO authors should be able to use the
existing 'ReadAllViaFileBasedSource' transform when implementing
FooIO.readAllMatches().

- Cham

On Fri, Aug 18, 2017 at 2:38 PM Eugene Kirpichov wrote:

> Hi all,
>
> I've been adding new features to TextIO and AvroIO recently, see e.g.
> https://github.com/apache/beam/pull/3725. The features are:
> - withHintMatchesManyFiles()
> - readAll() that reads a PCollection of filepatterns
> - configurable treatment of filepatterns that match no files
> - watchForNewFiles() that incrementally watches for new files matching the
> filepatterns
>
> However, these features also make sense for other file-based IOs
> (TFRecordIO, XmlIO, the in-review WholeFileIO), and adding them explicitly
> to each of these requires a lot of boilerplate and reeks of lack of
> modularity. I don't want to add this much duplicated code to each of them,
> nor to require authors of new such IOs to add it.
>
> Note that all of these features are available on the recently added
> Match.filepatterns() transform, which converts a PCollection<String> of
> filepatterns to a PCollection<MatchResult.Metadata> (file path and size).
> The boilerplate in file-based IOs ends up simply passing on the
> properties to the Match transform.
>
> Because of this, I'd like to propose the following recommendation for
> file-based IOs:
> A file-based FooIO should include:
> - A read transform that reads the data from a filepattern specified at
> pipeline construction time - FooIO.read().from(filepattern) or something
> analogous, as a PCollection<Foo>
> - A transform FooIO.readAllMatches() that converts a
> PCollection<MatchResult.Metadata> to PCollection<Foo>
>
> Then FooIO.read() handles the common case, and the user can solve all
> advanced cases by combining Match.filepatterns() with
> FooIO.readAllMatches():
>
> // Read files in a filepattern but don't fail if it's empty
> PCollection<Foo> foos = p.apply(Create.of(myFilepattern))
>     .apply(Match.filepatterns().withEmptyMatchTreatment(ALLOW))
>     .apply(FooIO.readAllMatches());
>
> // Read new filepatterns arriving over PubSub, and for each filepattern
> // continuously watch for new files matching it, polling every 1 minute
> // and stop polling a filepattern if no new files appear for 5 minutes
> PCollection<String> filepatterns = p.apply(PubsubIO.readStrings()...);
> PCollection<Foo> foos = filepatterns
>     .apply(Match.filepatterns().continuously(
>         Duration.standardMinutes(1),
>         afterTimeSinceNewOutput(Duration.standardMinutes(5))))
>     .apply(FooIO.readAllMatches());
>
> Adding explicit support for these configuration options to FooIO.read(),
> and adding a FooIO.readAll() should be optional.
>
> WDYT?
>
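
To make the proposed division of labour concrete, here is a minimal
plain-Java sketch that does not use Beam at all. It assumes matching is a
glob over a single local directory, and every name in it (MatchSketch,
FileMeta, match, readAllMatches) is hypothetical and chosen only to mirror
the proposal: a generic matching step that owns the empty-match policy, and
a format-specific step that only needs to know how to read one file.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

/** Plain-Java model of the proposed split; all names here are hypothetical. */
final class MatchSketch {
  enum EmptyMatchTreatment { ALLOW, DISALLOW }

  /** Stand-in for the (file path, size) metadata produced by matching. */
  static final class FileMeta {
    final Path path;
    final long sizeBytes;

    FileMeta(Path path, long sizeBytes) {
      this.path = path;
      this.sizeBytes = sizeBytes;
    }
  }

  /** Generic step: expand a glob in one directory and apply the empty-match policy. */
  static List<FileMeta> match(Path dir, String glob, EmptyMatchTreatment treatment) {
    List<FileMeta> matches = new ArrayList<>();
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
      for (Path p : stream) {
        if (Files.isRegularFile(p)) {
          matches.add(new FileMeta(p, Files.size(p)));
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    if (matches.isEmpty() && treatment == EmptyMatchTreatment.DISALLOW) {
      throw new IllegalStateException("No files matched " + glob + " in " + dir);
    }
    // Sort by path so the result does not depend on directory iteration order.
    matches.sort(Comparator.comparing((FileMeta m) -> m.path));
    return matches;
  }

  /** Format-specific step: the only part a new FooIO would have to supply. */
  static <T> List<T> readAllMatches(
      List<FileMeta> files, Function<FileMeta, List<T>> readOneFile) {
    List<T> records = new ArrayList<>();
    for (FileMeta file : files) {
      records.addAll(readOneFile.apply(file));
    }
    return records;
  }
}
```

The point of the sketch is that ALLOW versus DISALLOW lives entirely in
match(), so readAllMatches() never has to repeat that option.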


Re: Proposal: file-based IOs should support readAllMatches()

2017-08-25 Thread Eugene Kirpichov
I think I have a somewhat better proposal that encompasses this and
WholeFileIO.

I'm already moving Match.filepatterns() into FileIO.match()/matchAll(), and
I'd like to create FileIO.read(): PCollection<MatchResult.Metadata> ->
PCollection<ReadableFile>, potentially configurable by .withCompression().

Here, ReadableFile will be a new type that looks like this:

class ReadableFile {
  ResourceId getResourceId();
  long getSizeBytes();
  Compression getCompression();
  boolean isReadSeekEfficient();

  ReadableByteChannel open();
  byte[] readFullyAsBytes();
  String readFullyAsString();
}

Under the hood it contains just the first 4 members, and others are just
utility functions. This is basically the current proposed
WholeFileIO.read() but seemingly under a more conveniently packaged
interface.
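
As a rough illustration of "four stored members plus utilities", here is a
plain-Java sketch over the local filesystem. It is not the Beam
implementation: the class name LocalReadableFile, the use of
java.nio.file.Path in place of ResourceId, and the two-value Compression
enum are all assumptions made for the example. It needs Java 9 or later for
InputStream.readAllBytes().

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.GZIPInputStream;

/** Hypothetical local-filesystem stand-in for the proposed ReadableFile. */
final class LocalReadableFile {
  enum Compression { UNCOMPRESSED, GZIP }

  // The four stored members; everything else is derived from them.
  private final Path resourceId;
  private final long sizeBytes;
  private final Compression compression;
  private final boolean readSeekEfficient;

  LocalReadableFile(
      Path resourceId, long sizeBytes, Compression compression, boolean readSeekEfficient) {
    this.resourceId = resourceId;
    this.sizeBytes = sizeBytes;
    this.compression = compression;
    this.readSeekEfficient = readSeekEfficient;
  }

  Path getResourceId() { return resourceId; }
  long getSizeBytes() { return sizeBytes; }
  Compression getCompression() { return compression; }
  boolean isReadSeekEfficient() { return readSeekEfficient; }

  /** Opens the file, transparently decompressing it if needed. */
  ReadableByteChannel open() {
    try {
      InputStream raw = Files.newInputStream(resourceId);
      try {
        InputStream in = compression == Compression.GZIP ? new GZIPInputStream(raw) : raw;
        return Channels.newChannel(in);
      } catch (IOException e) {
        raw.close(); // do not leak the handle if the gzip header is invalid
        throw e;
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Utility: reads the whole (decompressed) content into memory. */
  byte[] readFullyAsBytes() {
    try (InputStream in = Channels.newInputStream(open())) {
      return in.readAllBytes();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Utility: reads the whole content and decodes it as UTF-8. */
  String readFullyAsString() {
    return new String(readFullyAsBytes(), StandardCharsets.UTF_8);
  }
}
```

Because decompression happens inside open(), code built on top of this
object never needs its own compression option.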

And then I'd expect all file-based IOs to support a version
BarIO.readAllFiles(): PCollection<ReadableFile> -> PCollection<Bar>

Example usage with a hypothetical BarIO:

PCollection<Bar> bars = p
    .apply(FileIO.match().filepattern("...").continuously(...).withEmptyMatchTreatment(DISALLOW))
    .apply(FileIO.read().withCompression(AUTO))
    .apply(BarIO.readAllFiles());

Under the hood, BarIO.readAllFiles() may also split the ReadableFiles into
chunks, or even use an SDF.

Essentially this is a proposal to solve all use cases that involve reading
unsplittable file formats (because users can just add a ParDo on top of the
ReadableFile), as well as rid all future file-based IOs of boilerplate
associated with duplicating parameters like .continuously(),
.withCompression() and such.



Re: Proposal: file-based IOs should support readAllMatches()

2017-08-28 Thread Etienne Chauchot

Hi Eugene,

+1 to this. It is nice to add this common behavior to all the file-based
IOs. I find the design elegant. I have just one minor API comment: I
would prefer


p.apply(FooIO.read().from(filepattern))

to

p.apply(Create.of(filepattern))

IMHO, it is more readable and analogous to the other APIs.

Etienne





Re: Proposal: file-based IOs should support readAllMatches()

2017-08-28 Thread Eugene Kirpichov
Thanks. I think I agree that file-based IOs (at least widely used ones)
should, for convenience, still provide FooIO.read().from(filepattern), and
for performance until SDF has full support in all runners, implement it via
a BoundedSource.

The second case with Create.of(filepattern) illustrates when the
filepattern is not known at construction time but rather there's a
collection of filepatterns: it's a separate use case.



Re: Proposal: file-based IOs should support readAllMatches()

2017-08-31 Thread Eugene Kirpichov
I sent a PR for all of this:
https://github.com/apache/beam/pull/3799



Re: Proposal: file-based IOs should support readAllMatches()

2017-09-03 Thread Eugene Kirpichov
The PR is in.

Now you can write code like the following, to use XmlIO to watch for new
files even though XmlIO itself does not support this.

   PCollection<ReadableFile> files = p
       .apply(FileIO.match()
           .filepattern(options.getInputFilepatternProvider())
           .continuously(
               Duration.standardSeconds(30),
               afterTimeSinceNewOutput(Duration.standardMinutes(5))))
       .apply(FileIO.readMatches().withCompression(GZIP));

   PCollection<Record> output = files.apply(XmlIO.readFiles()
       .withRootElement("root")
       .withRecordElement("record")
       .withRecordClass(Record.class));

Or, you can write new file-based IOs by implementing only a readFiles()
version (like XmlIO.readFiles() above) - you'll get the following features
for free, via FileIO.match() and FileIO.readMatches():
- Watching for new files
- Handling compressed files
- Customizable handling of filepatterns that match no files or filepatterns
that match a directory
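
For readers unfamiliar with the watching behavior, the following plain-Java
sketch approximates the "poll at a fixed interval, stop once nothing new has
appeared for a while" rule described in this thread. It is not how Beam
implements it: WatchSketch, watch and listFilesAt are hypothetical names, and
a simulated clock (milliseconds passed to the listing function) replaces real
time so that the example is deterministic.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.LongFunction;

/** Hypothetical model of polling with a "no new output" termination rule. */
final class WatchSketch {
  /**
   * Polls listFilesAt every pollIntervalMillis of simulated time and stops
   * once a poll finds nothing new and at least quietPeriodMillis have passed
   * since the last poll that did. Returns files in order of discovery.
   */
  static List<String> watch(
      LongFunction<Set<String>> listFilesAt, long pollIntervalMillis, long quietPeriodMillis) {
    if (pollIntervalMillis <= 0) {
      throw new IllegalArgumentException("pollIntervalMillis must be positive");
    }
    Set<String> seen = new LinkedHashSet<>();
    long now = 0;
    long lastNewOutput = 0;
    while (true) {
      boolean sawNewFile = false;
      for (String file : listFilesAt.apply(now)) {
        if (seen.add(file)) {
          sawNewFile = true;
        }
      }
      if (sawNewFile) {
        lastNewOutput = now;
      } else if (now - lastNewOutput >= quietPeriodMillis) {
        break; // quiet for long enough: stop watching this filepattern
      }
      now += pollIntervalMillis;
    }
    return new ArrayList<>(seen);
  }
}
```

A file that shows up after the quiet period has elapsed is never observed,
which is the trade-off the termination condition makes.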

Or, you can write code to do arbitrary things with collections of files -
e.g. now it's quite trivial for a user to implement reading a collection of
text files with filenames and line numbers, by simply mapping the result of
FileIO.readMatches() with a ParDo that parses the ReadableFile in this way.
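
The per-file parsing step mentioned above could look roughly like the
plain-Java sketch below. It assumes the file content has already been read
into a string (for example via something like readFullyAsString()), and the
names NumberedLines, NumberedLine and parse are hypothetical; in a real
pipeline this logic would sit inside the ParDo.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical helper: tags each line of a file with its filename and line number. */
final class NumberedLines {
  static final class NumberedLine {
    final String filename;
    final long lineNumber; // 1-based
    final String text;

    NumberedLine(String filename, long lineNumber, String text) {
      this.filename = filename;
      this.lineNumber = lineNumber;
      this.text = text;
    }

    @Override
    public String toString() {
      return filename + ":" + lineNumber + ": " + text;
    }
  }

  /** Splits contents into lines and pairs each with filename and position. */
  static List<NumberedLine> parse(String filename, String contents) {
    List<NumberedLine> out = new ArrayList<>();
    long lineNumber = 0;
    // BufferedReader handles \n, \r\n and \r, and ignores a trailing newline.
    Iterator<String> lines = new BufferedReader(new StringReader(contents)).lines().iterator();
    while (lines.hasNext()) {
      out.add(new NumberedLine(filename, ++lineNumber, lines.next()));
    }
    return out;
  }
}
```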
