Hmm, it makes sense from a runner optimization perspective, but I think
it's a lot less ergonomic to publish combinefns instead of ptransforms.
Another drawback to the stacked combinefn is that its label will have to
be a mash of possibly very different combiners squashed into one.

On Fri, Sep 27, 2024 at 3:52 PM Robert Bradshaw via dev <dev@beam.apache.org>
wrote:

> I'd be worried about encouraging the anti-pattern of GroupByKey() +
> CombinePerGroup() which would make the important (often essential) combiner
> lifting optimization harder (pattern detection in the runner vs. composite
> detection).
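>
> For contrast, a rough sketch of the two shapes (assuming keyed_nums is a
> PCollection of (key, num) pairs):
>
> # Composite form the runner can detect and lift directly:
> keyed_sums = keyed_nums | CombinePerKey(sum)
>
> # Anti-pattern: the runner would have to pattern-match GBK + a per-group
> # combine to recover the same optimization:
> keyed_sums = (keyed_nums
>     | GroupByKey()
>     | Map(lambda kv: (kv[0], sum(kv[1]))))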
>
> You might also be interested in the TupleCombineFns
>
>
> https://github.com/apache/beam/blob/release-2.13.0/sdks/python/apache_beam/transforms/combiners.py#L717
>
> keyed_nums = ...
> combined_nums = keyed_nums | CombinePerKey(
>     combiners.SingleInputTupleCombineFn(
>         sum,
>         combiners.MeanCombineFn(),
>         ...))
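>
> If I'm reading TupleCombineFn right, each output element there would be
> the key paired with a tuple of results, one per sub-combiner, so
> something like:
>
> labeled = combined_nums | Map(
>     lambda kv: (kv[0], {'sum': kv[1][0], 'mean': kv[1][1]}))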
>
> FWIW, I never liked the .Globally() and .PerKey() transforms as they're not
> very composable. I think they were needed to make Java typing work out
> well in Java 7 and then copied to Python, but I would suggest just using
> CombineGlobally(...) and CombinePerKey(...) over these.
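>
> E.g. (a sketch, assuming nums and keyed_nums PCollections):
>
> total = nums | CombineGlobally(sum)
> keyed_means = keyed_nums | CombinePerKey(combiners.MeanCombineFn())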
>
> You might also be interested to know that in Python we already have
> combiner consolidation
>
>
> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L953
>
> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L1161
>
>
> I don't remember what the status is of enabling this by default (IIRC,
> they're conditionally enabled by decorating a transform).
>
> On Fri, Sep 27, 2024 at 12:01 PM Joey Tran <joey.t...@schrodinger.com>
> wrote:
>
>> Thoughts @dev <dev@beam.apache.org> for a `GroupedValues` version of
>> combiners? Named `PerGroup` or `PerGroupedValues`?
>>
>> On Fri, Sep 27, 2024 at 2:57 PM Valentyn Tymofieiev <valen...@google.com>
>> wrote:
>>
>>>
>>> On Fri, Sep 27, 2024 at 11:13 AM Joey Tran <joey.t...@schrodinger.com>
>>> wrote:
>>>
>>>> Ah! That is exactly the kind of primitive I was looking for but thought
>>>> didn't exist. Thanks for pointing it out. Yeah that works well for me, I'll
>>>> use that in my combiners (with an API of `PerGroupedValues`). Thanks!
>>>>
>> If we did want to add `PerGroupedValues` to our current combiners, I'd
>> also be happy to put up a PR doing that.
>>>>
>>>
>>> I don't see why not. I'd run it by dev@ for naming ideas. PerGroup is
>>> another possibility.
>>>
>>>>
>>>> On Fri, Sep 27, 2024 at 2:01 PM Valentyn Tymofieiev <
>>>> valen...@google.com> wrote:
>>>>
>>>>> The closest primitive to that intent seems to be CombineValues:
>>>>> https://github.com/apache/beam/blob/c2c640f8c33071d5bb3e854e82c554c03a0bc851/sdks/python/apache_beam/transforms/core.py#L3010
>>>>> and you should be able to write:
>>>>>
>>>>> max_sample_size = 100_000
>>>>> keyed_means = (keyed_nums
>>>>>  | GroupByKey()
>>>>>  | Map(lambda k_nums: (k_nums[0], list(k_nums[1])[:max_sample_size]))
>>>>>  | CombineValues(MeanCombineFn()))
>>>>> Would that work for other scenarios you have in mind?
>>>>>
>>>>> Haven't thought too much about this but from looking at
>>>>> https://github.com/apache/beam/blob/c2c640f8c33071d5bb3e854e82c554c03a0bc851/sdks/python/apache_beam/transforms/combiners.py#L90,
>>>>> I could see us adding Mean.GroupedValues or Mean.PerGroupedValues there.
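>>>>>
>>>>> A minimal sketch of what that could look like, assuming it just
>>>>> delegates to CombineValues (names hypothetical):
>>>>>
>>>>> class Mean(object):
>>>>>   class PerGroupedValues(PTransform):
>>>>>     def expand(self, pcoll):
>>>>>       return pcoll | CombineValues(MeanCombineFn())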
>>>>>
>>>>>
>>>>> On Fri, Sep 27, 2024 at 10:41 AM Joey Tran <joey.t...@schrodinger.com>
>>>>> wrote:
>>>>>
>>>>>> It feels more natural because it's only using GroupByKey once instead
>>>>>> of once per combiner, which I think is still more efficient even
>>>>>> accounting for combiner lifting (unless there's some kind of pipeline
>>>>>> optimization that merges multiple GroupByKeys on the same PCollection
>>>>>> into a single GBK).
>>>>>>
>>>>>> You can imagine a different use case where this pattern might arise
>>>>>> that isn't just trying to reduce GBKs though. For example:
>>>>>>
>>>>>> ```
>>>>>> max_sample_size = 100_000
>>>>>> keyed_means = (keyed_nums
>>>>>>  | GroupByKey()
>>>>>>  | Map(lambda k_nums: (k_nums[0], list(k_nums[1])[:max_sample_size]))
>>>>>>  | Mean.PerGrouped())  # ?? hypothetical
>>>>>> ```
>>>>>>
>>>>>> To take the mean of each group's values using current combiners, I
>>>>>> think you'd have to invert the GroupByKey and then call
>>>>>> `Mean.PerKey()`, unless I'm missing something.
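>>>>>>
>>>>>> Roughly (a sketch, with FlatMap undoing the grouping):
>>>>>>
>>>>>> ```
>>>>>> keyed_means = (grouped_nums
>>>>>>  | FlatMap(lambda kv: ((kv[0], v) for v in kv[1]))
>>>>>>  | Mean.PerKey())
>>>>>> ```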
>>>>>>
>>>>>> (I recognize that writing a Map that takes a mean is simple enough,
>>>>>> but in a real use case we might have a more complicated combiner)
>>>>>>
>>>>>> On Fri, Sep 27, 2024 at 1:31 PM Valentyn Tymofieiev via user <
>>>>>> u...@beam.apache.org> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Sep 27, 2024 at 8:35 AM Joey Tran <joey.t...@schrodinger.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey all,
>>>>>>>>
>>>>>>>> Just curious if this pattern comes up for others and if people have
>>>>>>>> worked out a good convention.
>>>>>>>>
>>>>>>>> There are many combiners and a lot of them have two forms: a global
>>>>>>>> form (e.g. Count.Globally) and a per-key form (e.g. Count.PerKey).
>>>>>>>> These are convenient, but it feels like we often run into the case
>>>>>>>> where we GroupBy a set of data once and then wish to perform a
>>>>>>>> series of combines on them, in which case neither of these forms
>>>>>>>> works, and it begs another form which operates on pre-grouped KVs.
>>>>>>>>
>>>>>>>> Contrived example: maybe you have a pcollection of keyed numbers
>>>>>>>> and you want to calculate some summary statistics on them. You
>>>>>>>> could do:
>>>>>>>> ```
>>>>>>>> keyed_means = (keyed_nums
>>>>>>>>  | Mean.PerKey())
>>>>>>>> keyed_counts = (keyed_nums
>>>>>>>>  | Count.PerKey())
>>>>>>>> ... # other combines
>>>>>>>> ```
>>>>>>>> But it'd feel more natural to pre-group the pcollection.
>>>>>>>>
>>>>>>>
>>>>>>> Does it feel more natural because it feels as though it would be
>>>>>>> more performant? It seems like it adds an extra grouping step to the
>>>>>>> pipeline code, which otherwise might not be necessary. Note that
>>>>>>> Dataflow has the "combiner lifting" optimization, and the
>>>>>>> combiner-specified reduction happens before the data is written into
>>>>>>> shuffle as much as possible:
>>>>>>> https://cloud.google.com/dataflow/docs/pipeline-lifecycle#combine_optimization
>>>>>>>
>>>>>>>
>>>>>>>> ```
>>>>>>>> grouped_nums = keyed_nums | GroupByKey()
>>>>>>>> keyed_means = (grouped_nums | Mean.PerGrouped())
>>>>>>>> keyed_counts = (grouped_nums | Count.PerGrouped())
>>>>>>>> ```
>>>>>>>> But these "PerGrouped" variants don't actually currently exist.
>>>>>>>> Does anyone else run into this pattern often? I might be missing an 
>>>>>>>> obvious
>>>>>>>> pattern here.
>>>>>>>>
>>>>>>>> --
>>>>>>>>
>>>>>>>> Joey Tran | Staff Developer | AutoDesigner TL
>>>>>>>>
>>>>>>>> *he/him*
>>>>>>>>
>>>>>>>> Schrödinger, Inc. <https://schrodinger.com/>
>>>>>>>>
>>>>>>>
