Re: Cartesian product of PCollections
On Mon, Sep 19, 2022 at 1:53 PM Stephan Hoyer wrote: >> >> > > My team has an internal implementation of a CartesianProduct transform, >> > > based on using hashing to split a pcollection into a finite number of >> > > groups and CoGroupByKey. >> > >> > Could this be contributed to Beam? > > > If it would be of broader interest, I would be happy to work on this for the > Python SDK. > > I can share a link to the code with Googlers. > > On Mon, Sep 19, 2022 at 10:47 AM Robert Bradshaw wrote: >> >> If one of your inputs fits into memory, using side inputs is >> definitely the way to go. If neither side fits into memory, the cross >> product may be prohibitively large to compute even on a distributed >> computing platform (a billion times a billion is big, though I suppose >> one may hit memory limits with fewer elements if the elements >> themselves are large) > > I agree, in practice the side input solution will usually suffice. > > For CartesianProduct in particular, it is pretty common for one or more of > the inputs to have a statically known size, because it was created from an > in-memory sequence (i.e., with beam.Create). Otherwise we could look at > user-supplied hints, falling back to CoGroupByKey only if required. > > There is also the (not uncommon) special case where _every_ input has > statically known size, e.g., CreateCartesianProduct(). > >> one can still do the partitioning hack. E.g. >> >> partitions = pcoll_B | beam.Partition(hash, N) >> cross_product = tuple([ >> pcoll_A | beam.FlatMap(lambda a, bs: [(a, b) for b in bs], >> beam.pvalue.AsList(part)) >> for part in partitions >> ]) | beam.Flatten() > > > Interesting! I imagine this would break at some scale. Do you have an > intuition for what is a "reasonable" number of partitions -- 10s, 100s, 1000s? One would want the minimum number of partitions that allows each to fit into memory. N partitions will result in N distinct reads of pcoll_A. Thousands of partitions, if your data requires it, would not pose an issue (other than being able to allocate/afford enough resources to process such a large cross product, even at google scale).
Re: Cartesian product of PCollections
> > > > My team has an internal implementation of a CartesianProduct > transform, based on using hashing to split a pcollection into a finite > number of groups and CoGroupByKey. > > > > Could this be contributed to Beam? > If it would be of broader interest, I would be happy to work on this for the Python SDK. I can share a link to the code with Googlers. On Mon, Sep 19, 2022 at 10:47 AM Robert Bradshaw wrote: > If one of your inputs fits into memory, using side inputs is > definitely the way to go. If neither side fits into memory, the cross > product may be prohibitively large to compute even on a distributed > computing platform (a billion times a billion is big, though I suppose > one may hit memory limits with fewer elements if the elements > themselves are large) I agree, in practice the side input solution will usually suffice. For CartesianProduct in particular, it is pretty common for one or more of the inputs to have a statically known size, because it was created from an in-memory sequence (i.e., with beam.Create). Otherwise we could look at user-supplied hints, falling back to CoGroupByKey only if required. There is also the (not uncommon) special case where _every_ input has statically known size, e.g., CreateCartesianProduct(). one can still do the partitioning hack. E.g. > > partitions = pcoll_B | beam.Partition(hash, N) > cross_product = tuple([ > pcoll_A | beam.FlatMap(lambda a, bs: [(a, b) for b in bs], > beam.pvalue.AsList(part)) > for part in partitions > ]) | beam.Flatten() Interesting! I imagine this would break at some scale. Do you have an intuition for what is a "reasonable" number of partitions -- 10s, 100s, 1000s?
Re: Cartesian product of PCollections
If one of your inputs fits into memory, using side inputs is definitely the way to go. If neither side fits into memory, the cross product may be prohibitively large to compute even on a distributed computing platform (a billion times a billion is big, though I suppose one may hit memory limits with fewer elements if the elements themselves are large) but one can still do the partitioning hack. E.g. partitions = pcoll_B | beam.Partition(hash, N) cross_product = tuple([ pcoll_A | beam.FlatMap(lambda a, bs: [(a, b) for b in bs], beam.pvalue.AsList(part)) for part in partitions ]) | beam.Flatten() One may need to break fusion before the FlatMap to avoid trying to pull all parts into memory, e.g. class FusionBreak(beam.PTransform()): def expand(self, pcoll): empty = pcoll | beam.FlatMap(lambda x: ()) return pcoll | beam.Map(lambda x, _: x, beam.pvalue.AsList(empty)) On Mon, Sep 19, 2022 at 8:34 AM Brian Hulette via dev wrote: > > In SQL we just don't support cross joins currently [1]. I'm not aware of an > existing implementation of a cross join/cartesian product. > > > My team has an internal implementation of a CartesianProduct transform, > > based on using hashing to split a pcollection into a finite number of > > groups and CoGroupByKey. > > Could this be contributed to Beam? > > > On the other hand, if any of the input pcollections are small, using side > > inputs would probably be the way to go to avoid the need for a shuffle. > > We run into this problem frequently in Beam SQL. Our optimizer could be much > more effective with accurate size estimates, but we rarely have them, and > they may never be good enough for us to select a side input implementation > over CoGroupByKey. I've had some offline discussions in this space, the best > solution we've come up with is to allow hints in SQL (or just arguments in > join transforms) that allow users to select a side input implementation. We > could also add some logging when a pipeline uses a CoGroupByKey and > PCollection sizes could be handled by a side input implementation, to nudge > users that way for future runs. > > Brian > > [1] https://beam.apache.org/documentation/dsls/sql/extensions/joins/ > > On Mon, Sep 19, 2022 at 8:01 AM Stephan Hoyer via dev > wrote: >> >> I'm wondering if it would make sense to have a built-in Beam transformation >> for calculating the Cartesian product of PCollections. >> >> Just this past week, I've encountered two separate cases where calculating a >> Cartesian product was a bottleneck. The in-memory option of using something >> like Python's itertools.product() is convenient, but it only scales to a >> single node. >> >> Unfortunately, implementing a scalable Cartesian product seems to be >> somewhat non-trivial. I found two version of this question on StackOverflow, >> but neither contains a code solution: >> https://stackoverflow.com/questions/35008721/how-to-get-the-cartesian-product-of-two-pcollections >> https://stackoverflow.com/questions/41050477/how-to-do-a-cartesian-product-of-two-pcollections-in-dataflow/ >> >> There's a fair amount of nuance in an efficient and scalable implementation. >> My team has an internal implementation of a CartesianProduct transform, >> based on using hashing to split a pcollection into a finite number of groups >> and CoGroupByKey. On the other hand, if any of the input pcollections are >> small, using side inputs would probably be the way to go to avoid the need >> for a shuffle. >> >> Any thoughts? >> >> Cheers, >> Stephan
Re: Cartesian product of PCollections
In SQL we just don't support cross joins currently [1]. I'm not aware of an existing implementation of a cross join/cartesian product. > My team has an internal implementation of a CartesianProduct transform, based on using hashing to split a pcollection into a finite number of groups and CoGroupByKey. Could this be contributed to Beam? > On the other hand, if any of the input pcollections are small, using side inputs would probably be the way to go to avoid the need for a shuffle. We run into this problem frequently in Beam SQL. Our optimizer could be much more effective with accurate size estimates, but we rarely have them, and they may never be good enough for us to select a side input implementation over CoGroupByKey. I've had some offline discussions in this space, the best solution we've come up with is to allow hints in SQL (or just arguments in join transforms) that allow users to select a side input implementation. We could also add some logging when a pipeline uses a CoGroupByKey and PCollection sizes could be handled by a side input implementation, to nudge users that way for future runs. Brian [1] https://beam.apache.org/documentation/dsls/sql/extensions/joins/ On Mon, Sep 19, 2022 at 8:01 AM Stephan Hoyer via dev wrote: > I'm wondering if it would make sense to have a built-in Beam > transformation for calculating the Cartesian product of PCollections. > > Just this past week, I've encountered two separate cases where calculating > a Cartesian product was a bottleneck. The in-memory option of using > something like Python's itertools.product() is convenient, but it only > scales to a single node. > > Unfortunately, implementing a scalable Cartesian product seems to be > somewhat non-trivial. I found two version of this question on > StackOverflow, but neither contains a code solution: > > https://stackoverflow.com/questions/35008721/how-to-get-the-cartesian-product-of-two-pcollections > > https://stackoverflow.com/questions/41050477/how-to-do-a-cartesian-product-of-two-pcollections-in-dataflow/ > > There's a fair amount of nuance in an efficient and scalable > implementation. My team has an internal implementation of a > CartesianProduct transform, based on using hashing to split a pcollection > into a finite number of groups and CoGroupByKey. On the other hand, if any > of the input pcollections are small, using side inputs would probably be > the way to go to avoid the need for a shuffle. > > Any thoughts? > > Cheers, > Stephan >
Cartesian product of PCollections
I'm wondering if it would make sense to have a built-in Beam transformation for calculating the Cartesian product of PCollections. Just this past week, I've encountered two separate cases where calculating a Cartesian product was a bottleneck. The in-memory option of using something like Python's itertools.product() is convenient, but it only scales to a single node. Unfortunately, implementing a scalable Cartesian product seems to be somewhat non-trivial. I found two version of this question on StackOverflow, but neither contains a code solution: https://stackoverflow.com/questions/35008721/how-to-get-the-cartesian-product-of-two-pcollections https://stackoverflow.com/questions/41050477/how-to-do-a-cartesian-product-of-two-pcollections-in-dataflow/ There's a fair amount of nuance in an efficient and scalable implementation. My team has an internal implementation of a CartesianProduct transform, based on using hashing to split a pcollection into a finite number of groups and CoGroupByKey. On the other hand, if any of the input pcollections are small, using side inputs would probably be the way to go to avoid the need for a shuffle. Any thoughts? Cheers, Stephan