On Fri, Jan 26, 2024 at 8:43 AM Joey Tran <[email protected]> wrote:
>
> Hmm, I think I might still be missing something. CombinePerKey is made up of
> "GBK() | CombineValues". Pulling it out into the Distinct, Distinct looks
> like:
>
> def Distinct(pcoll):  # pylint: disable=invalid-name
>   """Produces a PCollection containing distinct elements of a PCollection."""
>   return (
>       pcoll
>       | 'ToPairs' >> Map(lambda v: (v, None))
>       | 'Group' >> GroupByKey()
>       | 'CombineValues' >> CombineValues(lambda vs: None)
>       | 'Distinct' >> Keys())
>
> Does the combiner lifting somehow make the GroupByKey operation more
> efficient despite coming after it? My intuition would suggest that we could
> just remove the `CombineValues` altogether
The key property of CombineFns is that they are commutative and
associative, which permits an optimization called combiner lifting.
Specifically, the operation

GroupByKey() | CombineValues(C)

is rewritten into

PartialCombineUsingLocalBufferMap(C) | GroupByKey() | FinalCombine(C)

an optimization that pretty much every runner supports (going back to
the days of the original MapReduce), and it is what can make this so
much more efficient.
https://github.com/apache/beam/blob/release-2.21.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L669
I am, unfortunately, coming up short in finding good documentation on
this (Apache Beam specific or otherwise).
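To make the lifting concrete, here is a plain-Python sketch of what the
rewritten pipeline does for Distinct (function names like
partial_combine/shuffle/final_combine are illustrative stand-ins, not
Beam APIs):

```python
# Hypothetical simulation of combiner lifting for Distinct.
# Pre-shuffle, each worker collapses duplicates in a local buffer,
# so only one (key, accumulator) pair per distinct key is shuffled.
from itertools import chain

def partial_combine(elements):
    # PartialCombineUsingLocalBufferMap(C): buffer keys locally,
    # accumulating with the CombineFn (here lambda vs: None).
    buffer = {}
    for v in elements:
        buffer[v] = None  # duplicates collapse before the shuffle
    return list(buffer.items())

def shuffle(per_worker_outputs):
    # GroupByKey(): gather accumulators by key across workers.
    grouped = {}
    for k, v in chain.from_iterable(per_worker_outputs):
        grouped.setdefault(k, []).append(v)
    return grouped

def final_combine(grouped):
    # FinalCombine(C) + Keys(): the keys are the distinct elements.
    return sorted(grouped.keys())

worker1 = ['A', 'A', 'A', 'B', 'B']
worker2 = ['B', 'B', 'B', 'B', 'C', 'C']
lifted = [partial_combine(w) for w in (worker1, worker2)]
# Only two pairs per worker cross the shuffle instead of 5 and 6 elements.
assert [len(o) for o in lifted] == [2, 2]
assert final_combine(shuffle(lifted)) == ['A', 'B', 'C']
```

With a bare GroupByKey, all eleven elements would be materialized and
shuffled; with the lifted form only four pairs are.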
> On Fri, Jan 26, 2024 at 11:33 AM Robert Bradshaw via dev
> <[email protected]> wrote:
>>
>> This is because it allows us to do some of the deduplication before
>> shuffle via combiner lifting. E.g. say we have [A, A, A, B, B] on one
>> worker and [B, B, B, B, C, C] on another. Rather than passing all that
>> data through the GroupByKey (which involves (relatively) expensive
>> materialization and cross-machine traffic), with this form the first
>> worker will only emit [A, B] and the second [B, C] and only the B
>> needs to be deduplicated post-shuffle.
>>
>> Wouldn't hurt to have a comment to that effect there.
>>
>> https://beam.apache.org/documentation/programming-guide/#combine
>>
>> On Fri, Jan 26, 2024 at 8:22 AM Joey Tran <[email protected]> wrote:
>> >
>> > Hey all,
>> >
>> > I was poking around and looking at `Distinct` and was confused about why
>> > it was implemented the way it was.
>> >
>> > Reproduced here:
>> > @ptransform_fn
>> > @typehints.with_input_types(T)
>> > @typehints.with_output_types(T)
>> > def Distinct(pcoll):  # pylint: disable=invalid-name
>> >   """Produces a PCollection containing distinct elements of a
>> >   PCollection."""
>> >   return (
>> >       pcoll
>> >       | 'ToPairs' >> Map(lambda v: (v, None))
>> >       | 'Group' >> CombinePerKey(lambda vs: None)
>> >       | 'Distinct' >> Keys())
>> >
>> > Could anyone clarify why we'd use a `CombinePerKey` instead of just using
>> > `GroupByKey`?
>> >
>> > Cheers,
>> > Joey