On Fri, May 26, 2023 at 2:59 PM Robert Bradshaw <rober...@google.com> wrote:
> Yes, with_hot_key_fanout only performs a single level of fanout. I don't
> think fanning out more than this has been explored, but I would imagine
> that for most cases the increased IO would negate most if not all of the
> benefits.

My reasoning for multi-level fanout is that the total amount of IO converges as a geometric series: at each level, the amount of data is reduced by a factor of 1/fanout. So even with fanout=2 at each level, the total amount of IO is at most twice the IO of not using fanout at all. In general, the IO overhead is a factor of "fanout / (fanout - 1)".

> In particular, note that we already do "combiner lifting" to do as much
> combining as we can on the mapper side, e.g. suppose we have M elements and
> N workers. Each worker will (to a first order of approximation) combine M/N
> elements down to a single element, leaving N elements total to be combined
> by a worker in the subsequent stage. If N is large (or the combining
> computation expensive) one can use with_hot_key_fanout to add an
> intermediate step and let the N workers each combine M/N elements into
> sqrt(N) partial aggregates, and the subsequent worker only needs to combine
> the sqrt(N) partial aggregates. Generally N (the number of workers, not the
> number of elements) is small enough that multiple levels are not needed.

Thanks for clarifying, Robert. I did not realize that "combiner lifting" was a thing! We had been operating under the assumption that we should set fanout to sqrt(M), which could indeed be bigger than sqrt(N). In general "with_hot_key_fanout" could use documentation to explain exactly what the parameter means and to indicate suggested usage (e.g., set it to sqrt(N)).

I will say that one other concern for us is memory usage. We typically work with large, image-like data coming out of weather simulations, which we try to divide into 10-100 MB chunks.
With 1000 workers, this would suggest fanout=30, which means combining up to 3 GB of data on a single machine. This is probably fine but doesn't leave a large margin for error.

> On Fri, May 26, 2023 at 1:57 PM Stephan Hoyer via dev <
> dev@beam.apache.org> wrote:
>
>> We have some use-cases where we are combining over very large sets (e.g.,
>> computing the average of 1e5 to 1e6 elements, corresponding to hourly
>> weather observations over the past 50 years).
>>
>> "with_hot_key_fanout" seems to be rather essential for performing these
>> calculations, but as far as I can tell it only performs a single level of
>> fanout, i.e., instead of summing up 1e6 elements on a single node, you sum
>> 1e3 elements 1e3 times, and then sum those 1e3 results together.
>>
>> My guess is that such combiners could be much more efficient if this was
>> performed in a hierarchical/multi-stage fashion proportional to
>> log(element_count), e.g., summing 100 elements with 3 stages, or maybe
>> summing 10 elements with 6 stages. Dask uses such a "tree reduction"
>> strategy as controlled by the "split_every" parameter:
>> https://github.com/dask/dask/blob/453bd7031828f72e4602498c1a1f776280794bea/dask/array/reductions.py#L109
>>
>> I understand that the number of fanout stages could not be computed
>> automatically in the Beam data model, but it would still be nice to be able
>> to specify this manually. Has there been any thought to introducing this
>> feature?
>>
>> Thanks,
>> Stephan
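
As a rough check of the numbers in this thread, here is a minimal Python sketch. It assumes a simplified model in which every fanout stage shrinks the data to 1/fanout of its input, and it takes the sqrt(N) rule of thumb and the 100 MB chunk size at face value. The function names are hypothetical helpers for illustration only and are not part of Beam.

```python
import math


def io_overhead(fanout: int, levels: int) -> float:
    """Total IO relative to the input size, under the simplified model.

    Each stage is assumed to shrink the data to 1/fanout of its input, so the
    total is the partial geometric sum 1 + 1/fanout + ... + 1/fanout**levels.
    """
    return sum(fanout ** -k for k in range(levels + 1))


def io_overhead_limit(fanout: int) -> float:
    """Limit of the sum above for many levels: fanout / (fanout - 1)."""
    return fanout / (fanout - 1)


def suggested_fanout(num_workers: int) -> int:
    """sqrt(N) rule of thumb from the thread, rounded down to an integer."""
    return math.isqrt(num_workers)


def peak_combine_bytes(fanout: int, chunk_bytes: int) -> int:
    """Bytes combined on one worker if it receives `fanout` partial chunks."""
    return fanout * chunk_bytes


if __name__ == "__main__":
    print(io_overhead_limit(2))                   # overhead factor for fanout=2
    print(io_overhead(2, 3))                      # partial sum after 3 levels
    print(suggested_fanout(1000))                 # sqrt(1000), rounded down
    print(peak_combine_bytes(30, 100 * 10**6))    # fanout=30, 100 MB chunks
```

With 1000 workers the rounded-down square root is 31, which the message above rounds to fanout=30; at 100 MB per chunk that gives the 3 GB figure.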