Interesting. Right now we are only doing batch processing, so I hadn't thought about the windowing aspect.
On Fri, Sep 27, 2019 at 12:10 PM Reuven Lax <[email protected]> wrote:

> Are you doing this in streaming with windowed writes? Window grouping does not "happen" in Beam until a GroupByKey, so you do need the GroupByKey in that case.
>
> If you are not windowing but want a specific number of shards (though the general suggestion in that case is to not pick a specific number of shards, but to let the runner pick it for you), your approach could work. However, the implementation would be more complicated than you suggest. The problem is that every file writer has a buffer, and when you force many of them to be in memory in a map you risk running out of memory. If you look at the spilledFiles code in WriteFiles.java, it was written to handle exactly this case.
>
> Reuven
>
> On Fri, Sep 27, 2019 at 8:47 AM Shannon Duncan <[email protected]> wrote:
>
>> Yes, specifically TextIO withNumShards().
>>
>> On Fri, Sep 27, 2019 at 10:45 AM Reuven Lax <[email protected]> wrote:
>>
>>> I'm not sure what you mean by "write out to a specific shard number." Are you talking about FileIO sinks?
>>>
>>> Reuven
>>>
>>> On Fri, Sep 27, 2019 at 7:41 AM Shannon Duncan <[email protected]> wrote:
>>>
>>>> So when Beam writes out to a specific shard number, as I understand it, it does a few things:
>>>>
>>>> - Assigns a shard key to each record (reduces parallelism)
>>>> - Shuffles and groups by the shard key to colocate all records
>>>> - Writes out each shard file within a single DoFn per key
>>>>
>>>> When thinking about this, I believe we might be able to eliminate the GroupByKey and write out each file with its records using only a DoFn, once the shard key is assigned.
>>>>
>>>> As long as the shard key is the actual key of the PCollection, could we use a state variable to force all records with the same key to share state with each other?
>>>>
>>>> In a DoFn, can we use the setup to hold a map of the files being written to within bundles on that instance, and on teardown close all the files in the map?
>>>>
>>>> If so, does this remove the need for a shuffle and allow a DoFn to safely write out in append mode to a file, batch, etc. held in state?
>>>>
>>>> It doesn't really decrease parallelism after the key is assigned, since it can parallelize over each key within its state window. That is the same level of parallelism we achieve by doing a GroupByKey and looping over the result, so performance shouldn't be impacted if this holds true.
>>>>
>>>> It's kind of like combining the shuffle and the data write in the same step?
>>>>
>>>> It would, however, provide a significant cost reduction by eliminating a compute-based shuffle, and also eliminating a Dataflow Shuffle service call if the shuffle service is enabled.
>>>>
>>>> Thoughts?
>>>>
>>>> Thanks,
>>>> Shannon Duncan
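
For anyone skimming the thread, here is a minimal sketch of the kind of write being discussed: forcing a fixed shard count on a TextIO sink. The output path and shard count are made up for illustration; forcing the count is what triggers the assign-shard-key / group-by-key / write-files sequence described above.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;

    public class FixedShardWrite {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        p.apply(Create.of("record-1", "record-2", "record-3"))
            // Forcing a fixed shard count is what makes the write path assign
            // a shard key and group by it before writing one file per shard.
            .apply(TextIO.write().to("/tmp/output").withNumShards(4));

        p.run().waitUntilFinish();
      }
    }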
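
And here is a rough, hypothetical sketch of the setup/teardown idea from the first message. The class name, local file paths, and Integer shard keys are invented for illustration; this is not how Beam's WriteFiles.java actually works. Note that every open writer holds a buffer (exactly the memory risk Reuven describes), and @Teardown is best-effort in Beam, so a real implementation would also flush in @FinishBundle.

    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Hypothetical shuffle-free sharded write: one append-mode writer per
    // shard key, created lazily during processing and closed in @Teardown.
    public class ShardedAppendFn extends DoFn<KV<Integer, String>, Void> {

      private transient Map<Integer, BufferedWriter> writers;

      @Setup
      public void setup() {
        writers = new HashMap<>();
      }

      @ProcessElement
      public void processElement(@Element KV<Integer, String> record) throws IOException {
        // Each open BufferedWriter holds an in-memory buffer; with many
        // shard keys per worker this is where memory pressure builds up.
        BufferedWriter writer = writers.computeIfAbsent(record.getKey(), shard -> {
          try {
            return Files.newBufferedWriter(
                Paths.get("/tmp/output-shard-" + shard + ".txt"),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
          } catch (IOException e) {
            throw new UncheckedIOException(e);
          }
        });
        writer.write(record.getValue());
        writer.newLine();
      }

      @Teardown
      public void teardown() throws IOException {
        // Best-effort cleanup; a production version would flush and close in
        // @FinishBundle to avoid losing buffered data on worker failure.
        for (BufferedWriter writer : writers.values()) {
          writer.close();
        }
      }
    }

It would slot in after the shard-key assignment step, e.g. shardedInput.apply(ParDo.of(new ShardedAppendFn())), in place of the GroupByKey plus write loop.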
