I'm not very familiar with SDF so I can't comment on that approach. Maybe +Boyuan Zhang <[email protected]> would be helpful there.
What if FileIO could yield the glob that was matched along with each file? Then you could use that as a grouping key later on. A rough sketch of that idea follows below the quoted message.

Brian

On Wed, Mar 24, 2021 at 7:08 AM Evan Galpin <[email protected]> wrote:

> Hi all,
>
> I'm looking for some expertise :slightly_smiling_face: I realize I may not
> be using things as intended, and I welcome any feedback. I'm using the Java
> SDK, v2.28.0, with both the Direct Runner (for development) and the
> Dataflow Runner (for production). My pipeline is a streaming pipeline with
> Google Pub/Sub as the source.
>
> TL;DR: I'd like to be able to maintain a grouping of entities within a
> single PCollection element, but parallelize the fetching of those entities
> from Google Cloud Storage (GCS). That is, PCollection<Iterable<String>> -->
> PCollection<Iterable<String>>, where the starting PCollection is an
> Iterable of file paths and the resulting PCollection is an Iterable of
> file contents. Alternatively, PCollection<String> -->
> PCollection<Iterable<String>> would also work, and perhaps even be
> preferable, where the starting PCollection is a glob pattern and the
> resulting PCollection is an Iterable of the contents of the files that
> matched the glob.
>
> My use case is that at one point in my pipeline I have a
> PCollection<String> as input. Each element of the PCollection is a GCS
> file path glob pattern. It's important that files which match the glob be
> grouped together, because the contents of the files (once *all* files in a
> group are read) need to be grouped downstream in the pipeline. I
> originally tried using FileIO.matchAll followed by a GroupByKey. However,
> the matchAll, window, and GroupByKey combination lacked any guarantee that
> all files matching the glob would be read and in the same window before
> the GroupByKey transform ran. It's possible to approximate the desired
> result by applying a large WindowFn, but it's still probabilistic rather
> than a guarantee that all files will be read before grouping. Keeping
> latency as low as possible is also a main goal of my pipeline.
>
> So my next, and currently operational, plan was to use an AsyncHttpClient
> to fan out fetching file contents via the GCS HTTP API. I feel like this
> goes against the grain in Beam and is likely quite suboptimal in terms of
> parallelization.
>
> So I've started investigating SplittableDoFn. My current plan is to allow
> splitting such that each entity in the input Iterable (i.e. each matched
> file from the glob pattern) could be processed separately. The challenge
> I've encountered is: how do I go about grouping/gathering the split
> invocations back into a single output value in my DoFn? I've tried using
> stateful processing with a BagState to collect file contents along the
> way, but I realized partway through that the @ProcessElement method of a
> splittable DoFn may only accept ProcessContext and Restriction arguments,
> and no others, so there is no way to pass a @StateId argument referring to
> a StateSpec.
>
> I noticed in the FilePatternWatcher example in the official SDF proposal
> doc [1] that a custom tracker was created in which FilePath objects are
> kept in a set, presumably added to the set via tryClaim. This seems as
> though it could work for my use case, but I don't see/understand how to
> implement a @SplitRestriction method using a custom RestrictionTracker.
>
> I would be very appreciative if anyone were able to offer advice. I have
> no preference for any particular solution, only that I want to achieve the
> ability to maintain a grouping of entities within a single PCollection
> element, but parallelize the fetching of those entities from Google Cloud
> Storage (GCS).
>
> Thanks!
> Evan
>
> [1]
> https://docs.google.com/document/d/1AQmx-T9XjSi1PNoEp5_L-lT0j7BkgTbmQnc6uFEMI4c/edit#heading=h.19qhdetat7d9
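
[Editor's sketch of Brian's suggestion] A minimal sketch, assuming the glob matching is done in a hand-rolled DoFn so the glob itself can travel with each match as a key. FileSystems.match, FileSystems.matchSingleFileSpec, and FileSystems.open are real Beam APIs; the class names and the two-stage split are illustrative, not a confirmed design from this thread.

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.nio.channels.Channels;
    import java.nio.charset.StandardCharsets;
    import java.util.stream.Collectors;
    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.io.fs.MatchResult;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    /** Stage 1: expand each glob, keeping the glob itself as the key of every match. */
    class MatchKeepingGlobFn extends DoFn<String, KV<String, String>> {
      @ProcessElement
      public void process(@Element String glob, OutputReceiver<KV<String, String>> out)
          throws IOException {
        for (MatchResult.Metadata m : FileSystems.match(glob).metadata()) {
          // Key = the glob that produced this match; value = the matched file path.
          out.output(KV.of(glob, m.resourceId().toString()));
        }
      }
    }

    /** Stage 2: read one file per element; the glob key rides along unchanged. */
    class ReadFileKeepingKeyFn extends DoFn<KV<String, String>, KV<String, String>> {
      @ProcessElement
      public void process(@Element KV<String, String> globAndPath,
          OutputReceiver<KV<String, String>> out) throws IOException {
        MatchResult.Metadata m = FileSystems.matchSingleFileSpec(globAndPath.getValue());
        try (BufferedReader reader = new BufferedReader(Channels.newReader(
            FileSystems.open(m.resourceId()), StandardCharsets.UTF_8.name()))) {
          out.output(KV.of(globAndPath.getKey(),
              reader.lines().collect(Collectors.joining("\n"))));
        }
      }
    }

Wired together, a Reshuffle between the two stages lets the runner spread the per-file reads across workers, and a GroupByKey on the glob key reassembles each group:

    PCollection<KV<String, Iterable<String>>> grouped =
        globs
            .apply(ParDo.of(new MatchKeepingGlobFn()))
            .apply(Reshuffle.of())   // redistribute so file reads can run in parallel
            .apply(ParDo.of(new ReadFileKeepingKeyFn()))
            .apply(GroupByKey.create());

The completeness caveat Evan raises still applies to the GroupByKey in a streaming pipeline. One mitigation (an assumption, not something confirmed in the thread): stage 1 knows exactly how many files each glob matched, so the expected count could be carried along with the key, letting the consumer tell when a group is complete.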
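[Editor's sketch of the @SplitRestriction mechanics] On the @SplitRestriction question: below is a minimal splittable DoFn sketch, assuming the input element is a List<String> of already-matched file paths. The annotations, OffsetRange, and OffsetRangeTracker are real Beam SDF APIs (as of roughly the 2.28 SDK); the class name and readFile helper are illustrative. It shows the split mechanics only and does not by itself solve the regrouping problem, since the outputs would still need to be keyed and gathered downstream.

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.nio.channels.Channels;
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.stream.Collectors;
    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
    import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

    @DoFn.BoundedPerElement
    class ReadEachMatchedFileFn extends DoFn<List<String>, String> {

      // The restriction is simply the range of indices into the input list.
      @GetInitialRestriction
      public OffsetRange initialRestriction(@Element List<String> paths) {
        return new OffsetRange(0, paths.size());
      }

      // One sub-restriction per file, so the runner may process files in parallel.
      @SplitRestriction
      public void splitRestriction(
          @Restriction OffsetRange range, OutputReceiver<OffsetRange> out) {
        for (long i = range.getFrom(); i < range.getTo(); i++) {
          out.output(new OffsetRange(i, i + 1));
        }
      }

      @NewTracker
      public OffsetRangeTracker newTracker(@Restriction OffsetRange range) {
        return new OffsetRangeTracker(range);
      }

      @ProcessElement
      public void process(
          @Element List<String> paths,
          RestrictionTracker<OffsetRange, Long> tracker,
          OutputReceiver<String> out) {
        for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); i++) {
          out.output(readFile(paths.get((int) i)));
        }
      }

      // Illustrative helper: reads a whole file as UTF-8 text via FileSystems.
      private String readFile(String path) {
        try (BufferedReader reader = new BufferedReader(Channels.newReader(
            FileSystems.open(FileSystems.matchNewResource(path, /* isDirectory= */ false)),
            StandardCharsets.UTF_8.name()))) {
          return reader.lines().collect(Collectors.joining("\n"));
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    }

Because the restriction is split one file per sub-range, the runner is free to execute each file read in parallel. Regrouping afterwards would still require attaching a key (e.g. the original glob) plus the expected element count, which is essentially the same completeness problem as in the GroupByKey approach above.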
