Hi Eugene,

+1 to this; it would be nice to add this common behavior to all the file-based IOs. I find the design elegant. I just have one minor API comment: I would prefer

p.apply(FooIO.read().from(filepattern))

to

p.apply(Create.of(filepattern))

IMHO, it is more readable and analogous to the other APIs.

Etienne

On 18/08/2017 at 23:38, Eugene Kirpichov wrote:
Hi all,

I've been adding new features to TextIO and AvroIO recently, see e.g.
https://github.com/apache/beam/pull/3725. The features are:
- withHintMatchesManyFiles()
- readAll() that reads a PCollection of filepatterns
- configurable treatment of filepatterns that match no files
- watchForNewFiles() that incrementally watches for new files matching the
filepatterns

However, these features also make sense for other file-based IOs
(TFRecordIO, XmlIO, the in-review WholeFileIO), and adding them explicitly
to each of these requires a lot of boilerplate and reeks of a lack of
modularity. I don't want to add this much duplicated code to each of them,
nor to require authors of new IOs of this kind to add it.

Note that all of these features are available on the recently added
Match.filepatterns() transform, which converts a PCollection<String> to a
PCollection<MatchResult.Metadata> (file path and size). The boilerplate in
file-based IOs simply ends up passing these properties on to the Match
transform.
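To make the matching step concrete, here is a minimal plain-Java sketch (not Beam code) that expands filepatterns into (path, size) records against an in-memory listing, using the JDK's glob PathMatcher, and models the configurable treatment of empty matches. The names FileMetadata, EmptyMatchPolicy and MatchSketch are hypothetical stand-ins for MatchResult.Metadata and the empty-match options; JDK glob semantics may differ from Beam's filesystem matching.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for MatchResult.Metadata: a resolved file path and its size.
final class FileMetadata {
  final String path;
  final long sizeBytes;

  FileMetadata(String path, long sizeBytes) {
    this.path = path;
    this.sizeBytes = sizeBytes;
  }
}

// Hypothetical analogue of allowing or disallowing filepatterns that match no files.
enum EmptyMatchPolicy { ALLOW, DISALLOW }

final class MatchSketch {
  // Expands each filepattern against an in-memory listing (path -> size) instead of a
  // real filesystem. Results are emitted in sorted path order for determinism.
  static List<FileMetadata> match(
      List<String> filepatterns, Map<String, Long> listing, EmptyMatchPolicy policy) {
    List<FileMetadata> out = new ArrayList<>();
    for (String pattern : filepatterns) {
      PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + pattern);
      int matched = 0;
      for (Map.Entry<String, Long> e : new TreeMap<>(listing).entrySet()) {
        if (matcher.matches(Paths.get(e.getKey()))) {
          out.add(new FileMetadata(e.getKey(), e.getValue()));
          matched++;
        }
      }
      // An empty match is an error only when the policy disallows it.
      if (matched == 0 && policy == EmptyMatchPolicy.DISALLOW) {
        throw new IllegalArgumentException("No files matched filepattern: " + pattern);
      }
    }
    return out;
  }
}
```

The point of the sketch is that none of this logic depends on the file format, which is why it can live in one shared transform rather than in every IO.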

Because of this, I'd like to propose the following recommendation for
file-based IOs:
A file-based FooIO should include:
- A read transform, FooIO.read().from(filepattern) or something analogous,
that reads the data from a filepattern specified at pipeline construction
time as a PCollection<Foo>
- A transform FooIO.readAllMatches() that converts a PCollection<Metadata>
to PCollection<Foo>
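The shape of this recommendation can be sketched in plain Java, with no Beam types: the matching step is written once, each IO supplies only a per-file reader (the role of readAllMatches()), and read(filepattern) is derived by composing the two. MatchedFile, MatchReader, FileBasedIOs and LineReader are hypothetical illustrative names, and an in-memory map of path to contents stands in for a filesystem.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for MatchResult.Metadata (size is a character count here).
final class MatchedFile {
  final String path;
  final long size;

  MatchedFile(String path, long size) {
    this.path = path;
    this.size = size;
  }
}

// The only thing a file-based IO would implement: how to read one matched file.
interface MatchReader<T> {
  List<T> readFile(MatchedFile file, Map<String, String> contents);
}

final class FileBasedIOs {
  // Shared matching step, written once for all IOs (the role of Match.filepatterns()).
  static List<MatchedFile> match(String filepattern, Map<String, String> contents) {
    PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + filepattern);
    List<MatchedFile> out = new ArrayList<>();
    for (Map.Entry<String, String> e : new TreeMap<>(contents).entrySet()) {
      if (matcher.matches(Paths.get(e.getKey()))) {
        out.add(new MatchedFile(e.getKey(), e.getValue().length()));
      }
    }
    return out;
  }

  // readAllMatches: apply the IO-specific reader to every matched file.
  static <T> List<T> readAllMatches(
      List<MatchedFile> files, MatchReader<T> reader, Map<String, String> contents) {
    List<T> out = new ArrayList<>();
    for (MatchedFile f : files) {
      out.addAll(reader.readFile(f, contents));
    }
    return out;
  }

  // read().from(filepattern): the common case, derived from the two pieces above
  // instead of being re-implemented inside every IO.
  static <T> List<T> read(String filepattern, MatchReader<T> reader, Map<String, String> contents) {
    return readAllMatches(match(filepattern, contents), reader, contents);
  }
}

// A hypothetical line-oriented IO only has to provide its per-file reader.
final class LineReader implements MatchReader<String> {
  @Override
  public List<String> readFile(MatchedFile file, Map<String, String> contents) {
    return Arrays.asList(contents.get(file.path).split("\n"));
  }
}
```

Under this split, any option that belongs to matching (empty-match treatment, continuous watching) is configured once on the shared step and automatically applies to every IO.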

Then FooIO.read() handles the common case, and the user can solve all
advanced cases by combining Match.filepatterns() with
FooIO.readAllMatches():

// Read files in a filepattern but don't fail if it's empty
PCollection<Foo> foos = p.apply(Create.of(myFilepattern))
    .apply(Match.filepatterns().withEmptyMatchTreatment(ALLOW))
    .apply(FooIO.readAllMatches());

// Read new filepatterns arriving over PubSub, and for each filepattern
// continuously watch for new files matching it, polling every 1 minute
// and stop polling a filepattern if no new files appear for 5 minutes
PCollection<String> filepatterns = p.apply(PubsubIO.readStrings()...);
PCollection<Foo> foos = filepatterns
    .apply(Match.filepatterns().continuously(
        Duration.standardMinutes(1),
        afterTimeSinceNewOutput(Duration.standardMinutes(5))))
    .apply(FooIO.readAllMatches());
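The continuous mode pairs a poll interval with a termination condition. The following plain-Java simulation is a simplified model, not Beam's Watch implementation: each element of polls is the set of files matching one filepattern at a given poll, only previously unseen files are emitted, and polling stops once stopAfterMinutes have elapsed without a new file. Treating the watch start as the last output time is an assumption of this sketch, and WatchSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class WatchSketch {
  // polls.get(k) is the set of files matching the filepattern at poll k, taken at minute
  // k * pollIntervalMinutes. Returns the newly seen files for each poll that actually ran.
  static List<List<String>> watch(
      List<Set<String>> polls, long pollIntervalMinutes, long stopAfterMinutes) {
    Set<String> seen = new LinkedHashSet<>();
    List<List<String>> emittedPerPoll = new ArrayList<>();
    long lastNewOutputMinute = 0; // assumption: the watch start counts as the last output
    for (int k = 0; k < polls.size(); k++) {
      long now = k * pollIntervalMinutes;
      List<String> fresh = new ArrayList<>();
      for (String file : polls.get(k)) {
        if (seen.add(file)) { // emit each file only the first time it is observed
          fresh.add(file);
        }
      }
      emittedPerPoll.add(fresh);
      if (!fresh.isEmpty()) {
        lastNewOutputMinute = now;
      } else if (now - lastNewOutputMinute >= stopAfterMinutes) {
        break; // termination condition reached: stop polling this filepattern
      }
    }
    return emittedPerPoll;
  }
}
```

With a 1-minute interval and a 5-minute cutoff, a file that appears only after the cutoff has passed is never emitted, which is the trade-off the termination condition makes to bound the cost of watching each filepattern.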

Adding explicit support for these configuration options to FooIO.read(),
and adding a FooIO.readAll(), should be optional.

WDYT?

