On Mon, Sep 19, 2022 at 1:53 PM Stephan Hoyer <sho...@google.com> wrote:
>>
>> > > My team has an internal implementation of a CartesianProduct transform,
>> > > based on using hashing to split a pcollection into a finite number of
>> > > groups and CoGroupByKey.
>> >
>> > Could this be contributed to Beam?
>
> If it would be of broader interest, I would be happy to work on this for the
> Python SDK.
>
> I can share a link to the code with Googlers.
>
> On Mon, Sep 19, 2022 at 10:47 AM Robert Bradshaw <rober...@google.com> wrote:
>>
>> If one of your inputs fits into memory, using side inputs is
>> definitely the way to go. If neither side fits into memory, the cross
>> product may be prohibitively large to compute even on a distributed
>> computing platform (a billion times a billion is big, though I suppose
>> one may hit memory limits with fewer elements if the elements
>> themselves are large).
>
> I agree, in practice the side input solution will usually suffice.
>
> For CartesianProduct in particular, it is pretty common for one or more of
> the inputs to have a statically known size, because it was created from an
> in-memory sequence (i.e., with beam.Create). Otherwise we could look at
> user-supplied hints, falling back to CoGroupByKey only if required.
>
> There is also the (not uncommon) special case where _every_ input has
> statically known size, e.g., CreateCartesianProduct().
>
>> one can still do the partitioning hack. E.g.
>>
>>     partitions = pcoll_B | beam.Partition(hash, N)
>>     cross_product = tuple([
>>         pcoll_A | beam.FlatMap(lambda a, bs: [(a, b) for b in bs],
>>                                beam.pvalue.AsList(part))
>>         for part in partitions
>>     ]) | beam.Flatten()
>
> Interesting! I imagine this would break at some scale. Do you have an
> intuition for what is a "reasonable" number of partitions -- 10s, 100s, 1000s?
One would want the minimum number of partitions that allows each partition to fit into memory. N partitions will result in N distinct reads of pcoll_A. Thousands of partitions, if your data requires it, would not pose an issue (other than being able to allocate/afford enough resources to process such a large cross product, even at Google scale).
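
To make the snippet above concrete, here is a minimal runnable sketch (the toy Create inputs, the choice of N, and the transform labels are just for illustration). Note that beam.Partition calls its function with (element, num_partitions), so a bare hash won't work as written, and each application of the FlatMap in the loop needs a unique label:

    import apache_beam as beam

    N = 100  # smallest N such that each partition of pcoll_B fits in memory

    with beam.Pipeline() as p:
        # Toy inputs; in practice these would be real (possibly large) PCollections.
        pcoll_A = p | 'CreateA' >> beam.Create(range(5))
        pcoll_B = p | 'CreateB' >> beam.Create(range(1000))

        # beam.Partition passes (element, num_partitions) to its function,
        # so wrap hash() to bucket each element into one of N partitions.
        partitions = pcoll_B | beam.Partition(lambda b, n: hash(b) % n, N)

        # Each partition is small enough to be an in-memory side input;
        # pcoll_A is read once per partition. Unique labels are needed because
        # the same transform is applied N times.
        cross_product = tuple(
            pcoll_A | f'CrossPart{i}' >> beam.FlatMap(
                lambda a, bs: [(a, b) for b in bs],
                beam.pvalue.AsList(part))
            for i, part in enumerate(partitions)
        ) | beam.Flatten()

        cross_product | beam.Map(print)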