Thanks. I agree that file-based IOs (at least the widely used ones)
should, for convenience, still provide FooIO.read().from(filepattern) and,
for performance, implement it via a BoundedSource until SDF is fully
supported in all runners.

The second case, with Create.of(filepattern), covers the situation where
the filepattern is not known at pipeline construction time and there is
instead a collection of filepatterns; it's a separate use case.
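
As a rough illustration of the two cases, here is a plain-Java sketch. It
is not Beam code: FilepatternSketch and its methods are names made up for
this email, "Foo" is simplified to a line of text, and failing on an empty
match in read() is an assumption of the sketch. It only models the shape
of the proposal, where read() is a thin wrapper over match + readAllMatches.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java model of the proposal; none of these types are Beam classes.
final class FilepatternSketch {
    enum EmptyMatchTreatment { ALLOW, DISALLOW }

    // Hypothetical stand-in for MatchResult.Metadata (file path and size).
    static final class Metadata {
        final Path path;
        final long sizeBytes;
        Metadata(Path path, long sizeBytes) {
            this.path = path;
            this.sizeBytes = sizeBytes;
        }
    }

    // Stand-in for Match.filepatterns(): expands one "dir/glob" pattern.
    // Assumes '/' separators and wildcards only in the last path segment.
    static List<Metadata> match(String filepattern, EmptyMatchTreatment treatment) {
        int slash = filepattern.lastIndexOf('/');
        Path dir = Paths.get(slash < 0 ? "." : filepattern.substring(0, Math.max(slash, 1)));
        String glob = filepattern.substring(slash + 1);
        List<Path> paths = new ArrayList<>();
        List<Metadata> matches = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
            for (Path p : stream) {
                paths.add(p);
            }
            Collections.sort(paths); // deterministic order for the example
            for (Path p : paths) {
                matches.add(new Metadata(p, Files.size(p)));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (matches.isEmpty() && treatment == EmptyMatchTreatment.DISALLOW) {
            throw new IllegalStateException("No files matched " + filepattern);
        }
        return matches;
    }

    // Stand-in for FooIO.readAllMatches(): reads every matched file line by line.
    static List<String> readAllMatches(List<Metadata> matches) {
        List<String> records = new ArrayList<>();
        try {
            for (Metadata m : matches) {
                records.addAll(Files.readAllLines(m.path));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return records;
    }

    // Case 1: a single filepattern known up front, like FooIO.read().from(filepattern).
    static List<String> read(String filepattern) {
        return readAllMatches(match(filepattern, EmptyMatchTreatment.DISALLOW));
    }

    // Case 2: a collection of filepatterns that is only known at run time.
    static List<String> readAll(List<String> filepatterns, EmptyMatchTreatment treatment) {
        List<String> records = new ArrayList<>();
        for (String filepattern : filepatterns) {
            records.addAll(readAllMatches(match(filepattern, treatment)));
        }
        return records;
    }
}
```

In this model the convenience method costs only a couple of lines once
the match step and the per-file read step exist separately, which is the
reason I'm fine with keeping both entry points.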

On Mon, Aug 28, 2017 at 2:23 AM Etienne Chauchot <[email protected]>
wrote:

> Hi Eugene,
>
> +1 to this; it is nice to add this common behavior to all the file-based
> IOs. I find the design elegant. I have just one minor API comment: I
> would prefer
>
> p.apply(FooIO.read().from(filepattern))
>
> to
>
> p.apply(Create.of(filepattern))
>
> IMHO, it is more readable and analogous to the other APIs.
>
> Etienne
>
> On 18/08/2017 at 23:38, Eugene Kirpichov wrote:
> > Hi all,
> >
> > I've been adding new features to TextIO and AvroIO recently, see e.g.
> > https://github.com/apache/beam/pull/3725. The features are:
> > - withHintMatchesManyFiles()
> > - readAll() that reads a PCollection of filepatterns
> > - configurable treatment of filepatterns that match no files
> > - watchForNewFiles() that incrementally watches for new files matching
> > the filepatterns
> >
> > However, these features also make sense for other file-based IOs
> > (TFRecordIO, XmlIO, the in-review WholeFileIO), and adding them
> > explicitly to each of these requires a lot of boilerplate and reeks of
> > lack of modularity. I don't want to add this much duplicated code to
> > each of them, nor to require authors of new such IOs to add it.
> >
> > Note that all of these features are available on the recently added
> > Match.filepatterns() transform, which converts a PCollection<String> to a
> > PCollection<MatchResult.Metadata> (file path and size). The boilerplate
> > in file-based IOs ends up simply passing on the properties to the Match
> > transform.
> >
> > Because of this, I'd like to propose the following recommendation for
> > file-based IOs:
> > A file-based FooIO should include:
> > - A read transform that reads the data from a filepattern specified at
> > pipeline construction time - FooIO.read().from(filepattern) or something
> > analogous, as a PCollection<Foo>
> > - A transform FooIO.readAllMatches() that converts a
> > PCollection<Metadata> to PCollection<Foo>
> >
> > Then FooIO.read() handles the common case, and the user can solve all
> > advanced cases by combining Match.filepatterns() with
> > FooIO.readAllMatches():
> >
> > // Read files in a filepattern but don't fail if it's empty
> > PCollection<Foo> foos = p.apply(Create.of(myFilepattern))
> >     .apply(Match.filepatterns().withEmptyMatchTreatment(ALLOW))
> >     .apply(FooIO.readAllMatches());
> >
> > // Read new filepatterns arriving over PubSub, and for each filepattern
> > // continuously watch for new files matching it, polling every 1 minute
> > // and stop polling a filepattern if no new files appear for 5 minutes
> > PCollection<String> filepatterns = p.apply(PubsubIO.readStrings()...);
> > PCollection<Foo> foos = filepatterns
> >     .apply(Match.filepatterns().continuously(
> >         Duration.standardMinutes(1),
> >         afterTimeSinceNewOutput(Duration.standardMinutes(5))))
> >     .apply(FooIO.readAllMatches());
> >
> > Adding explicit support for these configuration options to FooIO.read(),
> > and adding a FooIO.readAll() should be optional.
> >
> > WDYT?
> >
>
>
