Ah, I realize now that `standard_optimize_phases` is not actually currently used by the fn runner, so these changes don't effectively do anything.
On Sun, Sep 29, 2024 at 3:19 PM Joey Tran <joey.t...@schrodinger.com> wrote:

> I took a crack at trying to replace GBK+CombineValues with CBK so that it
> doesn't matter what the user chooses to do.
>
> https://github.com/apache/beam/pull/32592
>
> On Fri, Sep 27, 2024 at 4:32 PM Joey Tran <joey.t...@schrodinger.com> wrote:
>
>> Hmm, it makes sense from a runner-optimization perspective, but I think
>> it's a lot less ergonomic to publish CombineFns instead of PTransforms.
>> Another drawback to the stacked CombineFn is that its label will have to
>> be a mash of possibly very different combiners squashed into one.
>>
>> On Fri, Sep 27, 2024 at 3:52 PM Robert Bradshaw via dev <dev@beam.apache.org> wrote:
>>
>>> I'd be worried about encouraging the anti-pattern of GroupByKey() +
>>> CombinePerGroup(), which would make the important (often essential)
>>> combiner lifting optimization harder (pattern detection in the runner
>>> vs. composite detection).
>>>
>>> You might also be interested in the TupleCombineFns:
>>>
>>> https://github.com/apache/beam/blob/release-2.13.0/sdks/python/apache_beam/transforms/combiners.py#L717
>>>
>>> ```
>>> keyed_nums = ...
>>> combined_nums = keyed_nums | CombinePerKey(
>>>     combiners.SingleInputTupleCombineFn(
>>>         sum,
>>>         combiners.MeanCombineFn(),
>>>         ...))
>>> ```
>>>
>>> FWIW, I never liked the .Globally() and .PerKey() transforms as they're
>>> not very compressible. I think they were needed to make Java typing work
>>> out well in Java 7 and were then copied to Python, but I would suggest
>>> just using CombineGlobally(...) and CombinePerKey(...) over these.
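[A pure-Python sketch of what a single-input tuple combine does conceptually: each value is fed to several reducers in one pass, so a single grouping can back many statistics at once. This is not Beam code; the `(create, add, extract)` triple and all names here are illustrative assumptions, not the actual CombineFn API.]

```python
def combine_tuple(values, fns):
    """Apply several (create, add, extract) reducers to the same values in one pass."""
    accs = [create() for create, _, _ in fns]
    for v in values:
        # Feed each incoming value to every reducer's accumulator.
        accs = [add(acc, v) for (_, add, _), acc in zip(fns, accs)]
    return tuple(extract(acc) for (_, _, extract), acc in zip(fns, accs))

# A "sum" reducer and a "mean" reducer expressed as (create, add, extract).
sum_fn = (lambda: 0, lambda a, v: a + v, lambda a: a)
mean_fn = (
    lambda: (0, 0),                                  # (running_sum, count)
    lambda a, v: (a[0] + v, a[1] + 1),
    lambda a: a[0] / a[1] if a[1] else float("nan"),
)

print(combine_tuple([1, 2, 3, 4], [sum_fn, mean_fn]))  # (10, 2.5)
```

The tuple result is what makes the single combined transform's label awkward, as noted above: one node in the graph now stands for several logically distinct combiners.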
>>> You might also be interested to know that in Python we already have
>>> combiner consolidation:
>>>
>>> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L953
>>> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L1161
>>>
>>> I don't remember what the status is of enabling this by default (IIRC,
>>> they're conditionally enabled by decorating a transform).
>>>
>>> On Fri, Sep 27, 2024 at 12:01 PM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>
>>>> Thoughts, @dev <dev@beam.apache.org>, on a `GroupedValues` version of
>>>> the combiners? Named `PerGroup` or `PerGroupedValues`?
>>>>
>>>> On Fri, Sep 27, 2024 at 2:57 PM Valentyn Tymofieiev <valen...@google.com> wrote:
>>>>
>>>>> On Fri, Sep 27, 2024 at 11:13 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>>>
>>>>>> Ah! That is exactly the kind of primitive I was looking for but
>>>>>> thought didn't exist. Thanks for pointing it out. Yeah, that works
>>>>>> well for me; I'll use it in my combiners (with an API of
>>>>>> `PerGroupedValues`). Thanks!
>>>>>>
>>>>>> If we did want to add `PerGroupedValues` to our current combiners,
>>>>>> I'd also be happy to put up a PR doing that.
>>>>>
>>>>> I don't see why not. I'd run it by dev@ for naming ideas. PerGroup is
>>>>> another possibility.
>>>>>> On Fri, Sep 27, 2024 at 2:01 PM Valentyn Tymofieiev <valen...@google.com> wrote:
>>>>>>
>>>>>>> The closest primitive to that intent seems to be CombineValues:
>>>>>>> https://github.com/apache/beam/blob/c2c640f8c33071d5bb3e854e82c554c03a0bc851/sdks/python/apache_beam/transforms/core.py#L3010
>>>>>>> and you should be able to write:
>>>>>>>
>>>>>>> ```
>>>>>>> max_sample_size = 100_000
>>>>>>> (keyed_nums
>>>>>>>  | GroupByKey()
>>>>>>>  | Map(lambda k_nums: (k_nums[0], k_nums[1][:max_sample_size]))
>>>>>>>  | CombineValues(MeanCombineFn()))
>>>>>>> ```
>>>>>>>
>>>>>>> Would that work for the other scenarios you have in mind?
>>>>>>>
>>>>>>> I haven't thought too much about this, but from looking at
>>>>>>> https://github.com/apache/beam/blob/c2c640f8c33071d5bb3e854e82c554c03a0bc851/sdks/python/apache_beam/transforms/combiners.py#L90,
>>>>>>> I could see us adding Mean.GroupedValues or Mean.PerGroupedValues there.
>>>>>>>
>>>>>>> On Fri, Sep 27, 2024 at 10:41 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>>>>>
>>>>>>>> It feels more natural because it uses GroupByKey only once instead
>>>>>>>> of once per combiner, which I think is still more efficient even
>>>>>>>> accounting for combiner lifting (unless there's some kind of
>>>>>>>> pipeline optimization that merges multiple GroupByKeys on the same
>>>>>>>> PCollection into a single GBK).
>>>>>>>>
>>>>>>>> You can imagine a different use case where this pattern might arise
>>>>>>>> that isn't just about reducing GBKs, though. For example:
>>>>>>>>
>>>>>>>> ```
>>>>>>>> max_sample_size = 100_000
>>>>>>>> (keyed_nums
>>>>>>>>  | GroupByKey()
>>>>>>>>  | Map(lambda k_nums: (k_nums[0], k_nums[1][:max_sample_size]))
>>>>>>>>  | ...)  # ?? Mean.PerGrouped()?
>>>>>>>> ```
>>>>>>>>
>>>>>>>> To take the mean of every group's values using the current
>>>>>>>> combiners, I think you'd have to use an inverted GroupByKey and
>>>>>>>> then call `Mean.PerKey()`, unless I'm missing something.
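[A pure-Python sketch of the "combine already-grouped values" pattern being discussed: group once, then run several combiners over the same grouped values. This is not Beam code; `group_by_key` and `per_grouped_values` are hypothetical names standing in for GroupByKey and the proposed PerGroupedValues.]

```python
from collections import defaultdict


def group_by_key(pairs):
    """Group (key, value) pairs into {key: [values]} -- one GBK-like pass."""
    groups = defaultdict(list)
    for k, v in pairs:
        groups[k].append(v)
    return dict(groups)


def per_grouped_values(grouped, combine_fn):
    """Apply a combiner to values that have already been grouped."""
    return {k: combine_fn(vs) for k, vs in grouped.items()}


keyed_nums = [("a", 1), ("a", 3), ("b", 10)]
grouped = group_by_key(keyed_nums)  # group once...
means = per_grouped_values(grouped, lambda vs: sum(vs) / len(vs))
counts = per_grouped_values(grouped, len)  # ...then feed many combines
print(means, counts)  # {'a': 2.0, 'b': 10.0} {'a': 2, 'b': 1}
```

The appeal is that `grouped` is computed once and shared, whereas `Mean.PerKey()` plus `Count.PerKey()` each imply their own grouping (modulo whatever consolidation the runner performs).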
>>>>>>>> (I recognize that writing a Map that takes a mean is simple
>>>>>>>> enough, but in a real use case we might have a more complicated
>>>>>>>> combiner.)
>>>>>>>>
>>>>>>>> On Fri, Sep 27, 2024 at 1:31 PM Valentyn Tymofieiev via user <u...@beam.apache.org> wrote:
>>>>>>>>
>>>>>>>>> On Fri, Sep 27, 2024 at 8:35 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hey all,
>>>>>>>>>>
>>>>>>>>>> Just curious if this pattern comes up for others and whether
>>>>>>>>>> people have worked out a good convention.
>>>>>>>>>>
>>>>>>>>>> There are many combiners, and a lot of them have two forms: a
>>>>>>>>>> global form (e.g. Count.Globally) and a per-key form (e.g.
>>>>>>>>>> Count.PerKey). These are convenient, but it feels like we often
>>>>>>>>>> run into the case where we GroupBy a set of data once and then
>>>>>>>>>> wish to perform a series of combines on the groups, in which case
>>>>>>>>>> neither of these forms works, and that begs for another form
>>>>>>>>>> which operates on pre-grouped KVs.
>>>>>>>>>>
>>>>>>>>>> Contrived example: maybe you have a PCollection of keyed numbers
>>>>>>>>>> and you want to calculate some summary statistics on them. You
>>>>>>>>>> could do:
>>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>> keyed_means = (keyed_nums
>>>>>>>>>>                | Mean.PerKey())
>>>>>>>>>> keyed_counts = (keyed_nums
>>>>>>>>>>                 | Count.PerKey())
>>>>>>>>>> ... # other combines
>>>>>>>>>> ```
>>>>>>>>>>
>>>>>>>>>> But it'd feel more natural to pre-group the PCollection.
>>>>>>>>>
>>>>>>>>> Does it feel more natural because it feels as though it would be
>>>>>>>>> more performant? Because it seems like it adds an extra grouping
>>>>>>>>> step to the pipeline code, which otherwise might not be necessary.
>>>>>>>>> Note that Dataflow has the "combiner lifting" optimization, and
>>>>>>>>> combiner-specified reduction happens before the data is written
>>>>>>>>> into shuffle as much as possible:
>>>>>>>>> https://cloud.google.com/dataflow/docs/pipeline-lifecycle#combine_optimization
>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>> grouped_nums = keyed_nums | GBK()
>>>>>>>>>> keyed_means = (grouped_nums | Mean.PerGrouped())
>>>>>>>>>> keyed_counts = (grouped_nums | Count.PerGrouped())
>>>>>>>>>> ```
>>>>>>>>>>
>>>>>>>>>> But these "PerGrouped" variants don't actually exist currently.
>>>>>>>>>> Does anyone else run into this pattern often? I might be missing
>>>>>>>>>> an obvious pattern here.
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Joey Tran | Staff Developer | AutoDesigner TL
>>>>>>>>>> *he/him*
>>>>>>>>>> Schrödinger, Inc. <https://schrodinger.com/>
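[A pure-Python sketch of the "combiner lifting" idea referenced in the thread: each worker pre-combines its local values into a small accumulator, so only accumulators, not raw values, cross the shuffle boundary. The shard layout and the `(sum, count)` accumulator shape are illustrative assumptions, not how any particular runner implements it.]

```python
from collections import defaultdict


def mean_create():
    return (0.0, 0)  # accumulator: (running_sum, count)


def mean_add(acc, v):
    return (acc[0] + v, acc[1] + 1)


def mean_merge(a, b):
    return (a[0] + b[0], a[1] + b[1])


def mean_extract(acc):
    return acc[0] / acc[1]


def lifted_mean_per_key(shards):
    # Phase 1: per-shard partial combine (conceptually, before the shuffle).
    partials = []
    for shard in shards:
        local = defaultdict(mean_create)
        for k, v in shard:
            local[k] = mean_add(local[k], v)
        partials.append(local)
    # Phase 2: "shuffle" only the small accumulators by key, then merge.
    merged = defaultdict(mean_create)
    for local in partials:
        for k, acc in local.items():
            merged[k] = mean_merge(merged[k], acc)
    return {k: mean_extract(acc) for k, acc in merged.items()}


shards = [[("a", 1), ("b", 4)], [("a", 3)]]
print(lifted_mean_per_key(shards))  # {'a': 2.0, 'b': 4.0}
```

This is why an explicit GroupByKey followed by a per-group combine can defeat the optimization: once raw values have been materialized into grouped lists, phase 1 can no longer happen before the shuffle.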