+1 for this. Also, it looks like IO authors should be able to use the
existing 'ReadAllViaFileBasedSource' transform when implementing
FooIO.readAllMatches().

- Cham

On Fri, Aug 18, 2017 at 2:38 PM Eugene Kirpichov
<[email protected]> wrote:

> Hi all,
>
> I've been adding new features to TextIO and AvroIO recently, see e.g.
> https://github.com/apache/beam/pull/3725. The features are:
> - withHintMatchesManyFiles()
> - readAll() that reads a PCollection of filepatterns
> - configurable treatment of filepatterns that match no files
> - watchForNewFiles() that incrementally watches for new files matching the
> filepatterns
>
> However, these features also make sense for other file-based IOs
> (TFRecordIO, XmlIO, the in-review WholeFileIO), and adding them explicitly
> to each of these requires a lot of boilerplate and reeks of lack of
> modularity. I don't want to add this much duplicated code to each of them,
> nor to require authors of new such IOs to add it.
>
> Note that all of these features are available on the recently added
> Match.filepatterns() transform, which converts a PCollection<String> to a
> PCollection<MatchResult.Metadata> (file path and size). The boilerplate in
> file-based IOs ends up simply passing on the properties to the Match
> transform.
>
> Because of this, I'd like to propose the following recommendation for
> file-based IOs:
> A file-based FooIO should include:
> - A read transform that reads the data from a filepattern specified at
> pipeline construction time - FooIO.read().from(filepattern) or something
> analogous, as a PCollection<Foo>
> - A transform FooIO.readAllMatches() that converts a PCollection<Metadata>
> to PCollection<Foo>
>
> Then FooIO.read() handles the common case, and the user can solve all
> advanced cases by combining Match.filepatterns() with
> FooIO.readAllMatches():
>
> // Read files in a filepattern but don't fail if it's empty
> PCollection<Foo> foos = p.apply(Create.of(myFilepattern))
>    .apply(Match.filepatterns().withEmptyMatchTreatment(ALLOW))
>    .apply(FooIO.readAllMatches());
>
> // Read new filepatterns arriving over PubSub, and for each filepattern
> // continuously watch for new files matching it, polling every 1 minute
> // and stop polling a filepattern if no new files appear for 5 minutes
> PCollection<String> filepatterns = p.apply(PubsubIO.readStrings()...);
> PCollection<Foo> foos = filepatterns
>    .apply(Match.filepatterns().continuously(
>       Duration.standardMinutes(1),
>       afterTimeSinceNewOutput(Duration.standardMinutes(5))))
>    .apply(FooIO.readAllMatches());
>
> Adding explicit support for these configuration options to FooIO.read(),
> and adding a FooIO.readAll() should be optional.
>
> WDYT?
>
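
To make the proposed layering concrete, here is a rough plain-Java model of it. This is only a sketch and does not use the Beam SDK: FooIoSketch, its Metadata class, the EmptyMatchTreatment enum, the in-memory file map, and the simplified "*" glob handling are all hypothetical stand-ins, and a "Foo" is assumed to be one line of text. It only shows that read() can be the common case built from a match step followed by readAllMatches(), while advanced users compose the two steps themselves.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.regex.Pattern;

// Plain-Java model of the proposal; none of these types are Beam classes.
final class FooIoSketch {

  /** Hypothetical stand-in for MatchResult.Metadata (file path and size). */
  static final class Metadata {
    final String path;
    final long sizeBytes;

    Metadata(String path, long sizeBytes) {
      this.path = path;
      this.sizeBytes = sizeBytes;
    }
  }

  /** Hypothetical policy for filepatterns that match no files. */
  enum EmptyMatchTreatment { ALLOW, DISALLOW }

  /** Assumption: only "*" is a wildcard; everything else is literal. */
  static Pattern globToRegex(String glob) {
    String[] parts = glob.split("\\*", -1);
    StringBuilder regex = new StringBuilder();
    for (int i = 0; i < parts.length; i++) {
      if (i > 0) {
        regex.append(".*");
      }
      regex.append(Pattern.quote(parts[i]));
    }
    return Pattern.compile(regex.toString());
  }

  /** Models the match step: filepatterns in, file metadata out. */
  static Function<List<String>, List<Metadata>> match(
      Map<String, String> files, EmptyMatchTreatment treatment) {
    return patterns -> {
      List<Metadata> out = new ArrayList<>();
      for (String pattern : patterns) {
        Pattern regex = globToRegex(pattern);
        int before = out.size();
        for (Map.Entry<String, String> file : files.entrySet()) {
          if (regex.matcher(file.getKey()).matches()) {
            out.add(new Metadata(file.getKey(), file.getValue().length()));
          }
        }
        if (out.size() == before && treatment == EmptyMatchTreatment.DISALLOW) {
          throw new IllegalStateException("No files matched " + pattern);
        }
      }
      return out;
    };
  }

  /** Models readAllMatches(): file metadata in, records (lines) out. */
  static Function<List<Metadata>, List<String>> readAllMatches(Map<String, String> files) {
    return matches -> {
      List<String> records = new ArrayList<>();
      for (Metadata m : matches) {
        records.addAll(Arrays.asList(files.get(m.path).split("\n")));
      }
      return records;
    };
  }

  /** The common case: read(filepattern) is just match followed by readAllMatches. */
  static List<String> read(Map<String, String> files, String filepattern) {
    return match(files, EmptyMatchTreatment.DISALLOW)
        .andThen(readAllMatches(files))
        .apply(Arrays.asList(filepattern));
  }

  public static void main(String[] args) {
    Map<String, String> files = new TreeMap<>();
    files.put("/data/a.txt", "x\ny");
    files.put("/data/b.txt", "z");
    System.out.println(read(files, "/data/*.txt"));
  }
}
```

In this model, the "don't fail if it's empty" case needs no extra option on read(): a caller would apply match(files, EmptyMatchTreatment.ALLOW) and then readAllMatches(files) directly, which is the same shape as the first pipeline example in the quoted message.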
