+1 for this. Also, it looks like IO authors should be able to use the existing 'ReadAllViaFileBasedSource' transform when implementing FooIO.readAllMatches().
- Cham

On Fri, Aug 18, 2017 at 2:38 PM Eugene Kirpichov <[email protected]> wrote:

> Hi all,
>
> I've been adding new features to TextIO and AvroIO recently, see e.g.
> https://github.com/apache/beam/pull/3725. The features are:
> - withHintMatchesManyFiles()
> - readAll() that reads a PCollection of filepatterns
> - configurable treatment of filepatterns that match no files
> - watchForNewFiles() that incrementally watches for new files matching the
> filepatterns
>
> However, these features also make sense for other file-based IOs
> (TFRecordIO, XmlIO, the in-review WholeFileIO), and adding them explicitly
> to each of these requires a lot of boilerplate and reeks of lack of
> modularity. I don't want to add this much duplicated code to each of them,
> nor to require authors of new such IOs to add it.
>
> Note that all of these features are available on the recently added
> Match.filepatterns() transform, that converts a PCollection<String> to a
> PCollection<MatchResult.Metadata> (file path and size). The boilerplate in
> file-based IOs ends up simply passing on the properties to the Match
> transform.
>
> Because of this, I'd like to propose the following recommendation for
> file-based IOs:
> A file-based FooIO should include:
> - A read transform that reads the data from a filepattern specified at
> pipeline construction time - FooIO.read().from(filepattern) or something
> analogous, as a PCollection<Foo>
> - A transform FooIO.readAllMatches() that converts a PCollection<Metadata>
> to PCollection<Foo>
>
> Then FooIO.read() handles the common case, and the user can solve all
> advanced cases by combining Match.filepatterns() with
> FooIO.readAllMatches():
>
> // Read files in a filepattern but don't fail if it's empty
> PCollection<Foo> foos = p.apply(Create.of(myFilepattern))
>     .apply(Match.filepatterns().withEmptyMatchTreatment(ALLOW))
>     .apply(FooIO.readAllMatches());
>
> // Read new filepatterns arriving over PubSub, and for each filepattern
> // continuously watch for new files matching it, polling every 1 minute
> // and stop polling a filepattern if no new files appear for 5 minutes
> PCollection<String> filepatterns = p.apply(PubsubIO.readStrings()...);
> PCollection<Foo> foos = filepatterns
>     .apply(Match.filepatterns().continuously(
>         Duration.standardMinutes(1),
>         afterTimeSinceNewOutput(Duration.standardMinutes(5))))
>     .apply(FooIO.readAllMatches());
>
> Adding explicit support for these configuration options to FooIO.read(),
> and adding a FooIO.readAll() should be optional.
>
> WDYT?
>
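
To make the proposed split a bit more concrete, here is a minimal plain-Java sketch of the same two-step shape, with no Beam dependency: one step expands a filepattern into file metadata (including the empty-match policy), and a separate step turns metadata into records. Everything in it is a hypothetical stand-in for illustration, not Beam API: the class MatchThenRead, the FileMetadata holder (loosely modeled on MatchResult.Metadata's path and size), the local EmptyMatchTreatment enum, and both method signatures. It also assumes "records" are simply lines of text.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java analogy of "match, then read all matches". Not Beam code.
final class MatchThenRead {

    // Hypothetical stand-in for MatchResult.Metadata: file path and size.
    static final class FileMetadata {
        final Path path;
        final long sizeBytes;

        FileMetadata(Path path, long sizeBytes) {
            this.path = path;
            this.sizeBytes = sizeBytes;
        }
    }

    // Hypothetical policy for filepatterns that match no files.
    enum EmptyMatchTreatment { ALLOW, DISALLOW }

    // Step 1: expand a glob inside a directory into file metadata.
    // All matching options live here, so readers never need to know about them.
    static List<FileMetadata> match(Path dir, String glob, EmptyMatchTreatment treatment)
            throws IOException {
        List<FileMetadata> matches = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
            for (Path p : stream) {
                if (Files.isRegularFile(p)) {
                    matches.add(new FileMetadata(p, Files.size(p)));
                }
            }
        }
        if (matches.isEmpty() && treatment == EmptyMatchTreatment.DISALLOW) {
            throw new NoSuchFileException(dir.resolve(glob).toString());
        }
        // Directory iteration order is unspecified, so sort for determinism.
        matches.sort(Comparator.comparing((FileMetadata m) -> m.path.toString()));
        return matches;
    }

    // Step 2: the format-specific part. It only sees already-matched files.
    static List<String> readAllMatches(List<FileMetadata> files) throws IOException {
        List<String> records = new ArrayList<>();
        for (FileMetadata f : files) {
            records.addAll(Files.readAllLines(f.path));
        }
        return records;
    }
}
```

In this sketch, supporting a new file format only means writing another readAllMatches(); the matching options stay in one place, which is the modularity argument made in the proposal.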
