Thoughts @dev <dev@beam.apache.org> for a `GroupedValues` version of
combiners? Named `PerGroup` or `PerGroupedValues`?
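
To make the question concrete, here is a minimal plain-Python sketch of the semantics I have in mind. It is not Beam code: `MeanFn`, `CountFn`, `group_by_key`, and `per_grouped_values` are hypothetical stand-ins that only borrow the `create_accumulator` / `add_input` / `extract_output` method names from `CombineFn`. In a real pipeline `CombineValues` already covers this step; the sketch just shows one grouping pass feeding several combines.

```python
from collections import defaultdict


class MeanFn:
    """Hypothetical stand-in that mirrors CombineFn's method names."""

    def create_accumulator(self):
        return (0.0, 0)  # (running sum, count)

    def add_input(self, acc, value):
        total, count = acc
        return (total + value, count + 1)

    def extract_output(self, acc):
        total, count = acc
        return total / count if count else float("nan")


class CountFn:
    """Hypothetical stand-in that counts the values in a group."""

    def create_accumulator(self):
        return 0

    def add_input(self, acc, value):
        return acc + 1

    def extract_output(self, acc):
        return acc


def group_by_key(pairs):
    """Plain-Python model of a GroupByKey: (k, v) pairs -> (k, [v, ...])."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return list(groups.items())


def per_grouped_values(combine_fn):
    """Return a function mapping (key, values) to (key, combined result)."""

    def apply(kv):
        key, values = kv
        acc = combine_fn.create_accumulator()
        for value in values:
            acc = combine_fn.add_input(acc, value)
        return key, combine_fn.extract_output(acc)

    return apply


keyed_nums = [("a", 1), ("a", 3), ("b", 10)]
grouped_nums = group_by_key(keyed_nums)  # group once, reuse below
keyed_means = dict(map(per_grouped_values(MeanFn()), grouped_nums))
keyed_counts = dict(map(per_grouped_values(CountFn()), grouped_nums))
```

The naming question is then just what to call the `per_grouped_values` step on the existing combiner classes.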

On Fri, Sep 27, 2024 at 2:57 PM Valentyn Tymofieiev <valen...@google.com>
wrote:

>
> On Fri, Sep 27, 2024 at 11:13 AM Joey Tran <joey.t...@schrodinger.com>
> wrote:
>
>> Ah! That is exactly the kind of primitive I was looking for but thought
>> didn't exist. Thanks for pointing it out. Yeah that works well for me, I'll
>> use that in my combiners (with an API of `PerGroupedValues`). Thanks!
>>
>> If we did want to add `PerGroupedValues` to our current combiners I'd
>> also be happy to put up a PR doing that
>>
>
> I don't see why not. I'd run by dev@ for naming ideas.  PerGroup is
> another possibility.
>
>
>
>
>>
>> On Fri, Sep 27, 2024 at 2:01 PM Valentyn Tymofieiev <valen...@google.com>
>> wrote:
>>
>>> The closest primitive to that intent seems to be CombineValues:
>>> https://github.com/apache/beam/blob/c2c640f8c33071d5bb3e854e82c554c03a0bc851/sdks/python/apache_beam/transforms/core.py#L3010
>>> , and you should be able to write:
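>>>
>>> ```
>>> max_sample_size = 100_000
>>> ( keyed_nums
>>>  | GroupByKey()
>>>  # Unpack (key, values) and cap each group at max_sample_size values.
>>>  | Map(lambda k_nums: (k_nums[0], list(k_nums[1])[:max_sample_size]))
>>>  | CombineValues(MeanCombineFn()))
>>> ```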
>>> Would that work for other scenarios you have in mind?
>>>
>>> Haven't thought too much about this but from looking at
>>> https://github.com/apache/beam/blob/c2c640f8c33071d5bb3e854e82c554c03a0bc851/sdks/python/apache_beam/transforms/combiners.py#L90,
>>> I could see us adding Mean.GroupedValues or Mean.PerGroupedValues there.
>>>
>>>
>>> On Fri, Sep 27, 2024 at 10:41 AM Joey Tran <joey.t...@schrodinger.com>
>>> wrote:
>>>
>>>> It feels more natural because it's only using GroupByKey once instead
>>>> of once per combiner. Which I think is still more efficient even accounting
>>>> for combiner lifting (unless there's some kind of pipeline optimization
>>>> that merges multiple groupbykey's on the same pcollection into a single
>>>> GBK).
>>>>
>>>> You can imagine a different use case where this pattern might arise
>>>> that isn't just trying to reduce GBKs though. For example:
>>>>
>>>> ```
>>>> max_sample_size = 100_000
>>>> ( keyed_nums
>>>>  | GroupByKey()
>>>>  | Map(lambda k_nums: (k_nums[0], list(k_nums[1])[:max_sample_size]))
>>>>  # | ??  Mean.PerGrouped()?
>>>> )
>>>> ```
>>>>
>>>> To take the mean of every grouped_values using current combiners, I
>>>> think you'd have to use an inverted groupbykey and then call
>>>> `Mean.PerKey()` unless I'm missing something.
>>>>
>>>> (I recognize that writing a Map that takes a mean is simple enough, but
>>>> in a real use case we might have a more complicated combiner)
>>>>
>>>> On Fri, Sep 27, 2024 at 1:31 PM Valentyn Tymofieiev via user <
>>>> u...@beam.apache.org> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Fri, Sep 27, 2024 at 8:35 AM Joey Tran <joey.t...@schrodinger.com>
>>>>> wrote:
>>>>>
>>>>>> Hey all,
>>>>>>
>>>>>> Just curious if this pattern comes up for others and if people have
>>>>>> worked out a good convention.
>>>>>>
>>>>>> There are many combiners and a lot of them have two forms: a global
>>>>>> form (e.g. Count.Globally) and a per key form (e.g. Count.PerKey). These
>>>>>> are convenient but it feels like often we're running into the case where
>>>>>> we GroupBy a set of data once and then wish to perform a series of
>>>>>> combines on them, in which case neither of these forms work, and it begs
>>>>>> another form which operates on pre-grouped KVs.
>>>>>>
>>>>>> Contrived example: maybe you have a pcollection of keyed numbers and
>>>>>> you want to calculate some summary statistics on them. You could do:
>>>>>> ```
>>>>>> keyed_means = (keyed_nums
>>>>>>  | Mean.PerKey())
>>>>>> keyed_counts = (keyed_nums
>>>>>>  | Count.PerKey())
>>>>>> ... # other combines
>>>>>> ```
>>>>>> But it'd feel more natural to pre-group the pcollection.
>>>>>>
>>>>>
>>>>> Does it feel more natural because it feels as though it would be more
>>>>> performant? Because it seems like it adds an extra grouping step to the
>>>>> pipeline code, which otherwise might be not necessary. Note that Dataflow
>>>>> has the "combiner lifting" optimization, and combiner-specified-reduction
>>>>> happens before the data is written into shuffle as much as possible:
>>>>> https://cloud.google.com/dataflow/docs/pipeline-lifecycle#combine_optimization
>>>>> .
>>>>>
>>>>>
>>>>>> ```
>>>>>> grouped_nums = keyed_nums | GBK()
>>>>>> keyed_means = (grouped_nums | Mean.PerGrouped())
>>>>>> keyed_counts = (grouped_nums | Count.PerGrouped())
>>>>>> ```
>>>>>> But these "PerGrouped" variants don't actually currently exist. Does
>>>>>> anyone else run into this pattern often? I might be missing an obvious
>>>>>> pattern here.
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Joey Tran | Staff Developer | AutoDesigner TL
>>>>>>
>>>>>> *he/him*
>>>>>>
>>>>>> [image: Schrödinger, Inc.] <https://schrodinger.com/>
>>>>>>
>>>>>
