On Fri, Jan 26, 2024 at 8:43 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>
> Hmm, I think I might still be missing something. CombinePerKey is made up
> of "GBK() | CombineValues". Pulling that out into Distinct, Distinct looks
> like:
>
> def Distinct(pcoll):  # pylint: disable=invalid-name
>   """Produces a PCollection containing distinct elements of a PCollection."""
>   return (
>       pcoll
>       | 'ToPairs' >> Map(lambda v: (v, None))
>       | 'Group' >> GroupByKey()
>       | 'CombineValues' >> CombineValues(lambda vs: None)
>       | 'Distinct' >> Keys())
>
> Does the combiner lifting somehow make the GroupByKey operation more
> efficient despite coming after it? My intuition would suggest that we
> could just remove the `CombineValues` altogether.
The key property of CombineFns is that they are commutative and
associative, which permits an optimization called combiner lifting.
Specifically, the operation GroupByKey() | CombineValues(C) is re-written
into PartialCombineUsingLocalBufferMap(C) | GroupByKey() | FinalCombine(C),
which pretty much every runner supports (going back to the days of the
original MapReduce), and which is what can make this so much more
efficient.

https://github.com/apache/beam/blob/release-2.21.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L669

I am, unfortunately, coming up short in finding good documentation on
this (Apache Beam specific or otherwise).

> On Fri, Jan 26, 2024 at 11:33 AM Robert Bradshaw via dev
> <dev@beam.apache.org> wrote:
>>
>> This is because it allows us to do some of the deduplication before the
>> shuffle via combiner lifting. E.g. say we have [A, A, A, B, B] on one
>> worker and [B, B, B, B, C, C] on another. Rather than passing all that
>> data through the GroupByKey (which involves (relatively) expensive
>> materialization and cross-machine traffic), with this form the first
>> worker will only emit [A, B] and the second [B, C], and only the B
>> needs to be deduplicated post-shuffle.
>>
>> Wouldn't hurt to have a comment to that effect there.
>>
>> https://beam.apache.org/documentation/programming-guide/#combine
>>
>> On Fri, Jan 26, 2024 at 8:22 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>> >
>> > Hey all,
>> >
>> > I was poking around and looking at `Distinct` and was confused about
>> > why it was implemented the way it was.
>> >
>> > Reproduced here:
>> >
>> > @ptransform_fn
>> > @typehints.with_input_types(T)
>> > @typehints.with_output_types(T)
>> > def Distinct(pcoll):  # pylint: disable=invalid-name
>> >   """Produces a PCollection containing distinct elements of a
>> >   PCollection."""
>> >   return (
>> >       pcoll
>> >       | 'ToPairs' >> Map(lambda v: (v, None))
>> >       | 'Group' >> CombinePerKey(lambda vs: None)
>> >       | 'Distinct' >> Keys())
>> >
>> > Could anyone clarify why we'd use a `CombinePerKey` instead of just
>> > using `GroupByKey`?
>> >
>> > Cheers,
>> > Joey
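
To make the pre-shuffle deduplication concrete, here is a toy plain-Python
sketch of the combiner-lifting rewrite using the [A, A, A, B, B] /
[B, B, B, B, C, C] example from the thread. The function names
(`partial_combine`, `shuffle_and_final_combine`) are invented for
illustration; this is not Beam's actual runner code.

```python
def partial_combine(worker_elements):
    # The "lifted" partial combine: each worker dedupes its own elements
    # locally (via a local buffer) before anything crosses the shuffle,
    # emitting one (key, None) pair per distinct local element.
    return {(v, None) for v in worker_elements}

def shuffle_and_final_combine(per_worker_outputs):
    # Shuffle: group pairs by key across workers; final combine drops the
    # values, leaving one entry per distinct key.
    grouped = {}
    for pairs in per_worker_outputs:
        for key, _ in pairs:
            grouped[key] = None
    return sorted(grouped)

worker1 = ['A', 'A', 'A', 'B', 'B']
worker2 = ['B', 'B', 'B', 'B', 'C', 'C']

lifted1 = partial_combine(worker1)  # two pairs: A and B
lifted2 = partial_combine(worker2)  # two pairs: B and C

# Only 4 pairs cross the shuffle instead of 11 raw elements; only the
# B duplicated across workers needs deduplicating post-shuffle.
result = shuffle_and_final_combine([lifted1, lifted2])
print(result)  # ['A', 'B', 'C']
```

Without the CombineValues (i.e. a bare GroupByKey), every element would be
materialized and shuffled, which is why Distinct is written in terms of
CombinePerKey even though the local dedup looks redundant at pipeline level.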