I think I have a somewhat better proposal that encompasses this and
WholeFileIO.
I'm already moving Match.filepatterns() into FileIO.match()/matchAll(), and
I'd like to create FileIO.read(): PCollection<MatchResult.Metadata> ->
PCollection<ReadableFile>, potentially configurable by .withCompression().
Here, ReadableFile will be a new type that looks like this:
class ReadableFile {
  ResourceId getResourceId();
  long getSizeBytes();
  Compression getCompression();
  boolean isReadSeekEfficient();
  ReadableByteChannel open();
  byte[] readFullyAsBytes();
  String readFullyAsString();
}
Under the hood it contains just the first 4 members; the others are just
utility functions. This is basically the currently proposed
WholeFileIO.read(), but behind a seemingly more conveniently packaged
interface.
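
A minimal sketch of what such a type could look like, using only the JDK so that it runs on its own. It is not Beam code and not the proposed implementation: java.nio.file.Path stands in for ResourceId, the two-value Compression enum, the of() factory, the rule that only uncompressed files count as seek-efficient, and the UTF-8 decoding are all assumptions made for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical stand-in for the proposed ReadableFile; not Beam code.
final class ReadableFileSketch {

  // Assumption: only two codecs, to keep the sketch short.
  enum Compression { UNCOMPRESSED, GZIP }

  static final class ReadableFile {
    // The four stored members (Path replaces ResourceId in this sketch).
    private final Path resourceId;
    private final long sizeBytes;
    private final Compression compression;
    private final boolean readSeekEfficient;

    private ReadableFile(Path resourceId, long sizeBytes,
                         Compression compression, boolean readSeekEfficient) {
      this.resourceId = resourceId;
      this.sizeBytes = sizeBytes;
      this.compression = compression;
      this.readSeekEfficient = readSeekEfficient;
    }

    // Hypothetical factory. Assumption: a gzip stream cannot be seeked cheaply.
    static ReadableFile of(Path path, Compression compression) throws IOException {
      return new ReadableFile(path, Files.size(path), compression,
          compression == Compression.UNCOMPRESSED);
    }

    Path getResourceId() { return resourceId; }
    long getSizeBytes() { return sizeBytes; }
    Compression getCompression() { return compression; }
    boolean isReadSeekEfficient() { return readSeekEfficient; }

    // Utility: opens the file, decompressing if so configured.
    ReadableByteChannel open() throws IOException {
      InputStream in = Files.newInputStream(resourceId);
      try {
        if (compression == Compression.GZIP) {
          in = new GZIPInputStream(in);
        }
        return Channels.newChannel(in);
      } catch (IOException e) {
        in.close(); // do not leak the file handle if the gzip header is bad
        throw e;
      }
    }

    // Utility: reads the whole (decompressed) content into memory.
    byte[] readFullyAsBytes() throws IOException {
      try (InputStream in = Channels.newInputStream(open())) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[8192];
        int n;
        while ((n = in.read(buffer)) != -1) {
          out.write(buffer, 0, n);
        }
        return out.toByteArray();
      }
    }

    // Utility: assumes the content is UTF-8 text.
    String readFullyAsString() throws IOException {
      return new String(readFullyAsBytes(), StandardCharsets.UTF_8);
    }
  }

  public static void main(String[] args) throws IOException {
    Path tmp = Files.createTempFile("readable-file-sketch", ".gz");
    try {
      try (OutputStream out = new GZIPOutputStream(Files.newOutputStream(tmp))) {
        out.write("hello".getBytes(StandardCharsets.UTF_8));
      }
      ReadableFile file = ReadableFile.of(tmp, Compression.GZIP);
      System.out.println(file.readFullyAsString());
    } finally {
      Files.deleteIfExists(tmp);
    }
  }
}
```

The point of the split is that the two readFully* methods are built purely on open(), so anything that can produce the four stored members gets the utilities for free.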
And then I'd expect all file-based IOs to support a variant
BarIO.readAllFiles(): PCollection<ReadableFile> -> PCollection<Bar>
Example usage with a hypothetical BarIO:
PCollection<Bar> bars =
    p.apply(FileIO.match().filepattern("...").continuously(...).withEmptyMatchTreatment(DISALLOW))
     .apply(FileIO.read().withCompression(AUTO))
     .apply(BarIO.readAllFiles());
BarIO.readAllFiles() under the hood may also split the ReadableFiles into
chunks, or even use an SDF.
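
How the chunking would work is not specified above. One possible shape, as a rough sketch only: divide the file into half-open byte ranges when it is seek-efficient, and fall back to a single range covering the whole file otherwise. The names ChunkingSketch, OffsetRange and split(), and the desiredChunkBytes parameter, are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of splitting a file into chunks; not Beam code.
final class ChunkingSketch {

  // Half-open byte range [start, end).
  static final class OffsetRange {
    final long start;
    final long end;

    OffsetRange(long start, long end) {
      this.start = start;
      this.end = end;
    }

    @Override
    public String toString() {
      return "[" + start + ", " + end + ")";
    }
  }

  // Returns ranges that together cover [0, sizeBytes) exactly once.
  // Files that are not seek-efficient (e.g. compressed) stay in one piece.
  static List<OffsetRange> split(long sizeBytes, long desiredChunkBytes,
                                 boolean readSeekEfficient) {
    if (sizeBytes < 0 || desiredChunkBytes <= 0) {
      throw new IllegalArgumentException("sizeBytes must be >= 0 and desiredChunkBytes > 0");
    }
    List<OffsetRange> ranges = new ArrayList<>();
    if (!readSeekEfficient || sizeBytes <= desiredChunkBytes) {
      ranges.add(new OffsetRange(0, sizeBytes));
      return ranges;
    }
    long start = 0;
    while (start < sizeBytes) {
      // Clamp the last chunk to the end of the file.
      long end = start + Math.min(desiredChunkBytes, sizeBytes - start);
      ranges.add(new OffsetRange(start, end));
      start = end;
    }
    return ranges;
  }

  public static void main(String[] args) {
    System.out.println(split(250, 100, true));
    System.out.println(split(250, 100, false));
  }
}
```

A real reader for a record-oriented format would also have to align each range to record boundaries, which this sketch deliberately leaves out.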
Essentially this is a proposal to solve all use cases that involve reading
unsplittable file formats (because users can just add a ParDo on top of the
ReadableFile), as well as rid all future file-based IOs of boilerplate
associated with duplicating parameters like .continuously(),
.withCompression() and such.
On Fri, Aug 18, 2017 at 5:06 PM Chamikara Jayalath <[email protected]>
wrote:
> +1 for this. Also it looks like IO authors should be able to use existing
> 'ReadAllViaFileBasedSource' transform when implementing
> FooIO.readAllMatches().
>
> - Cham
>
> On Fri, Aug 18, 2017 at 2:38 PM Eugene Kirpichov
> <[email protected]> wrote:
>
> > Hi all,
> >
> > I've been adding new features to TextIO and AvroIO recently, see e.g.
> > https://github.com/apache/beam/pull/3725. The features are:
> > - withHintMatchesManyFiles()
> > - readAll() that reads a PCollection of filepatterns
> > - configurable treatment of filepatterns that match no files
> > - watchForNewFiles() that incrementally watches for new files matching
> > the filepatterns
> >
> > However, these features also make sense for other file-based IOs
> > (TFRecordIO, XmlIO, the in-review WholeFileIO), and adding them
> > explicitly to each of these requires a lot of boilerplate and reeks of
> > lack of modularity. I don't want to add this much duplicated code to
> > each of them, nor to require authors of new such IOs to add it.
> >
> > Note that all of these features are available on the recently added
> > Match.filepatterns() transform, that converts a PCollection<String> to a
> > PCollection<MatchResult.Metadata> (file path and size). The boilerplate
> > in file-based IOs ends up simply passing on the properties to the Match
> > transform.
> >
> > Because of this, I'd like to propose the following recommendation for
> > file-based IOs:
> > A file-based FooIO should include:
> > - A read transform that reads the data from a filepattern specified at
> > pipeline construction time - FooIO.read().from(filepattern) or something
> > analogous, as a PCollection<Foo>
> > - A transform FooIO.readAllMatches() that converts a
> > PCollection<Metadata> to PCollection<Foo>
> >
> > Then FooIO.read() handles the common case, and the user can solve all
> > advanced cases by combining Match.filepatterns() with
> > FooIO.readAllMatches():
> >
> > // Read files in a filepattern but don't fail if it's empty
> > PCollection<Foo> foos = p.apply(Create.of(myFilepattern))
> > .apply(Match.filepatterns().withEmptyMatchTreatment(ALLOW))
> > .apply(FooIO.readAllMatches());
> >
> > // Read new filepatterns arriving over PubSub, and for each filepattern
> > // continuously watch for new files matching it, polling every 1 minute
> > // and stop polling a filepattern if no new files appear for 5 minutes
> > PCollection<String> filepatterns = p.apply(PubsubIO.readStrings()...);
> > PCollection<Foo> foos = filepatterns
> > .apply(Match.filepatterns().continuously(
> > Duration.standardMinutes(1),
> > afterTimeSinceNewOutput(Duration.standardMinutes(5))))
> > .apply(FooIO.readAllMatches());
> >
> > Adding explicit support for these configuration options to FooIO.read(),
> > and adding a FooIO.readAll() should be optional.
> >
> > WDYT?
> >
>