On Tue, Oct 1, 2024 at 8:11 AM Kenneth Knowles <k...@apache.org> wrote:

> Am I right in understanding that "TupleCombineFn" is the Python name for
> ComposedCombineFn? (
> https://beam.apache.org/releases/javadoc/2.59.0/index.html?org/apache/beam/sdk/transforms/CombineFns.ComposedCombineFn.html
> )
>

Yes. Note there are two variants: one that takes a tuple of inputs and
applies a tuple of combiners, and another that takes a single input and a
tuple of combiners (each applied in parallel).
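
To make the difference concrete, here is a rough pure-Python model of the
two variants. It is only a sketch of the semantics, not Beam's
implementation: every class and helper name below (SumFn, MeanFn, TupleFn,
SingleInputTupleFn, combine) is made up for illustration, and `combine`
just imitates per-bundle accumulation followed by a merge. In Beam the
corresponding classes are TupleCombineFn and SingleInputTupleCombineFn.

```python
# Illustrative model only; all names here are hypothetical, not Beam APIs.

class SumFn:
    def create_accumulator(self):
        return 0

    def add_input(self, acc, x):
        return acc + x

    def merge_accumulators(self, accs):
        return sum(accs)

    def extract_output(self, acc):
        return acc


class MeanFn:
    def create_accumulator(self):
        return (0, 0)  # (running total, count)

    def add_input(self, acc, x):
        return (acc[0] + x, acc[1] + 1)

    def merge_accumulators(self, accs):
        return (sum(a[0] for a in accs), sum(a[1] for a in accs))

    def extract_output(self, acc):
        total, count = acc
        return total / count if count else float("nan")


class TupleFn:
    """Variant 1: element i of each input tuple goes to combiner i."""

    def __init__(self, *fns):
        self.fns = fns

    def create_accumulator(self):
        return [f.create_accumulator() for f in self.fns]

    def add_input(self, accs, element):
        return [f.add_input(a, e) for f, a, e in zip(self.fns, accs, element)]

    def merge_accumulators(self, acc_lists):
        # Regroup so each combiner merges only its own accumulators.
        columns = zip(*acc_lists)
        return [f.merge_accumulators(list(c)) for f, c in zip(self.fns, columns)]

    def extract_output(self, accs):
        return tuple(f.extract_output(a) for f, a in zip(self.fns, accs))


class SingleInputTupleFn(TupleFn):
    """Variant 2: the same single input is fed to every combiner."""

    def add_input(self, accs, element):
        return [f.add_input(a, element) for f, a in zip(self.fns, accs)]


def combine(fn, bundles):
    """Accumulate each bundle separately, then merge the partial results."""
    partials = []
    for bundle in bundles:
        acc = fn.create_accumulator()
        for x in bundle:
            acc = fn.add_input(acc, x)
        partials.append(acc)
    return fn.extract_output(fn.merge_accumulators(partials))


single = combine(SingleInputTupleFn(SumFn(), MeanFn()), [[1, 2], [3, 4]])
paired = combine(TupleFn(SumFn(), MeanFn()), [[(1, 10.0)], [(2, 20.0), (3, 30.0)]])
print(single)  # (10, 2.5)
print(paired)  # (6, 20.0)
```

Either way the result is a single CombineFn, so it can be handed to
CombinePerKey and stays eligible for combiner lifting.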


> On Sun, Sep 29, 2024 at 7:51 PM Joey Tran <joey.t...@schrodinger.com>
> wrote:
>
>> Ah I realize now that `standard_optimize_phases` is not actually
>> currently used by the fn runner so these changes don't effectively do
>> anything
>>
>> On Sun, Sep 29, 2024 at 3:19 PM Joey Tran <joey.t...@schrodinger.com>
>> wrote:
>>
>>> I took a crack at trying to replace GBK+CombineValues with CBK so then
>>> it doesn't matter what the user chooses to do.
>>>
>>> https://github.com/apache/beam/pull/32592
>>>
>>> On Fri, Sep 27, 2024 at 4:32 PM Joey Tran <joey.t...@schrodinger.com>
>>> wrote:
>>>
>>>> Hmm, it makes sense from a runner optimization perspective, but I think
>>>> it's a lot less ergonomic to publish combinefns instead of ptransforms.
>>>> Another drawback to the stacked combinefn is the label will have to be a
>>>> mash of possibly very different combiners squashed into one
>>>>
>>>> On Fri, Sep 27, 2024 at 3:52 PM Robert Bradshaw via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> I'd be worried about encouraging the anti-pattern of GroupByKey() +
>>>>> CombinePerGroup() which would make the important (often essential)
>>>>> combiner lifting optimization harder (pattern detection in the runner
>>>>> vs. composite detection).
>>>>>
>>>>> You might also be interested in the TupleCombineFns
>>>>>
>>>>>
>>>>> https://github.com/apache/beam/blob/release-2.13.0/sdks/python/apache_beam/transforms/combiners.py#L717
>>>>>
>>>>> keyed_nums = ...
>>>>> combined_nums = keyed_nums | CombinePerKey(
>>>>>     combiners.SingleInputTupleCombineFn(
>>>>>         sum,
>>>>>         combiners.MeanCombineFn(),
>>>>>         ...))
>>>>>
>>>>> FWIW, I never liked the .Globally() and .PerKey() transforms as they're
>>>>> not very composable. I think they were needed to make Java typing work
>>>>> out well in Java 7 and then copied to Python, but I would suggest just
>>>>> using CombineGlobally(...) and CombinePerKey(...) over these.
>>>>>
>>>>> You might also be interested to know that in Python we already have
>>>>> combiner consolidation
>>>>>
>>>>>
>>>>> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L953
>>>>>
>>>>> https://github.com/apache/beam/blob/release-2.48.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L1161
>>>>>
>>>>>
>>>>> I don't remember what the status is of enabling this by default (IIRC,
>>>>> they're conditionally enabled by decorating a transform).
>>>>>
>>>>> On Fri, Sep 27, 2024 at 12:01 PM Joey Tran <joey.t...@schrodinger.com>
>>>>> wrote:
>>>>>
>>>>>> Thoughts @dev <dev@beam.apache.org> for a `GroupedValues` version of
>>>>>> combiners? Named `PerGroup` or `PerGroupedValues`?
>>>>>>
>>>>>> On Fri, Sep 27, 2024 at 2:57 PM Valentyn Tymofieiev <
>>>>>> valen...@google.com> wrote:
>>>>>>
>>>>>>>
>>>>>>> On Fri, Sep 27, 2024 at 11:13 AM Joey Tran <
>>>>>>> joey.t...@schrodinger.com> wrote:
>>>>>>>
>>>>>>>> Ah! That is exactly the kind of primitive I was looking for but
>>>>>>>> thought didn't exist. Thanks for pointing it out. Yeah that works well
>>>>>>>> for me, I'll use that in my combiners (with an API of
>>>>>>>> `PerGroupedValues`).
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> If we did want to add `PerGroupedValues` to our current combiners
>>>>>>>> I'd also be happy to put up a PR doing that
>>>>>>>>
>>>>>>>
>>>>>>> I don't see why not. I'd run by dev@ for naming ideas.  PerGroup is
>>>>>>> another possibility.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Sep 27, 2024 at 2:01 PM Valentyn Tymofieiev <
>>>>>>>> valen...@google.com> wrote:
>>>>>>>>
>>>>>>>>> The closest primitive to that intent seems to be CombineValues:
>>>>>>>>> https://github.com/apache/beam/blob/c2c640f8c33071d5bb3e854e82c554c03a0bc851/sdks/python/apache_beam/transforms/core.py#L3010
>>>>>>>>> , and you should be able to write:
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>> max_sample_size = 100_000
>>>>>>>>> ( keyed_nums
>>>>>>>>>  | GroupByKey()
>>>>>>>>>  # kv is a (key, values) pair; keep at most max_sample_size values
>>>>>>>>>  | Map(lambda kv: (kv[0], list(kv[1])[:max_sample_size]))
>>>>>>>>>  | CombineValues(MeanCombineFn()))
>>>>>>>>> ```
>>>>>>>>> Would that work for other scenarios you have in mind?
>>>>>>>>>
>>>>>>>>> Haven't thought too much about this but from looking at
>>>>>>>>> https://github.com/apache/beam/blob/c2c640f8c33071d5bb3e854e82c554c03a0bc851/sdks/python/apache_beam/transforms/combiners.py#L90,
>>>>>>>>> I could see us adding Mean.GroupedValues or Mean.PerGroupedValues
>>>>>>>>> there.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Sep 27, 2024 at 10:41 AM Joey Tran <
>>>>>>>>> joey.t...@schrodinger.com> wrote:
>>>>>>>>>
>>>>>>>>>> It feels more natural because it's only using GroupByKey once
>>>>>>>>>> instead of once per combiner. Which I think is still more efficient
>>>>>>>>>> even accounting for combiner lifting (unless there's some kind of
>>>>>>>>>> pipeline optimization that merges multiple groupbykey's on the same
>>>>>>>>>> pcollection into a single GBK).
>>>>>>>>>>
>>>>>>>>>> You can imagine a different use case where this pattern might
>>>>>>>>>> arise that isn't just trying to reduce GBKs though. For example:
>>>>>>>>>>
>>>>>>>>>> ```
>>>>>>>>>> max_sample_size = 100_000
>>>>>>>>>> ( keyed_nums
>>>>>>>>>>  | GroupByKey()
>>>>>>>>>>  # kv is a (key, values) pair; keep at most max_sample_size values
>>>>>>>>>>  | Map(lambda kv: (kv[0], list(kv[1])[:max_sample_size]))
>>>>>>>>>>  # | ??  Mean.PerGrouped()?
>>>>>>>>>> )
>>>>>>>>>> ```
>>>>>>>>>>
>>>>>>>>>> To take the mean of every grouped_values using current combiners,
>>>>>>>>>> I think you'd have to use an inverted groupbykey and then call
>>>>>>>>>> `Mean.PerKey()` unless I'm missing something.
>>>>>>>>>>
>>>>>>>>>> (I recognize that writing a Map that takes a mean is simple
>>>>>>>>>> enough, but in a real use case we might have a more complicated
>>>>>>>>>> combiner)
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 27, 2024 at 1:31 PM Valentyn Tymofieiev via user <
>>>>>>>>>> u...@beam.apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Sep 27, 2024 at 8:35 AM Joey Tran <
>>>>>>>>>>> joey.t...@schrodinger.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hey all,
>>>>>>>>>>>>
>>>>>>>>>>>> Just curious if this pattern comes up for others and if people
>>>>>>>>>>>> have worked out a good convention.
>>>>>>>>>>>>
>>>>>>>>>>>> There are many combiners and a lot of them have two forms: a
>>>>>>>>>>>> global form (e.g. Count.Globally) and a per key form (e.g.
>>>>>>>>>>>> Count.PerKey). These are convenient but it feels like often we're
>>>>>>>>>>>> running into the case where we GroupBy a set of data once and then
>>>>>>>>>>>> wish to perform a series of combines on them, in which case neither
>>>>>>>>>>>> of these forms work, and it begs another form which operates on
>>>>>>>>>>>> pre-grouped KVs.
>>>>>>>>>>>>
>>>>>>>>>>>> Contrived example: maybe you have a pcollection of keyed
>>>>>>>>>>>> numbers and you want to calculate some summary statistics on them.
>>>>>>>>>>>> You could do:
>>>>>>>>>>>> ```
>>>>>>>>>>>> keyed_means = (keyed_nums
>>>>>>>>>>>>  | Mean.PerKey())
>>>>>>>>>>>> keyed_counts = (keyed_nums
>>>>>>>>>>>>  | Count.PerKey())
>>>>>>>>>>>> ... # other combines
>>>>>>>>>>>> ```
>>>>>>>>>>>> But it'd feel more natural to pre-group the pcollection.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Does it feel more natural because it feels as though it would be
>>>>>>>>>>> more performant? Because it seems like it adds an extra grouping
>>>>>>>>>>> step to the pipeline code, which otherwise might not be necessary.
>>>>>>>>>>> Note that
>>>>>>>>>>> Dataflow has the "combiner lifting" optimization, and
>>>>>>>>>>> combiner-specified-reduction happens before the data is written into
>>>>>>>>>>> shuffle as much as possible:
>>>>>>>>>>> https://cloud.google.com/dataflow/docs/pipeline-lifecycle#combine_optimization
>>>>>>>>>>> .
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> ```
>>>>>>>>>>>> grouped_nums = keyed_nums | GBK()
>>>>>>>>>>>> keyed_means = (grouped_nums | Mean.PerGrouped())
>>>>>>>>>>>> keyed_counts = (grouped_nums | Count.PerGrouped())
>>>>>>>>>>>> ```
>>>>>>>>>>>> But these "PerGrouped" variants don't actually currently exist.
>>>>>>>>>>>> Does anyone else run into this pattern often? I might be missing
>>>>>>>>>>>> an obvious pattern here.
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>>
>>>>>>>>>>>> Joey Tran | Staff Developer | AutoDesigner TL
>>>>>>>>>>>>
>>>>>>>>>>>> *he/him*
>>>>>>>>>>>>
>>>>>>>>>>>> [image: Schrödinger, Inc.] <https://schrodinger.com/>
>>>>>>>>>>>>
>>>>>>>>>>>
