The PR is in. Now you can write code like the following to make XmlIO watch for new files, even though XmlIO itself does not support this:

// Re-match the filepattern every 30 seconds; stop once no new files have appeared for 5 minutes.
PCollection<ReadableFile> files = p
    .apply(FileIO.match()
        .filepattern(options.getInputFilepatternProvider())
        .continuously(
            Duration.standardSeconds(30),
            afterTimeSinceNewOutput(Duration.standardMinutes(5))))
    .apply(FileIO.readMatches().withCompression(GZIP));

// Parse each matched file into Record objects.
PCollection<Record> output = files.apply(XmlIO.<Record>readFiles()
    .withRootElement("root")
    .withRecordElement("record")
    .withRecordClass(Record.class));

Or, you can write new file-based IOs by implementing only a readFiles() version - you'll get the following features for free, via FileIO.match() and FileIO.readMatches():
- Watching for new files
- Handling compressed files
- Customizable handling of filepatterns that match no files or filepatterns that match a directory

Or, you can write code to do arbitrary things with collections of files - e.g. now it's quite trivial for a user to implement reading a collection of text files with filenames and line numbers, by simply mapping the result of FileIO.readMatches() with a ParDo that parses the ReadableFile in this way.

On Thu, Aug 31, 2017 at 6:32 PM Eugene Kirpichov <kirpic...@google.com> wrote:

> I sent a PR about this all:
> https://github.com/apache/beam/pull/3799
>
> On Mon, Aug 28, 2017 at 8:45 AM Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
>> Thanks. I think I agree that file-based IOs (at least widely used ones)
>> should, for convenience, still provide FooIO.read().from(filepattern), and
>> for performance until SDF has full support in all runners, implement it via
>> a BoundedSource.
>>
>> The second case with Create.of(filepattern) illustrates when the
>> filepattern is not known at construction time but rather there's a
>> collection of filepatterns: it's a separate use case.
>>
>> On Mon, Aug 28, 2017 at 2:23 AM Etienne Chauchot <echauc...@gmail.com>
>> wrote:
>>
>>> Hi Eugene,
>>>
>>> +1 to this, it is nice to add this common behavior to all the file-based
>>> IOs.
>>> I find the design elegant. I just have one minor API comment: I
>>> would prefer
>>>
>>> p.apply(FooIO.read().from(filepattern))
>>>
>>> to
>>>
>>> p.apply(Create.of(filepattern))
>>>
>>> IMHO, it is more readable and analogous to the other APIs.
>>>
>>> Etienne
>>>
>>> On 18/08/2017 at 23:38, Eugene Kirpichov wrote:
>>> > Hi all,
>>> >
>>> > I've been adding new features to TextIO and AvroIO recently, see e.g.
>>> > https://github.com/apache/beam/pull/3725. The features are:
>>> > - withHintMatchesManyFiles()
>>> > - readAll() that reads a PCollection of filepatterns
>>> > - configurable treatment of filepatterns that match no files
>>> > - watchForNewFiles() that incrementally watches for new files matching
>>> >   the filepatterns
>>> >
>>> > However, these features also make sense for other file-based IOs
>>> > (TFRecordIO, XmlIO, the in-review WholeFileIO), and adding them explicitly
>>> > to each of these requires a lot of boilerplate and reeks of lack of
>>> > modularity. I don't want to add this much duplicated code to each of them,
>>> > nor to require authors of new such IOs to add it.
>>> >
>>> > Note that all of these features are available on the recently added
>>> > Match.filepatterns() transform, which converts a PCollection<String> to a
>>> > PCollection<MatchResult.Metadata> (file path and size). The boilerplate in
>>> > file-based IOs ends up simply passing on the properties to the Match
>>> > transform.
>>> >
>>> > Because of this, I'd like to propose the following recommendation for
>>> > file-based IOs:
>>> > A file-based FooIO should include:
>>> > - A read transform that reads the data from a filepattern specified at
>>> >   pipeline construction time - FooIO.read().from(filepattern) or something
>>> >   analogous, as a PCollection<Foo>
>>> > - A transform FooIO.readAllMatches() that converts a PCollection<Metadata>
>>> >   to PCollection<Foo>
>>> >
>>> > Then FooIO.read() handles the common case, and the user can solve all
>>> > advanced cases by combining Match.filepatterns() with
>>> > FooIO.readAllMatches():
>>> >
>>> > // Read files in a filepattern but don't fail if it's empty
>>> > PCollection<Foo> foos = p.apply(Create.of(myFilepattern))
>>> >     .apply(Match.filepatterns().withEmptyMatchTreatment(ALLOW))
>>> >     .apply(FooIO.readAllMatches());
>>> >
>>> > // Read new filepatterns arriving over PubSub, and for each filepattern
>>> > // continuously watch for new files matching it, polling every 1 minute
>>> > // and stop polling a filepattern if no new files appear for 5 minutes
>>> > PCollection<String> filepatterns = p.apply(PubsubIO.readStrings()...);
>>> > PCollection<Foo> foos = filepatterns
>>> >     .apply(Match.filepatterns().continuously(
>>> >         Duration.standardMinutes(1),
>>> >         afterTimeSinceNewOutput(Duration.standardMinutes(5))))
>>> >     .apply(FooIO.readAllMatches());
>>> >
>>> > Adding explicit support for these configuration options to FooIO.read(),
>>> > and adding a FooIO.readAll() should be optional.
>>> >
>>> > WDYT?
>>> >
>>>
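
P.S. On the filename-and-line-number idea near the top of this message: the parsing core of such a ParDo could look roughly like the plain-Java sketch below. This is only an illustration under assumptions. NumberedLines and NumberedLine are hypothetical names, not Beam classes, and the sketch uses only the JDK so it can be run on its own. In a real pipeline, a DoFn would obtain the filename and the contents from each ReadableFile and emit the records instead of collecting them into a list.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for this sketch; not part of Beam. */
final class NumberedLines {

  /** One line of text tagged with the file it came from and its 1-based line number. */
  static final class NumberedLine {
    final String filename;
    final long lineNumber;
    final String text;

    NumberedLine(String filename, long lineNumber, String text) {
      this.filename = filename;
      this.lineNumber = lineNumber;
      this.text = text;
    }

    @Override
    public String toString() {
      return filename + ":" + lineNumber + ": " + text;
    }
  }

  /** Reads every line from contents and pairs it with filename and its line number. */
  static List<NumberedLine> parse(String filename, Reader contents) {
    List<NumberedLine> result = new ArrayList<>();
    try (BufferedReader reader = new BufferedReader(contents)) {
      String line;
      long lineNumber = 0;
      while ((line = reader.readLine()) != null) {
        lineNumber++; // line numbers start at 1
        result.add(new NumberedLine(filename, lineNumber, line));
      }
    } catch (IOException e) {
      // Wrapped so callers (e.g. a processing function) need no checked exception.
      throw new UncheckedIOException(e);
    }
    return result;
  }
}
```

Calling NumberedLines.parse("a.txt", new StringReader(text)) returns one record per line of text. Keeping the parsing separate from the pipeline code means it can be unit-tested without a runner.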