Thoughts from @dev <dev@beam.apache.org> on a `GroupedValues` version of combiners? Should it be named `PerGroup` or `PerGroupedValues`?
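To make the proposal concrete, here is a rough sketch of what one of these could look like. Nothing below exists in Beam today (the name and the class are both hypothetical); it is just a thin wrapper over the existing `CombineValues` primitive that Valentyn points out downthread:

```
# Hypothetical sketch -- neither the name nor the transform exists in Beam
# today. It just wraps the existing CombineValues primitive so that "mean of
# already-grouped values" reads like the existing Mean.Globally / Mean.PerKey.
import apache_beam as beam
from apache_beam.transforms.combiners import MeanCombineFn


class MeanPerGroupedValues(beam.PTransform):
    """Mean of each (key, iterable_of_values) pair, e.g. GroupByKey output."""

    def expand(self, pcoll):
        # CombineValues applies the CombineFn to each key's iterable of
        # values, so no further grouping is needed here.
        return pcoll | beam.CombineValues(MeanCombineFn())
```

Usage would then mirror the existing forms: `keyed_nums | GroupByKey() | MeanPerGroupedValues()`.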
On Fri, Sep 27, 2024 at 2:57 PM Valentyn Tymofieiev <valen...@google.com> wrote:
>
> On Fri, Sep 27, 2024 at 11:13 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>
>> Ah! That is exactly the kind of primitive I was looking for but thought
>> didn't exist. Thanks for pointing it out. Yeah, that works well for me; I'll
>> use that in my combiners (with an API of `PerGroupedValues`). Thanks!
>>
>> If we did want to add `PerGroupedValues` to our current combiners, I'd
>> also be happy to put up a PR doing that.
>
> I don't see why not. I'd run it by dev@ for naming ideas. PerGroup is
> another possibility.
>
>> On Fri, Sep 27, 2024 at 2:01 PM Valentyn Tymofieiev <valen...@google.com> wrote:
>>
>>> The closest primitive to that intent seems to be CombineValues:
>>> https://github.com/apache/beam/blob/c2c640f8c33071d5bb3e854e82c554c03a0bc851/sdks/python/apache_beam/transforms/core.py#L3010,
>>> and you should be able to write:
>>>
>>> ```
>>> max_sample_size = 100_000
>>> (keyed_nums
>>>  | GroupByKey()
>>>  | Map(lambda k_nums: (k_nums[0], list(k_nums[1])[:max_sample_size]))
>>>  | CombineValues(MeanCombineFn()))
>>> ```
>>>
>>> Would that work for other scenarios you have in mind?
>>>
>>> Haven't thought too much about this, but from looking at
>>> https://github.com/apache/beam/blob/c2c640f8c33071d5bb3e854e82c554c03a0bc851/sdks/python/apache_beam/transforms/combiners.py#L90,
>>> I could see us adding Mean.GroupedValues or Mean.PerGroupedValues there.
>>>
>>> On Fri, Sep 27, 2024 at 10:41 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>
>>>> It feels more natural because it uses GroupByKey only once instead of
>>>> once per combiner, which I think is still more efficient even accounting
>>>> for combiner lifting (unless there's some kind of pipeline optimization
>>>> that merges multiple GroupByKeys on the same PCollection into a single
>>>> GBK).
>>>>
>>>> You can imagine a different use case where this pattern might arise
>>>> that isn't just about reducing GBKs, though. For example:
>>>>
>>>> ```
>>>> max_sample_size = 100_000
>>>> (keyed_nums
>>>>  | GroupByKey()
>>>>  | Map(lambda k_nums: (k_nums[0], list(k_nums[1])[:max_sample_size]))
>>>>  | ...)  # ?? Mean.PerGrouped()?
>>>> ```
>>>>
>>>> To take the mean of every group's values using the current combiners, I
>>>> think you'd have to use an inverted GroupByKey and then call
>>>> `Mean.PerKey()`, unless I'm missing something.
>>>>
>>>> (I recognize that writing a Map that takes a mean is simple enough, but
>>>> in a real use case we might have a more complicated combiner.)
>>>>
>>>> On Fri, Sep 27, 2024 at 1:31 PM Valentyn Tymofieiev via user <u...@beam.apache.org> wrote:
>>>>
>>>>> On Fri, Sep 27, 2024 at 8:35 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>>>
>>>>>> Hey all,
>>>>>>
>>>>>> Just curious if this pattern comes up for others and if people have
>>>>>> worked out a good convention.
>>>>>>
>>>>>> There are many combiners, and a lot of them have two forms: a global
>>>>>> form (e.g. Count.Globally) and a per-key form (e.g. Count.PerKey). These
>>>>>> are convenient, but it feels like we often run into the case where we
>>>>>> GroupBy a set of data once and then wish to perform a series of combines
>>>>>> on them, in which case neither of these forms works, and it begs another
>>>>>> form which operates on pre-grouped KVs.
>>>>>>
>>>>>> Contrived example: maybe you have a PCollection of keyed numbers and
>>>>>> you want to calculate some summary statistics on them.
>>>>>> You could do:
>>>>>>
>>>>>> ```
>>>>>> keyed_means = (keyed_nums
>>>>>>                | Mean.PerKey())
>>>>>> keyed_counts = (keyed_nums
>>>>>>                 | Count.PerKey())
>>>>>> ...  # other combines
>>>>>> ```
>>>>>>
>>>>>> But it'd feel more natural to pre-group the PCollection.
>>>>>
>>>>> Does it feel more natural because it feels as though it would be more
>>>>> performant? It seems like it adds an extra grouping step to the pipeline
>>>>> code, which otherwise might not be necessary. Note that Dataflow has the
>>>>> "combiner lifting" optimization, and the combiner-specified reduction
>>>>> happens before the data is written to shuffle as much as possible:
>>>>> https://cloud.google.com/dataflow/docs/pipeline-lifecycle#combine_optimization
>>>>>
>>>>>> ```
>>>>>> grouped_nums = keyed_nums | GBK()
>>>>>> keyed_means = (grouped_nums | Mean.PerGrouped())
>>>>>> keyed_counts = (grouped_nums | Count.PerGrouped())
>>>>>> ```
>>>>>>
>>>>>> But these "PerGrouped" variants don't actually exist currently. Does
>>>>>> anyone else run into this pattern often? I might be missing an obvious
>>>>>> pattern here.
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Joey Tran | Staff Developer | AutoDesigner TL
>>>>>>
>>>>>> *he/him*
>>>>>>
>>>>>> Schrödinger, Inc. <https://schrodinger.com/>
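For reference, a self-contained, runnable version of the pattern the thread converges on (one GroupByKey feeding several CombineValues) might look like the sketch below. The sample data and names such as `keyed_nums` are illustrative, and note that the `Map` lambda has to unpack the `(key, values)` tuple itself:

```
import apache_beam as beam
from apache_beam.transforms.combiners import CountCombineFn, MeanCombineFn

max_sample_size = 100_000

with beam.Pipeline() as p:
    keyed_nums = p | beam.Create([("a", 1), ("a", 2), ("b", 3)])

    # Group once. GroupByKey yields (key, iterable), so materialize the
    # iterable to a list before slicing it down to the sample size.
    grouped_nums = (
        keyed_nums
        | beam.GroupByKey()
        | beam.Map(lambda k_nums: (k_nums[0], list(k_nums[1])[:max_sample_size]))
    )

    # Fan out to several combines over the same pre-grouped values.
    keyed_means = grouped_nums | "Mean" >> beam.CombineValues(MeanCombineFn())
    keyed_counts = grouped_nums | "Count" >> beam.CombineValues(CountCombineFn())
```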