On Fri, Jan 26, 2024 at 8:43 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>
> Hmm, I think I might still be missing something. CombinePerKey is made up of 
> "GBK() | CombineValues". Pulling it out into the Distinct, Distinct looks 
> like:
>
> def Distinct(pcoll):  # pylint: disable=invalid-name
>   """Produces a PCollection containing distinct elements of a PCollection."""
>   return (
>      pcoll
>      | 'ToPairs' >> Map(lambda v: (v, None))
>       | 'Group' >> GroupByKey()
>       | 'CombineValues >> CombineValues(lambda vs: None)
>       | 'Distinct' >> Keys())
>
> Does the combiner lifting somehow make the GroupByKey operation more 
> efficient despite coming after it? My intuition would suggest that we could 
> just remove the `CombineValues` altogether

The key property of CombineFns is that they are commutative and
associative which permits an optimization called combiner lifting.
Specifically, the operation

    GroupByKey() | CombineValues(C)

re-written into

    PartialCombineUsingLocalBufferMap(C) | GroupByKey() | FinalCombine(C)

that pretty much every runner supports (going back to the days of the
original MapReduce), which is what can make this so much more
efficient.

https://github.com/apache/beam/blob/release-2.21.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L669

I am, unfortunately, coming up short in finding good documentation on
this (Apache Beam specific or otherwise).


> On Fri, Jan 26, 2024 at 11:33 AM Robert Bradshaw via dev 
> <dev@beam.apache.org> wrote:
>>
>> This is because it allows us to do some of the deduplication before
>> shuffle via combiner lifting. E.g. say we have [A, A, A, B, B] on one
>> worker and [B, B, B, B, C, C] on another. Rather than passing all that
>> data through the GroupByKey (which involves (relatively) expensive
>> materialization and cross-machine traffic, with this form the first
>> worker will only emit [A, B] and the second [B, C] and only the B
>> needs to be deduplicated post-shuffle.
>>
>> Wouldn't hurt to have a comment to that effect there.
>>
>> https://beam.apache.org/documentation/programming-guide/#combine
>>
>> On Fri, Jan 26, 2024 at 8:22 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>> >
>> > Hey all,
>> >
>> > I was poking around and looking at `Distinct` and was confused about why 
>> > it was implemented the way it was.
>> >
>> > Reproduced here:
>> > @ptransform_fn
>> > @typehints.with_input_types(T)
>> > @typehints.with_output_types(T)
>> > def Distinct(pcoll):  # pylint: disable=invalid-name
>> >   """Produces a PCollection containing distinct elements of a 
>> > PCollection."""
>> >   return (
>> >       pcoll
>> >       | 'ToPairs' >> Map(lambda v: (v, None))
>> >       | 'Group' >> CombinePerKey(lambda vs: None)
>> >       | 'Distinct' >> Keys())
>> >
>> > Could anyone clarify why we'd use a `CombinePerKey` instead of just using 
>> > `GroupByKey`?
>> >
>> > Cheers,
>> > Joey

Reply via email to