On Tue, May 30, 2023 at 10:37 AM Kenneth Knowles <k...@apache.org> wrote:
>
> On Sat, May 27, 2023 at 4:20 PM Stephan Hoyer via dev <dev@beam.apache.org>
> wrote:
>
>> On Fri, May 26, 2023 at 2:59 PM Robert Bradshaw <rober...@google.com>
>> wrote:
>>
>>> Yes, with_hot_key_fanout only performs a single level of fanout. I don't
>>> think fanning out more than this has been explored, but I would imagine
>>> that for most cases the increased IO would negate most if not all of the
>>> benefits.
>>>
>>
>> My reasoning for multi-level fanout is that the total amount of IO
>> converges as a geometric series: at each level, the amount of data is
>> reduced by a factor of fanout. So even if fanout=2 at each level, the
>> total amount of IO is at most twice the IO of not using fanout at all.
>> The general IO overhead would be a factor of "fanout / (fanout - 1)".
>>
>>
>>> In particular, note that we already do "combiner lifting" to do as much
>>> combining as we can on the mapper side, e.g. suppose we have M elements and
>>> N workers. Each worker will (to a first order of approximation) combine M/N
>>> elements down to a single element, leaving N elements total to be combined
>>> by a worker in the subsequent stage. If N is large (or the combining
>>> computation expensive) one can use with_hot_key_fanout to add an
>>> intermediate step and let the N workers each combine M/N elements into
>>> sqrt(N) partial aggregates, and the subsequent worker only needs to combine
>>> the sqrt(N) partial aggregates. Generally N (the number of workers, not the
>>> number of elements) is small enough that multiple levels are not needed.
>>>
>>
>> Thanks for clarifying, Robert. I did not realize that "combiner lifting"
>> was a thing! We had been operating under the assumption that we should use
>> fanout to sqrt(M), which could indeed be bigger than sqrt(N). In general
>> "with_hot_key_fanout" could use documentation to explain exactly what the
>> parameter means and to indicate suggested usage (e.g., set it to sqrt(N)).
>>
>> I will say that one other concern for us is memory usage. We typically
>> work with large, image-like data coming out of weather simulations, which
>> we try to divide into 10-100 MB chunks. With 1000 workers, this would
>> suggest fanout=30, which means combining up to 3 GB of data on a single
>> machine. This is probably fine but doesn't leave a large margin for error.
>>
>
> One thing in Robert's "to a first approximation" is that the pre-shuffle
> combine does flush things to shuffle to avoid running out of memory. This
> logic is per-SDK (because of how the combiner is invoked and also how to
> measure memory footprint, which caching data structures are performant, etc.).
> So if this is working as intended, the impact of doing a large amount of
> pre-combining on a machine is just a lesser benefit because of having to
> flush more than one result per key, not excessive memory pressure. I'm sure
> it is inexact in specific cases, though.
>

+1. Also, the general CombineFn only combines a single element into the
aggregate at a time--no need to store all the inputs in memory at the same
time. (The flushing is primarily a concern for the many-keys situation, in
which case many aggregates are stored, one per key.)

>>> On Fri, May 26, 2023 at 1:57 PM Stephan Hoyer via dev <dev@beam.apache.org>
>>> wrote:
>>>
>>>> We have some use-cases where we are combining over very large sets
>>>> (e.g., computing the average of 1e5 to 1e6 elements, corresponding to
>>>> hourly weather observations over the past 50 years).
>>>>
>>>> "with_hot_key_fanout" seems to be rather essential for performing these
>>>> calculations, but as far as I can tell it only performs a single level of
>>>> fanout, i.e., instead of summing up 1e6 elements on a single node, you sum
>>>> 1e3 elements 1e3 times, and then sum those 1e3 results together.
>>>>
>>>> My guess is that such combiners could be much more efficient if this
>>>> was performed in a hierarchical/multi-stage fashion, with the number of
>>>> stages proportional to log(element_count), e.g., summing 100 elements
>>>> at a time over 3 stages, or maybe summing 10 elements at a time over
>>>> 6 stages. Dask uses such a "tree reduction" strategy as controlled by
>>>> the "split_every" parameter:
>>>> https://github.com/dask/dask/blob/453bd7031828f72e4602498c1a1f776280794bea/dask/array/reductions.py#L109
>>>>
>>>> I understand that the number of fanout stages could not be computed
>>>> automatically in the Beam data model, but it would still be nice to be able
>>>> to specify this manually. Has there been any thought to introducing this
>>>> feature?
>>>>
>>>> Thanks,
>>>> Stephan
>>>>
>>>
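
To make the stage counts and the "fanout / (fanout - 1)" IO estimate from
this thread concrete, here is a minimal pure-Python sketch of a multi-level
("tree") reduction. It is only a single-process simulation, not Beam or Dask
code: the name tree_reduce, its parameters, and the use of "elements read by
combine steps" as a stand-in for IO are all assumptions made for
illustration.

```python
def tree_reduce(values, fanout, combine=sum):
    """Reduce `values` in stages, combining at most `fanout` items per group.

    Hypothetical helper for illustration only (not a Beam or Dask API).
    Returns (result, stages, elements_read), where elements_read counts every
    element consumed by a combine step across all stages, used here as a
    rough proxy for the IO discussed in the thread.
    """
    if fanout < 2:
        raise ValueError("fanout must be at least 2")
    level = list(values)
    if not level:
        raise ValueError("values must be non-empty")

    stages = 0
    elements_read = 0
    while len(level) > 1:
        # Every element of the current level is read exactly once.
        elements_read += len(level)
        # Combine consecutive groups of up to `fanout` elements.
        level = [
            combine(level[i:i + fanout])
            for i in range(0, len(level), fanout)
        ]
        stages += 1
    return level[0], stages, elements_read


if __name__ == "__main__":
    n = 10**6
    for fanout in (100, 10):
        total, stages, read = tree_reduce(range(n), fanout)
        overhead = read / n
        bound = fanout / (fanout - 1)
        print(f"fanout={fanout}: stages={stages}, "
              f"IO overhead={overhead:.4f}, bound={bound:.4f}")
```

With 1e6 inputs, this takes 3 stages at fanout=100 and 6 stages at fanout=10,
matching the examples in the original message, and the measured overhead stays
below fanout / (fanout - 1) in both cases. When the group sizes do not divide
evenly, rounding up the number of groups at each level can push the count
slightly past that bound, so it is best read as an approximation.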