Ah, I realize now that `standard_optimize_phases` is not actually used by
the fn runner at the moment, so these changes don't effectively do anything.
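
For context, a minimal pure-Python sketch of why rewriting GBK + CombineValues into a single per-key combine should not change results. This is not Beam code: `MeanFn`, `gbk_then_combine_values`, and `combine_per_key` are hypothetical stand-ins that only mirror the shape of a CombineFn, and the sketch ignores distribution entirely.

```python
from collections import defaultdict


class MeanFn:
    """Hypothetical stand-in that mirrors the shape of a Beam CombineFn."""

    def create_accumulator(self):
        return (0, 0)  # (running sum, count)

    def add_input(self, acc, value):
        total, count = acc
        return (total + value, count + 1)

    def extract_output(self, acc):
        total, count = acc
        return total / count if count else float("nan")


def gbk_then_combine_values(pairs, fn):
    # Step 1: materialize every value per key (the GroupByKey part).
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    # Step 2: fold each already-grouped value list (the CombineValues part).
    result = {}
    for key, values in grouped.items():
        acc = fn.create_accumulator()
        for value in values:
            acc = fn.add_input(acc, value)
        result[key] = fn.extract_output(acc)
    return result


def combine_per_key(pairs, fn):
    # Single pass: keep one accumulator per key, never the full value list.
    accs = {}
    for key, value in pairs:
        if key not in accs:
            accs[key] = fn.create_accumulator()
        accs[key] = fn.add_input(accs[key], value)
    return {key: fn.extract_output(acc) for key, acc in accs.items()}


pairs = [("a", 1), ("b", 10), ("a", 3), ("b", 20), ("a", 5)]
two_step = gbk_then_combine_values(pairs, MeanFn())
one_step = combine_per_key(pairs, MeanFn())
```

Both paths give the same per-key means here; the difference is only that the one-step form never holds the full list of values for a key.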

On Sun, Sep 29, 2024 at 3:19 PM Joey Tran <joey.t...@schrodinger.com> wrote:

> I took a crack at trying to replace GBK+CombineValues with CBK so then it
> doesn't matter what the user chooses to do.
>
> https://github.com/apache/beam/pull/32592
>
> On Fri, Sep 27, 2024 at 4:32 PM Joey Tran <joey.t...@schrodinger.com>
> wrote:
>
>> Hmm, it makes sense from a runner optimization perspective, but I think
>> it's a lot less ergonomic to publish combinefns instead of ptransforms.
>> Another drawback to the stacked combinefn is the label will have to be a
>> mash of possibly very different combiners squashed into one
>>
>> On Fri, Sep 27, 2024 at 3:52 PM Robert Bradshaw via dev <
>> dev@beam.apache.org> wrote:
>>
>>> I'd be worried about encouraging the anti-pattern of GroupByKey() +
>>> CombinePerGroup() which would make the important (often essential) combiner
>>> lifting optimization harder (pattern detection in the runner vs. composite
>>> detection).
>>>
>>> You might also be interested in the TupleCombineFns
>>>
>>>
>>> https://github.com/apache/beam/blob/release-2.13.0/sdks/python/apache_beam/transforms/combiners.py#L717
>>>
>>> keyed_nums = ...
>>> combined_nums = keyed_nums | CombinePerKey(
>>>     combiners.SingleInputTupleCombineFn(
>>>         sum,
>>>         combiners.MeanCombineFn(),
>>>         ...))
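
A rough pure-Python sketch of the idea behind stacking combiners, under the assumption that each combiner exposes create_accumulator / add_input / extract_output. None of this is Beam's implementation: `SumFn`, `MeanFn`, `SingleInputTupleFn`, and `combine_per_key` are hypothetical names for illustration only.

```python
class SumFn:
    def create_accumulator(self):
        return 0

    def add_input(self, acc, value):
        return acc + value

    def extract_output(self, acc):
        return acc


class MeanFn:
    def create_accumulator(self):
        return (0, 0)  # (running sum, count)

    def add_input(self, acc, value):
        return (acc[0] + value, acc[1] + 1)

    def extract_output(self, acc):
        return acc[0] / acc[1] if acc[1] else float("nan")


class SingleInputTupleFn:
    """Feeds every input to each wrapped combiner; outputs a tuple."""

    def __init__(self, *fns):
        self.fns = fns

    def create_accumulator(self):
        return tuple(fn.create_accumulator() for fn in self.fns)

    def add_input(self, accs, value):
        return tuple(fn.add_input(acc, value) for fn, acc in zip(self.fns, accs))

    def extract_output(self, accs):
        return tuple(fn.extract_output(acc) for fn, acc in zip(self.fns, accs))


def combine_per_key(pairs, fn):
    # One accumulator per key, updated in a single pass over the input.
    accs = {}
    for key, value in pairs:
        if key not in accs:
            accs[key] = fn.create_accumulator()
        accs[key] = fn.add_input(accs[key], value)
    return {key: fn.extract_output(acc) for key, acc in accs.items()}


keyed_nums = [("a", 1), ("a", 3), ("b", 10), ("a", 5), ("b", 20)]
stats = combine_per_key(keyed_nums, SingleInputTupleFn(SumFn(), MeanFn()))
```

Each key ends up with a (sum, mean) tuple computed in one pass, which is the property that makes a separate pre-grouping step unnecessary for this use case.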
>>>
>>> FWIW, I never liked the .Globally() and .PerKey() transforms as they're
>>> not very compressible. I think they were needed to make java typing work
>>> out well in java 7 and then copied to Python, but I would suggest just
>>> using CombineGlobally(...) and CombinePerKey(...) over these.
>>>
>>> You might also be interested to know that in Python we already have
>>> combiner consolidation
>>>
>>>
>>> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L953
>>>
>>> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L1161
>>>
>>>
>>> I don't remember what the status is of enabling this by default (IIRC,
>>> they're conditionally enabled by decorating a transform).
>>>
>>> On Fri, Sep 27, 2024 at 12:01 PM Joey Tran <joey.t...@schrodinger.com>
>>> wrote:
>>>
>>>> Thoughts @dev <dev@beam.apache.org> for a `GroupedValues` version of
>>>> combiners? Named `PerGroup` or `PerGroupedValues`?
>>>>
>>>> On Fri, Sep 27, 2024 at 2:57 PM Valentyn Tymofieiev <
>>>> valen...@google.com> wrote:
>>>>
>>>>>
>>>>> On Fri, Sep 27, 2024 at 11:13 AM Joey Tran <joey.t...@schrodinger.com>
>>>>> wrote:
>>>>>
>>>>>> Ah! That is exactly the kind of primitive I was looking for but
>>>>>> thought didn't exist. Thanks for pointing it out. Yeah that works well
>>>>>> for me, I'll use that in my combiners (with an API of `PerGroupedValues`).
>>>>>> Thanks!
>>>>>>
>>>>>> If we did want to add `PerGroupedValues` to our current combiners I'd
>>>>>> also be happy to put up a PR doing that
>>>>>>
>>>>>
>>>>> I don't see why not. I'd run by dev@ for naming ideas.  PerGroup is
>>>>> another possibility.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>>
>>>>>> On Fri, Sep 27, 2024 at 2:01 PM Valentyn Tymofieiev <
>>>>>> valen...@google.com> wrote:
>>>>>>
>>>>>>> The closest primitive to that intent seems to be CombineValues:
>>>>>>> https://github.com/apache/beam/blob/c2c640f8c33071d5bb3e854e82c554c03a0bc851/sdks/python/apache_beam/transforms/core.py#L3010
>>>>>>> , and you should be able to write:
>>>>>>>
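>>>>>>> ```
>>>>>>> max_sample_size = 100_000
>>>>>>> ( keyed_nums
>>>>>>>  | GroupByKey()
>>>>>>>  # k_nums is a (key, values) pair; keep at most max_sample_size values
>>>>>>>  | Map(lambda k_nums: (k_nums[0], list(k_nums[1])[:max_sample_size]))
>>>>>>>  | CombineValues(MeanCombineFn()))
>>>>>>> ```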
>>>>>>> Would that work for other scenarios you have in mind?
>>>>>>>
>>>>>>> Haven't thought too much about this but from looking at
>>>>>>> https://github.com/apache/beam/blob/c2c640f8c33071d5bb3e854e82c554c03a0bc851/sdks/python/apache_beam/transforms/combiners.py#L90,
>>>>>>> I could see us adding Mean.GroupedValues or Mean.PerGroupedValues there.
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Sep 27, 2024 at 10:41 AM Joey Tran <
>>>>>>> joey.t...@schrodinger.com> wrote:
>>>>>>>
>>>>>>>> It feels more natural because it's only using GroupByKey once
>>>>>>>> instead of once per combiner. Which I think is still more efficient
>>>>>>>> even accounting for combiner lifting (unless there's some kind of
>>>>>>>> pipeline optimization that merges multiple groupbykey's on the same
>>>>>>>> pcollection into a single GBK).
>>>>>>>>
>>>>>>>> You can imagine a different use case where this pattern might arise
>>>>>>>> that isn't just trying to reduce GBKs though. For example:
>>>>>>>>
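>>>>>>>> ```
>>>>>>>> max_sample_size = 100_000
>>>>>>>> ( keyed_nums
>>>>>>>>  | GroupByKey()
>>>>>>>>  # k_nums is a (key, values) pair; keep at most max_sample_size values
>>>>>>>>  | Map(lambda k_nums: (k_nums[0], list(k_nums[1])[:max_sample_size]))
>>>>>>>>  # | ??  Mean.PerGrouped()?
>>>>>>>> )
>>>>>>>> ```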
>>>>>>>>
>>>>>>>> To take the mean of every grouped_values using current combiners, I
>>>>>>>> think you'd have to use an inverted groupbykey and then call
>>>>>>>> `Mean.PerKey()` unless I'm missing something.
>>>>>>>>
>>>>>>>> (I recognize that writing a Map that takes a mean is simple enough,
>>>>>>>> but in a real use case we might have a more complicated combiner)
>>>>>>>>
>>>>>>>> On Fri, Sep 27, 2024 at 1:31 PM Valentyn Tymofieiev via user <
>>>>>>>> u...@beam.apache.org> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Sep 27, 2024 at 8:35 AM Joey Tran <
>>>>>>>>> joey.t...@schrodinger.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hey all,
>>>>>>>>>>
>>>>>>>>>> Just curious if this pattern comes up for others and if people
>>>>>>>>>> have worked out a good convention.
>>>>>>>>>>
>>>>>>>>>> There are many combiners and a lot of them have two forms: a
>>>>>>>>>> global form (e.g. Count.Globally) and a per key form (e.g.
>>>>>>>>>> Count.PerKey). These are convenient but it feels like often we're
>>>>>>>>>> running into the case where we GroupBy a set of data once and then
>>>>>>>>>> wish to perform a series of combines on them, in which case neither
>>>>>>>>>> of these forms work, and it begs another form which operates on
>>>>>>>>>> pre-grouped KVs.
>>>>>>>>>>
>>>>>>>>>> Contrived example: maybe you have a pcollection of keyed numbers
>>>>>>>>>> and you want to calculate some summary statistics on them. You could
>>>>>>>>>> do:
>>>>>>>>>> ```
>>>>>>>>>> keyed_means = (keyed_nums
>>>>>>>>>>  | Mean.PerKey())
>>>>>>>>>> keyed_counts = (keyed_nums
>>>>>>>>>>  | Count.PerKey())
>>>>>>>>>> ... # other combines
>>>>>>>>>> ```
>>>>>>>>>> But it'd feel more natural to pre-group the pcollection.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Does it feel more natural because it feels as though it would be
>>>>>>>>> more performant? Because it seems like it adds an extra grouping step
>>>>>>>>> to the pipeline code, which otherwise might not be necessary. Note that
>>>>>>>>> Dataflow has the "combiner lifting" optimization, and
>>>>>>>>> combiner-specified reduction happens before the data is written into
>>>>>>>>> shuffle as much as possible:
>>>>>>>>> https://cloud.google.com/dataflow/docs/pipeline-lifecycle#combine_optimization
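
A toy pure-Python sketch of what combiner lifting buys, assuming the runner pre-combines each bundle into per-key accumulators before shuffling. This is an illustration only, not how Dataflow or any Beam runner is implemented; `lifted_mean_per_key` and the bundle layout are made up for the example.

```python
from collections import defaultdict


def lifted_mean_per_key(bundles):
    """Pre-combine each bundle, 'shuffle' only accumulators, then merge."""
    shuffled = []
    for bundle in bundles:
        # Before the shuffle: one (sum, count) accumulator per key per bundle.
        partial = {}
        for key, value in bundle:
            total, count = partial.get(key, (0, 0))
            partial[key] = (total + value, count + 1)
        shuffled.extend(partial.items())

    # After the shuffle: merge the partial accumulators for each key.
    merged = defaultdict(lambda: (0, 0))
    for key, (total, count) in shuffled:
        merged_total, merged_count = merged[key]
        merged[key] = (merged_total + total, merged_count + count)

    means = {key: total / count for key, (total, count) in merged.items()}
    return means, len(shuffled)


bundles = [
    [("a", 1), ("a", 3), ("b", 10)],
    [("a", 5), ("b", 20), ("b", 30)],
]
means, shuffled_records = lifted_mean_per_key(bundles)
raw_records = sum(len(bundle) for bundle in bundles)
```

In this toy input, 6 raw elements shrink to 4 accumulators crossing the shuffle. An explicit GroupByKey up front forces all raw elements through the shuffle, which is the cost being pointed at above.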
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>> grouped_nums = keyed_nums | GBK()
>>>>>>>>>> keyed_means = (grouped_nums | Mean.PerGrouped())
>>>>>>>>>> keyed_counts = (grouped_nums | Count.PerGrouped())
>>>>>>>>>> ```
>>>>>>>>>> But these "PerGrouped" variants don't actually currently exist.
>>>>>>>>>> Does anyone else run into this pattern often? I might be missing an
>>>>>>>>>> obvious pattern here.
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>>
>>>>>>>>>> Joey Tran | Staff Developer | AutoDesigner TL
>>>>>>>>>>
>>>>>>>>>> *he/him*
>>>>>>>>>>
>>>>>>>>>> [image: Schrödinger, Inc.] <https://schrodinger.com/>
>>>>>>>>>>
>>>>>>>>>
