Hmm, that makes sense from a runner-optimization perspective, but I think it's a lot less ergonomic to publish CombineFns instead of PTransforms. Another drawback of the stacked CombineFn is that its label will have to be a mash of possibly very different combiners squashed into one.
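For concreteness, a minimal sketch of the stacked-CombineFn shape under discussion, assuming the Beam Python SDK (the toy data and the 'SumAndMean' label are made up for illustration); note how the one label has to cover two unrelated combiners:

```python
import apache_beam as beam
from apache_beam.transforms import combiners

with beam.Pipeline() as p:
    keyed_nums = p | beam.Create([('a', 1), ('a', 3), ('b', 5)])
    # One CombinePerKey runs both reductions, so its single label
    # ('SumAndMean') has to describe two very different combiners.
    stats = keyed_nums | 'SumAndMean' >> beam.CombinePerKey(
        combiners.SingleInputTupleCombineFn(
            sum,                         # plain callable, lifted to a CombineFn
            combiners.MeanCombineFn()))  # running mean of the same values
    stats | beam.Map(print)  # ('a', (4, 2.0)), ('b', (5, 5.0))
```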
On Fri, Sep 27, 2024 at 3:52 PM Robert Bradshaw via dev <dev@beam.apache.org> wrote:

> I'd be worried about encouraging the anti-pattern of GroupByKey() + CombinePerGroup(), which would make the important (often essential) combiner lifting optimization harder (pattern detection in the runner vs. composite detection).
>
> You might also be interested in the TupleCombineFns:
> https://github.com/apache/beam/blob/release-2.13.0/sdks/python/apache_beam/transforms/combiners.py#L717
>
> ```
> keyed_nums = ...
> combined_nums = keyed_nums | CombinePerKey(
>     combiners.SingleInputTupleCombineFn(
>         sum,
>         combiners.MeanCombineFn(),
>         ...))
> ```
>
> FWIW, I never liked the .Globally() and .PerKey() transforms as they're not very compressible. I think they were needed to make Java typing work out well in Java 7 and were then copied to Python, but I would suggest just using CombineGlobally(...) and CombinePerKey(...) over these.
>
> You might also be interested to know that in Python we already have combiner consolidation:
> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L953
> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L1161
>
> I don't remember what the status is of enabling this by default (IIRC, they're conditionally enabled by decorating a transform).
>
> On Fri, Sep 27, 2024 at 12:01 PM Joey Tran <joey.t...@schrodinger.com> wrote:
>
>> Thoughts, @dev <dev@beam.apache.org>, on a `GroupedValues` version of combiners? Named `PerGroup` or `PerGroupedValues`?
>>
>> On Fri, Sep 27, 2024 at 2:57 PM Valentyn Tymofieiev <valen...@google.com> wrote:
>>
>>> On Fri, Sep 27, 2024 at 11:13 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>
>>>> Ah! That is exactly the kind of primitive I was looking for but thought didn't exist. Thanks for pointing it out. Yeah, that works well for me; I'll use that in my combiners (with an API of `PerGroupedValues`). Thanks!
>>>>
>>>> If we did want to add `PerGroupedValues` to our current combiners, I'd also be happy to put up a PR doing that.
>>>
>>> I don't see why not. I'd run it by dev@ for naming ideas. PerGroup is another possibility.
>>>
>>>> On Fri, Sep 27, 2024 at 2:01 PM Valentyn Tymofieiev <valen...@google.com> wrote:
>>>>
>>>>> The closest primitive to that intent seems to be CombineValues:
>>>>> https://github.com/apache/beam/blob/c2c640f8c33071d5bb3e854e82c554c03a0bc851/sdks/python/apache_beam/transforms/core.py#L3010
>>>>> and you should be able to write:
>>>>>
>>>>> ```
>>>>> max_sample_size = 100_000
>>>>> (keyed_nums
>>>>>  | GroupByKey()
>>>>>  | MapTuple(lambda k, nums: (k, list(nums)[:max_sample_size]))
>>>>>  | CombineValues(MeanCombineFn()))
>>>>> ```
>>>>>
>>>>> Would that work for other scenarios you have in mind?
>>>>>
>>>>> Haven't thought too much about this, but from looking at
>>>>> https://github.com/apache/beam/blob/c2c640f8c33071d5bb3e854e82c554c03a0bc851/sdks/python/apache_beam/transforms/combiners.py#L90,
>>>>> I could see us adding Mean.GroupedValues or Mean.PerGroupedValues there.
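A runnable version of that CombineValues sketch, assuming the Beam Python SDK (the toy data is illustrative; `list()` materializes the grouped iterable before slicing):

```python
import apache_beam as beam
from apache_beam.transforms.combiners import MeanCombineFn

max_sample_size = 100_000

with beam.Pipeline() as p:
    keyed_nums = p | beam.Create([('a', 1), ('a', 3), ('b', 5)])
    means = (
        keyed_nums
        | beam.GroupByKey()  # -> (key, iterable of nums)
        # Cap each group's sample size before combining.
        | beam.MapTuple(lambda k, nums: (k, list(nums)[:max_sample_size]))
        # Apply the combiner to each group's already-collected values.
        | beam.CombineValues(MeanCombineFn()))
    means | beam.Map(print)  # ('a', 2.0), ('b', 5.0)
```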
>>>>> On Fri, Sep 27, 2024 at 10:41 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>>>
>>>>>> It feels more natural because it's only using GroupByKey once instead of once per combiner, which I think is still more efficient even accounting for combiner lifting (unless there's some kind of pipeline optimization that merges multiple GroupByKeys on the same PCollection into a single GBK).
>>>>>>
>>>>>> You can imagine a different use case where this pattern might arise that isn't just about reducing GBKs, though. For example:
>>>>>>
>>>>>> ```
>>>>>> max_sample_size = 100_000
>>>>>> (keyed_nums
>>>>>>  | GroupByKey()
>>>>>>  | MapTuple(lambda k, nums: (k, list(nums)[:max_sample_size]))
>>>>>>  | ...)  # ?? Mean.PerGrouped()?
>>>>>> ```
>>>>>>
>>>>>> To take the mean of every group's values using current combiners, I think you'd have to use an inverted GroupByKey and then call `Mean.PerKey()`, unless I'm missing something.
>>>>>>
>>>>>> (I recognize that writing a Map that takes a mean is simple enough, but in a real use case we might have a more complicated combiner.)
>>>>>>
>>>>>> On Fri, Sep 27, 2024 at 1:31 PM Valentyn Tymofieiev via user <u...@beam.apache.org> wrote:
>>>>>>
>>>>>>> On Fri, Sep 27, 2024 at 8:35 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>>>>>
>>>>>>>> Hey all,
>>>>>>>>
>>>>>>>> Just curious if this pattern comes up for others and if people have worked out a good convention.
>>>>>>>>
>>>>>>>> There are many combiners, and a lot of them have two forms: a global form (e.g. Count.Globally) and a per-key form (e.g. Count.PerKey). These are convenient, but it feels like we often run into the case where we GroupBy a set of data once and then want to perform a series of combines on the groups, in which case neither of these forms works, and it begs another form which operates on pre-grouped KVs.
>>>>>>>>
>>>>>>>> Contrived example: maybe you have a PCollection of keyed numbers and you want to calculate some summary statistics on them. You could do:
>>>>>>>>
>>>>>>>> ```
>>>>>>>> keyed_means = (keyed_nums | Mean.PerKey())
>>>>>>>> keyed_counts = (keyed_nums | Count.PerKey())
>>>>>>>> ...  # other combines
>>>>>>>> ```
>>>>>>>>
>>>>>>>> But it'd feel more natural to pre-group the PCollection.
>>>>>>>
>>>>>>> Does it feel more natural because it feels as though it would be more performant? It seems like it adds an extra grouping step to the pipeline code, which otherwise might not be necessary. Note that Dataflow has the "combiner lifting" optimization, and combiner-specified reduction happens before the data is written into shuffle as much as possible:
>>>>>>> https://cloud.google.com/dataflow/docs/pipeline-lifecycle#combine_optimization
>>>>>>>
>>>>>>>> ```
>>>>>>>> grouped_nums = keyed_nums | GBK()
>>>>>>>> keyed_means = (grouped_nums | Mean.PerGrouped())
>>>>>>>> keyed_counts = (grouped_nums | Count.PerGrouped())
>>>>>>>> ```
>>>>>>>>
>>>>>>>> But these "PerGrouped" variants don't actually exist today. Does anyone else run into this pattern often? I might be missing an obvious pattern here.
>>>>>>>>
>>>>>>>> --
>>>>>>>> Joey Tran | Staff Developer | AutoDesigner TL
>>>>>>>> *he/him*
>>>>>>>> Schrödinger, Inc. <https://schrodinger.com/>
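To make the proposal concrete, here is what a `PerGroupedValues`-style form could look like: a hypothetical wrapper (not part of the Beam API) sketched as a thin PTransform over CombineValues:

```python
import apache_beam as beam
from apache_beam.transforms.combiners import MeanCombineFn

class MeanPerGroupedValues(beam.PTransform):
    """Hypothetical 'PerGroupedValues' form of Mean: combines values
    that were already grouped, e.g. by an earlier GroupByKey."""
    def expand(self, pcoll):
        # Input elements are (key, iterable_of_nums) pairs.
        return pcoll | beam.CombineValues(MeanCombineFn())

with beam.Pipeline() as p:
    grouped_nums = p | beam.Create([('a', [1, 3]), ('b', [5])])
    grouped_nums | MeanPerGroupedValues() | beam.Map(print)
    # ('a', 2.0), ('b', 5.0)
```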