On Tue, Oct 1, 2024 at 8:11 AM Kenneth Knowles <k...@apache.org> wrote:
> Am I right in understanding that "TupleCombineFn" is the Python name for
> ComposedCombineFn? (
> https://beam.apache.org/releases/javadoc/2.59.0/index.html?org/apache/beam/sdk/transforms/CombineFns.ComposedCombineFn.html
> )

Yes. Note there are two variants: one that takes a tuple of inputs and
applies a tuple of combiners, and another that takes a single input and a
tuple of combiners (each applied in parallel).

> On Sun, Sep 29, 2024 at 7:51 PM Joey Tran <joey.t...@schrodinger.com>
> wrote:
>
>> Ah, I realize now that `standard_optimize_phases` is not actually
>> currently used by the fn runner, so these changes don't effectively do
>> anything.
>>
>> On Sun, Sep 29, 2024 at 3:19 PM Joey Tran <joey.t...@schrodinger.com>
>> wrote:
>>
>>> I took a crack at trying to replace GBK+CombineValues with CBK so that
>>> it doesn't matter what the user chooses to do:
>>>
>>> https://github.com/apache/beam/pull/32592
>>>
>>> On Fri, Sep 27, 2024 at 4:32 PM Joey Tran <joey.t...@schrodinger.com>
>>> wrote:
>>>
>>>> Hmm, it makes sense from a runner-optimization perspective, but I
>>>> think it's a lot less ergonomic to publish CombineFns instead of
>>>> PTransforms. Another drawback to the stacked CombineFn is that its
>>>> label will have to be a mash of possibly very different combiners
>>>> squashed into one.
>>>>
>>>> On Fri, Sep 27, 2024 at 3:52 PM Robert Bradshaw via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> I'd be worried about encouraging the anti-pattern of GroupByKey() +
>>>>> CombinePerGroup(), which would make the important (often essential)
>>>>> combiner lifting optimization harder (pattern detection in the
>>>>> runner vs. composite detection).
>>>>>
>>>>> You might also be interested in the TupleCombineFns:
>>>>>
>>>>> https://github.com/apache/beam/blob/release-2.13.0/sdks/python/apache_beam/transforms/combiners.py#L717
>>>>>
>>>>> keyed_nums = ...
>>>>> combined_nums = keyed_nums | CombinePerKey(
>>>>>     combiners.SingleInputTupleCombineFn(
>>>>>         sum,
>>>>>         combiners.MeanCombineFn(),
>>>>>         ...))
>>>>>
>>>>> FWIW, I never liked the .Globally() and .PerKey() transforms, as
>>>>> they're not very composable. I think they were needed to make Java
>>>>> typing work out well in Java 7 and were then copied to Python, but I
>>>>> would suggest just using CombineGlobally(...) and CombinePerKey(...)
>>>>> over these.
>>>>>
>>>>> You might also be interested to know that in Python we already have
>>>>> combiner consolidation:
>>>>>
>>>>> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L953
>>>>> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L1161
>>>>>
>>>>> I don't remember what the status is of enabling this by default
>>>>> (IIRC, they're conditionally enabled by decorating a transform).
>>>>>
>>>>> On Fri, Sep 27, 2024 at 12:01 PM Joey Tran <joey.t...@schrodinger.com>
>>>>> wrote:
>>>>>
>>>>>> Thoughts, @dev <dev@beam.apache.org>, on a `GroupedValues` version
>>>>>> of combiners? Named `PerGroup` or `PerGroupedValues`?
>>>>>>
>>>>>> On Fri, Sep 27, 2024 at 2:57 PM Valentyn Tymofieiev <
>>>>>> valen...@google.com> wrote:
>>>>>>
>>>>>>> On Fri, Sep 27, 2024 at 11:13 AM Joey Tran <
>>>>>>> joey.t...@schrodinger.com> wrote:
>>>>>>>
>>>>>>>> Ah! That is exactly the kind of primitive I was looking for but
>>>>>>>> thought didn't exist. Thanks for pointing it out. Yeah, that
>>>>>>>> works well for me; I'll use that in my combiners (with an API of
>>>>>>>> `PerGroupedValues`). Thanks!
>>>>>>>>
>>>>>>>> If we did want to add `PerGroupedValues` to our current
>>>>>>>> combiners, I'd also be happy to put up a PR doing that.
>>>>>>>
>>>>>>> I don't see why not. I'd run it by dev@ for naming ideas. PerGroup
>>>>>>> is another possibility.
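[Editor's note: the SingleInputTupleCombineFn example above feeds each element to several combiners in a single pass. The following is a minimal plain-Python sketch of that semantics only; the function name and the (create, add_input, extract) triple shape are illustrative assumptions, not Beam's actual CombineFn API.]

```python
# Plain-Python sketch of the "single input, tuple of combiners" idea:
# every value is fed to each sub-combiner's accumulator, so several
# statistics come out of one pass over the data.

def combine_values_with_tuple(values, combine_fns):
    """Apply each (create, add_input, extract) triple to the same input."""
    accumulators = [create() for create, _, _ in combine_fns]
    for v in values:
        accumulators = [
            add(acc, v)
            for (_, add, _), acc in zip(combine_fns, accumulators)
        ]
    return tuple(
        extract(acc)
        for (_, _, extract), acc in zip(combine_fns, accumulators)
    )

# Two toy sub-combiners expressed as (create, add_input, extract) triples.
sum_fn = (lambda: 0, lambda acc, v: acc + v, lambda acc: acc)
mean_fn = (
    lambda: (0, 0),                                   # (running total, count)
    lambda acc, v: (acc[0] + v, acc[1] + 1),
    lambda acc: acc[0] / acc[1] if acc[1] else float("nan"),
)

print(combine_values_with_tuple([1, 2, 3, 4], [sum_fn, mean_fn]))  # (10, 2.5)
```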
>>>>>>>
>>>>>>>> On Fri, Sep 27, 2024 at 2:01 PM Valentyn Tymofieiev <
>>>>>>>> valen...@google.com> wrote:
>>>>>>>>
>>>>>>>>> The closest primitive to that intent seems to be CombineValues:
>>>>>>>>> https://github.com/apache/beam/blob/c2c640f8c33071d5bb3e854e82c554c03a0bc851/sdks/python/apache_beam/transforms/core.py#L3010
>>>>>>>>> , and you should be able to write:
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>> max_sample_size = 100_000
>>>>>>>>> (keyed_nums
>>>>>>>>>  | GroupByKey()
>>>>>>>>>  | Map(lambda k_nums: (k_nums[0], k_nums[1][:max_sample_size]))
>>>>>>>>>  | CombineValues(MeanCombineFn()))
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> Would that work for other scenarios you have in mind?
>>>>>>>>>
>>>>>>>>> Haven't thought too much about this, but from looking at
>>>>>>>>> https://github.com/apache/beam/blob/c2c640f8c33071d5bb3e854e82c554c03a0bc851/sdks/python/apache_beam/transforms/combiners.py#L90,
>>>>>>>>> I could see us adding Mean.GroupedValues or Mean.PerGroupedValues
>>>>>>>>> there.
>>>>>>>>>
>>>>>>>>> On Fri, Sep 27, 2024 at 10:41 AM Joey Tran <
>>>>>>>>> joey.t...@schrodinger.com> wrote:
>>>>>>>>>
>>>>>>>>>> It feels more natural because it uses GroupByKey only once
>>>>>>>>>> instead of once per combiner, which I think is still more
>>>>>>>>>> efficient even accounting for combiner lifting (unless there's
>>>>>>>>>> some kind of pipeline optimization that merges multiple
>>>>>>>>>> GroupByKeys on the same PCollection into a single GBK).
>>>>>>>>>>
>>>>>>>>>> You can imagine a different use case where this pattern might
>>>>>>>>>> arise that isn't just about reducing GBKs, though. For example:
>>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>> max_sample_size = 100_000
>>>>>>>>>> (keyed_nums
>>>>>>>>>>  | GroupByKey()
>>>>>>>>>>  | Map(lambda k_nums: (k_nums[0], k_nums[1][:max_sample_size]))
>>>>>>>>>>  | ...)  # ?? Mean.PerGrouped()?
>>>>>>>>>> ```
>>>>>>>>>>
>>>>>>>>>> To take the mean of every group's values using the current
>>>>>>>>>> combiners, I think you'd have to use an inverted GroupByKey and
>>>>>>>>>> then call `Mean.PerKey()`, unless I'm missing something.
>>>>>>>>>>
>>>>>>>>>> (I recognize that writing a Map that takes a mean is simple
>>>>>>>>>> enough, but in a real use case we might have a more complicated
>>>>>>>>>> combiner.)
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 27, 2024 at 1:31 PM Valentyn Tymofieiev via user <
>>>>>>>>>> u...@beam.apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Fri, Sep 27, 2024 at 8:35 AM Joey Tran <
>>>>>>>>>>> joey.t...@schrodinger.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hey all,
>>>>>>>>>>>>
>>>>>>>>>>>> Just curious if this pattern comes up for others and if
>>>>>>>>>>>> people have worked out a good convention.
>>>>>>>>>>>>
>>>>>>>>>>>> There are many combiners, and a lot of them have two forms: a
>>>>>>>>>>>> global form (e.g. Count.Globally) and a per-key form (e.g.
>>>>>>>>>>>> Count.PerKey). These are convenient, but it feels like we
>>>>>>>>>>>> often run into the case where we GroupBy a set of data once
>>>>>>>>>>>> and then wish to perform a series of combines on it, in which
>>>>>>>>>>>> case neither of these forms works, and it begs a third form
>>>>>>>>>>>> which operates on pre-grouped KVs.
>>>>>>>>>>>>
>>>>>>>>>>>> Contrived example: maybe you have a PCollection of keyed
>>>>>>>>>>>> numbers and you want to calculate some summary statistics on
>>>>>>>>>>>> them. You could do:
>>>>>>>>>>>>
>>>>>>>>>>>> ```
>>>>>>>>>>>> keyed_means = (keyed_nums
>>>>>>>>>>>>                | Mean.PerKey())
>>>>>>>>>>>> keyed_counts = (keyed_nums
>>>>>>>>>>>>                 | Count.PerKey())
>>>>>>>>>>>> ...  # other combines
>>>>>>>>>>>> ```
>>>>>>>>>>>>
>>>>>>>>>>>> But it'd feel more natural to pre-group the PCollection.
>>>>>>>>>>>
>>>>>>>>>>> Does it feel more natural because it feels as though it would
>>>>>>>>>>> be more performant? It seems like it adds an extra grouping
>>>>>>>>>>> step to the pipeline code which might otherwise not be
>>>>>>>>>>> necessary. Note that Dataflow has the "combiner lifting"
>>>>>>>>>>> optimization, and the combiner-specified reduction happens
>>>>>>>>>>> before the data is written into shuffle as much as possible:
>>>>>>>>>>> https://cloud.google.com/dataflow/docs/pipeline-lifecycle#combine_optimization
>>>>>>>>>>>
>>>>>>>>>>>> ```
>>>>>>>>>>>> grouped_nums = keyed_nums | GBK()
>>>>>>>>>>>> keyed_means = (grouped_nums | Mean.PerGrouped())
>>>>>>>>>>>> keyed_counts = (grouped_nums | Count.PerGrouped())
>>>>>>>>>>>> ```
>>>>>>>>>>>>
>>>>>>>>>>>> But these "PerGrouped" variants don't actually currently
>>>>>>>>>>>> exist. Does anyone else run into this pattern often? I might
>>>>>>>>>>>> be missing an obvious pattern here.
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>>
>>>>>>>>>>>> Joey Tran | Staff Developer | AutoDesigner TL
>>>>>>>>>>>>
>>>>>>>>>>>> *he/him*
>>>>>>>>>>>>
>>>>>>>>>>>> Schrödinger, Inc. <https://schrodinger.com/>
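[Editor's note: the difference between the existing PerKey combiners and the proposed "PerGrouped" variant can be sketched in plain Python. The function names below are illustrative only, not Beam's API; both compute the same result, differing only in whether the input has already been grouped.]

```python
# "PerKey" consumes ungrouped (key, value) pairs and groups internally;
# the hypothetical "PerGrouped" consumes (key, values) pairs that were
# already grouped once, so several such combines can share one grouping.
from collections import defaultdict

def mean_per_key(keyed_nums):
    """Mean.PerKey-style: group and combine in a single transform."""
    accs = defaultdict(lambda: (0, 0))  # key -> (running total, count)
    for k, v in keyed_nums:
        total, n = accs[k]
        accs[k] = (total + v, n + 1)
    return {k: total / n for k, (total, n) in accs.items()}

def mean_per_grouped(grouped_nums):
    """The proposed PerGrouped shape: input is already (key, values)."""
    return {k: sum(vs) / len(vs) for k, vs in grouped_nums}

keyed = [("a", 1), ("a", 3), ("b", 10)]
grouped = [("a", [1, 3]), ("b", [10])]
assert mean_per_key(keyed) == mean_per_grouped(grouped) == {"a": 2.0, "b": 10.0}
```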
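[Editor's note: the "combiner lifting" optimization mentioned by Robert and Valentyn can also be sketched in plain Python: each worker pre-reduces its local elements to one small accumulator per key, so only accumulators, not raw values, cross the shuffle. This is a rough illustration under assumed names, not Dataflow's implementation.]

```python
# Sketch of combiner lifting for a mean: per-worker pre-combine, then a
# post-shuffle merge of the (sum, count) accumulators.
from collections import defaultdict

def local_precombine(shard):
    """Per-worker stage: reduce raw (key, value) pairs to (key, (sum, count))."""
    accs = defaultdict(lambda: (0, 0))
    for k, v in shard:
        s, n = accs[k]
        accs[k] = (s + v, n + 1)
    return dict(accs)

def merge_and_extract(per_worker_accs):
    """Post-shuffle stage: merge accumulators, then extract the mean per key."""
    merged = defaultdict(lambda: (0, 0))
    for accs in per_worker_accs:
        for k, (s, n) in accs.items():
            ms, mn = merged[k]
            merged[k] = (ms + s, mn + n)
    return {k: s / n for k, (s, n) in merged.items()}

shards = [[("a", 1), ("b", 4)], [("a", 3), ("b", 6)]]
means = merge_and_extract([local_precombine(s) for s in shards])
assert means == {"a": 2.0, "b": 5.0}
```

A GroupByKey followed by a separate combine hides this structure from the runner, which is why detecting the composite (rather than the pattern) matters.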