Yes, we do have a use case for specifying the number of shards, but unfortunately I can't share it with the group.
Shannon

On Fri, Sep 27, 2019 at 2:14 PM Reuven Lax <[email protected]> wrote:

> Is there a reason that you need to explicitly specify the number of
> shards? If you don't, then this extra shuffle will not be performed.
>
> Reuven
>
> On Fri, Sep 27, 2019 at 12:12 PM Shannon Duncan <
> [email protected]> wrote:
>
>> Interesting. Right now we are only doing batch processing, so I hadn't
>> thought about the windowing aspect.
>>
>> On Fri, Sep 27, 2019 at 12:10 PM Reuven Lax <[email protected]> wrote:
>>
>>> Are you doing this in streaming with windowed writes? Window grouping
>>> does not "happen" in Beam until a GroupByKey, so you do need the
>>> GroupByKey in that case.
>>>
>>> If you are not windowing but want a specific number of shards (though
>>> the general suggestion in that case is to not pick a specific number of
>>> shards, but to let the runner pick it for you), your approach could work.
>>> However, the implementation would be more complicated than you suggest.
>>> The problem is that every file writer has a buffer, and when you force
>>> many of them to be in memory in a map you risk running out of memory. If
>>> you look at the spilledFiles code in WriteFiles.java, it was written to
>>> handle exactly this case.
>>>
>>> Reuven
>>>
>>> On Fri, Sep 27, 2019 at 8:47 AM Shannon Duncan <
>>> [email protected]> wrote:
>>>
>>>> Yes, specifically TextIO withNumShards(). (A usage sketch is appended
>>>> after this thread.)
>>>>
>>>> On Fri, Sep 27, 2019 at 10:45 AM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> I'm not sure what you mean by "write out to a specific shard number."
>>>>> Are you talking about FileIO sinks?
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Fri, Sep 27, 2019 at 7:41 AM Shannon Duncan <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> So when Beam writes out to a specific number of shards, as I
>>>>>> understand it, it does a few things:
>>>>>>
>>>>>> - Assigns a shard key to each record (reduces parallelism)
>>>>>> - Shuffles and groups by the shard key to colocate all records
>>>>>> - Writes out to each shard file within a single DoFn per key
>>>>>>
>>>>>> Thinking about this, I believe we might be able to eliminate the
>>>>>> GroupByKey and write each file's records with only a DoFn after the
>>>>>> shard key is assigned.
>>>>>>
>>>>>> As long as the shard key is the actual key of the PCollection, could
>>>>>> we use a state variable so that all records with the same key share
>>>>>> state with each other?
>>>>>>
>>>>>> In a DoFn, can we use the setup to hold a map of the files being
>>>>>> written to within bundles on that instance, and on teardown close all
>>>>>> files in the map? (A sketch of this idea is appended after the quoted
>>>>>> thread below.)
>>>>>>
>>>>>> If so, does this remove the need for a shuffle and allow a DoFn to
>>>>>> safely write out in append mode to a file, batch, etc. held in state?
>>>>>>
>>>>>> It doesn't really decrease parallelism after the key is assigned,
>>>>>> since it can parallelize over each key within its state window. That
>>>>>> is the same level of parallelism we achieve by doing a GroupByKey and
>>>>>> looping over the result, so performance shouldn't be impacted if this
>>>>>> holds true.
>>>>>>
>>>>>> It's kind of like combining both the shuffle and the data write in
>>>>>> the same step.
>>>>>>
>>>>>> This does, however, offer a significant cost reduction by eliminating
>>>>>> a compute-based shuffle and also eliminating a Dataflow Shuffle
>>>>>> service call if the shuffle service is enabled.
>>>>>>
>>>>>> Thoughts?
>>>>>>
>>>>>> Thanks,
>>>>>> Shannon Duncan
>>>>>
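For context, here is a minimal sketch of the pattern under discussion: explicitly fixing the number of output shards with TextIO's withNumShards(), which is what introduces the extra shuffle Reuven mentions. The output prefix and shard count below are hypothetical, not anything from the thread.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;

    public class FixedShardWrite {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        p.apply(Create.of("a", "b", "c"))
            .apply(
                TextIO.write()
                    .to("gs://my-bucket/output/part")  // hypothetical output prefix
                    .withNumShards(10));               // explicit shard count; this forces the shard-key shuffle

        p.run().waitUntilFinish();
      }
    }

If withNumShards() is omitted, the runner chooses the sharding and the extra shuffle is not needed, which is the alternative Reuven suggests.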

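And here is a hedged sketch of the map-of-open-writers idea from the original message: a DoFn that, after a shard key has been assigned, keeps one open writer per shard key in a map created in @Setup and closed in @Teardown. The class name, output prefix, and file-naming scheme are assumptions, and finalization/renaming of the per-worker files is omitted. As Reuven notes, every open writer holds a buffer, so many shards per worker risks running out of memory (the spilledFiles code in WriteFiles.java exists for exactly that case); @Teardown is also best-effort, so files may not be flushed on worker failure.

    import java.io.IOException;
    import java.io.Writer;
    import java.nio.channels.Channels;
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;
    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.io.fs.ResourceId;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Hypothetical DoFn sketching the proposal: write records directly,
    // keyed by a previously assigned shard key, without a GroupByKey.
    class DirectShardWriteFn extends DoFn<KV<Integer, String>, Void> {
      private final String outputPrefix;               // hypothetical output location
      private transient Map<Integer, Writer> writers;  // one open writer per shard key

      DirectShardWriteFn(String outputPrefix) {
        this.outputPrefix = outputPrefix;
      }

      @Setup
      public void setup() {
        writers = new HashMap<>();
      }

      @ProcessElement
      public void processElement(@Element KV<Integer, String> element) throws IOException {
        Writer writer = writers.get(element.getKey());
        if (writer == null) {
          // Each worker writes its own file per shard key; these would still need
          // to be finalized/merged somewhere, which this sketch omits.
          ResourceId file =
              FileSystems.matchNewResource(
                  outputPrefix + "-shard-" + element.getKey() + "-" + UUID.randomUUID() + ".txt",
                  false /* isDirectory */);
          writer =
              Channels.newWriter(
                  FileSystems.create(file, "text/plain"), StandardCharsets.UTF_8.name());
          writers.put(element.getKey(), writer);
        }
        writer.write(element.getValue());
        writer.write('\n');
      }

      @Teardown
      public void teardown() throws IOException {
        for (Writer writer : writers.values()) {
          writer.close();
        }
        writers.clear();
      }
    }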