On Fri, May 26, 2023 at 2:59 PM Robert Bradshaw <rober...@google.com> wrote:

> Yes, with_hot_key_fanout only performs a single level of fanout. I don't
> think fanning out more than this has been explored, but I would imagine
> that for most cases the increased IO would negate most if not all of the
> benefits.
>

My reasoning for multi-level fanout is that the total amount of IO converges
as a geometric series: at each level, the amount of data is reduced by a
factor of 1/fanout. So even with fanout=2 at each level, the total IO is only
twice the IO of not using fanout at all. In general, the IO overhead is a
factor of fanout / (fanout - 1).
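As a quick sanity check on that claim (purely illustrative numbers):

# Total data shuffled across all fanout levels, relative to a plain
# single-stage combine that shuffles every element to one reducer.
def io_overhead(num_elements, fanout, levels):
    shuffled = sum(num_elements / fanout**i for i in range(levels))
    return shuffled / num_elements

print(io_overhead(1e6, fanout=2, levels=20))   # ~2.0, i.e. fanout / (fanout - 1)
print(io_overhead(1e6, fanout=10, levels=6))   # ~1.11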


> In particular, note that we already do "combiner lifting" to do as much
> combining as we can on the mapper side, e.g. suppose we have M elements and
> N workers. Each worker will (to a first order of approximation) combine M/N
> elements down to a single element, leaving N elements total to be combined
> by a worker in the subsequent stage. If N is large (or the combining
> computation expensive) one can use with_hot_key_fanout to add an
> intermediate step and let the N workers each combine M/N elements into
> sqrt(N) partial aggregates, and the subsequent worker only needs to combine
> the sqrt(N) partial aggregates. Generally N (the number of workers, not the
> number of elements) is small enough that multiple levels are not needed.
>

Thanks for clarifying, Robert. I did not realize that "combiner lifting" was
a thing! We had been operating under the assumption that we should set the
fanout to sqrt(M), which could indeed be much bigger than sqrt(N). In general,
"with_hot_key_fanout" could use documentation explaining exactly what the
parameter means and indicating suggested usage (e.g., set it to sqrt(N)).
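To make the suggested usage concrete, here is roughly what I have in mind for
our pipeline (the PCollection name, worker count, and combine function are
placeholders rather than our actual code):

import apache_beam as beam

NUM_WORKERS = 1000                # placeholder sizing
FANOUT = int(NUM_WORKERS ** 0.5)  # ~sqrt(N) per the suggestion above, not sqrt(M)

# `observations` stands in for a PCollection of (station_id, value) pairs.
mean_per_station = (
    observations
    | 'MeanPerStation' >> beam.CombinePerKey(
        beam.combiners.MeanCombineFn()).with_hot_key_fanout(FANOUT))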

I will say that one other concern for us is memory usage. We typically work
with large, image-like data coming out of weather simulations, which we try
to divide into 10-100 MB chunks. With 1000 workers, this would suggest
fanout=30, which means combining up to 3 GB of data on a single machine.
This is probably fine but doesn't leave a large margin for error.
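For reference, the back-of-envelope arithmetic behind that 3 GB figure (the
numbers are rough):

chunk_mb = 100          # upper end of our chunk sizes
num_workers = 1000
fanout = 30             # ~sqrt(num_workers), as above
final_stage_gb = fanout * chunk_mb / 1000
print(final_stage_gb)   # 3.0 GB of partial aggregates on one machine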


>  On Fri, May 26, 2023 at 1:57 PM Stephan Hoyer via dev <
> dev@beam.apache.org> wrote:
>
>> We have some use-cases where we are combining over very large sets (e.g.,
>> computing the average of 1e5 to 1e6 elements, corresponding to hourly
>> weather observations over the past 50 years).
>>
>> "with_hot_key_fanout" seems to be rather essential for performing these
>> calculations, but as far as I can tell it only performs a single level of
>> fanout, i.e., instead of summing up 1e6 elements on a single node, you sum
>> 1e3 elements 1e3 times, and then sum those 1e3 results together.
>>
>> My guess is that such combiners could be much more efficient if this was
>> performed in a hierarchical/multi-stage fashion proportional to
>> log(element_count), e.g., summing 100 elements with 3 stages, or maybe
>> summing 10 elements with 6 stages. Dask uses such a "tree reduction"
>> strategy as controlled by the "split_every" parameter:
>> https://github.com/dask/dask/blob/453bd7031828f72e4602498c1a1f776280794bea/dask/array/reductions.py#L109
>>
>> I understand that the number of fanout stages could not be computed
>> automatically in the Beam data model, but it would still be nice to be able
>> to specify this manually. Has there been any thought to introducing this
>> feature?
>>
>> Thanks,
>> Stephan
>>
>
