On Tue, May 30, 2023 at 10:37 AM Kenneth Knowles <k...@apache.org> wrote:

>
> On Sat, May 27, 2023 at 4:20 PM Stephan Hoyer via dev <dev@beam.apache.org>
> wrote:
>
>> On Fri, May 26, 2023 at 2:59 PM Robert Bradshaw <rober...@google.com>
>> wrote:
>>
>>> Yes, with_hot_key_fanout only performs a single level of fanout. I don't
>>> think fanning out more than this has been explored, but I would imagine
>>> that for most cases the increased IO would negate most if not all of the
>>> benefits.
>>>
>>
>> My reasoning for multi-level fanout is that the total amount of IO
>> converges as a geometric series: at each level, the amount of data is
>> reduced by a factor of 1/fanout. So even with fanout=2 at each level,
>> the total amount of IO is only twice that of not using fanout at all. In
>> general the IO overhead would be a factor of "fanout / (fanout - 1)".
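As a quick sanity check of that overhead factor (the fanout values here are
purely illustrative):

    # Total relative IO is the geometric series 1 + 1/f + 1/f**2 + ... = f / (f - 1).
    for fanout in (2, 4, 10, 30):
        print(f"fanout={fanout}: {fanout / (fanout - 1):.2f}x the IO of no fanout")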
>>
>>
>>> In particular, note that we already do "combiner lifting" to do as much
>>> combining as we can on the mapper side, e.g. suppose we have M elements and
>>> N workers. Each worker will (to a first order of approximation) combine M/N
>>> elements down to a single element, leaving N elements total to be combined
>>> by a worker in the subsequent stage. If N is large (or the combining
>>> computation is expensive), one can use with_hot_key_fanout to add an
>>> intermediate step and let the N workers each combine M/N elements into
>>> sqrt(N) partial aggregates, and the subsequent worker only needs to combine
>>> the sqrt(N) partial aggregates. Generally N (the number of workers, not the
>>> number of elements) is small enough that multiple levels are not needed.
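For concreteness, here is a minimal sketch of that intermediate step in the
Python SDK (the sample data and the fanout of 32, roughly sqrt(1000), are
assumed values for illustration, not numbers from this thread):

    import apache_beam as beam

    with beam.Pipeline() as p:
        means = (
            p
            | beam.Create([("station_a", 1.0), ("station_a", 2.0), ("station_b", 3.0)])
            # A fanout of ~sqrt(N) spreads each hot key across 32 intermediate
            # partial aggregates before the final per-key combine.
            | beam.CombinePerKey(beam.combiners.MeanCombineFn()).with_hot_key_fanout(32)
        )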
>>>
>>
>> Thanks for clarifying, Robert. I did not realize that "combiner lifting"
>> was a thing! We had been operating under the assumption that we should set
>> the fanout to sqrt(M), which could indeed be bigger than sqrt(N). In general,
>> "with_hot_key_fanout" could use documentation explaining exactly what the
>> parameter means and indicating suggested usage (e.g., set it to sqrt(N)).
>>
>> I will say that one other concern for us is memory usage. We typically
>> work with large, image-like data coming out of weather simulations, which
>> we try to divide into 10-100 MB chunks. With 1000 workers, this would
>> suggest fanout=30, which means combining up to 3 GB of data on a single
>> machine. This is probably fine but doesn't leave much margin for error.
>>
>
> One thing behind Robert's "to a first approximation" is that the pre-shuffle
> combine does flush things to shuffle to avoid running out of memory. This
> logic is per-SDK (because of how the combiner is invoked, how memory
> footprint is measured, which caching data structures are performant, etc.).
> So if this is working as intended, the impact of doing a large amount of
> pre-combining on a machine is just a reduced benefit (having to flush more
> than one result per key), not excessive memory pressure. I'm sure it is
> inexact in specific cases, though.
>
>>
+1.

Also, the general CombineFn only combines a single element into the
aggregate at a time--no need to store all the inputs in memory at the same
time. (The flushing is primarily a concern for the many-keys situation, in
which case many aggregates are stored, one per key.)
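As an illustration only (a hand-rolled mean, not anything specific to the
weather use case), a CombineFn like this never holds more than a (sum, count)
pair per key:

    import apache_beam as beam

    class MeanFn(beam.CombineFn):
        def create_accumulator(self):
            return 0.0, 0  # (running sum, element count)

        def add_input(self, accumulator, element):
            # One element is folded in at a time; the inputs are never buffered.
            total, count = accumulator
            return total + element, count + 1

        def merge_accumulators(self, accumulators):
            totals, counts = zip(*accumulators)
            return sum(totals), sum(counts)

        def extract_output(self, accumulator):
            total, count = accumulator
            return total / count if count else float("nan")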

>>> On Fri, May 26, 2023 at 1:57 PM Stephan Hoyer via dev <dev@beam.apache.org>
>>> wrote:
>>>
>>>> We have some use-cases where we are combining over very large sets
>>>> (e.g., computing the average of 1e5 to 1e6 elements, corresponding to
>>>> hourly weather observations over the past 50 years).
>>>>
>>>> "with_hot_key_fanout" seems to be rather essential for performing these
>>>> calculations, but as far as I can tell it only performs a single level of
>>>> fanout, i.e., instead of summing up 1e6 elements on a single node, you sum
>>>> 1e3 elements 1e3 times, and then sum those 1e3 results together.
>>>>
>>>> My guess is that such combiners could be much more efficient if this
>>>> were performed in a hierarchical/multi-stage fashion, with the number of
>>>> stages proportional to log(element_count), e.g., summing 100 elements at
>>>> a time across 3 stages, or maybe 10 elements at a time across 6 stages.
>>>> Dask uses such a "tree reduction" strategy, controlled by the
>>>> "split_every" parameter:
>>>> https://github.com/dask/dask/blob/453bd7031828f72e4602498c1a1f776280794bea/dask/array/reductions.py#L109
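For reference, a manual two-level ("tree") reduction can be expressed today
by salting keys with a random sub-key; the sample data, key name, and fanout
of 32 below are illustrative assumptions, not anything from this thread:

    import random

    import apache_beam as beam

    with beam.Pipeline() as p:
        totals = (
            p
            | beam.Create([("station_a", float(i)) for i in range(1000)])
            # Level 1: spread the hot key over 32 sub-keys and pre-sum each group.
            | beam.Map(lambda kv, fanout=32: ((kv[0], random.randrange(fanout)), kv[1]))
            | "PartialSums" >> beam.CombinePerKey(sum)
            # Level 2: drop the sub-key and sum the ~32 partial sums per key.
            | beam.Map(lambda kv: (kv[0][0], kv[1]))
            | "TotalSums" >> beam.CombinePerKey(sum)
        )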
>>>>
>>>> I understand that the number of fanout stages could not be computed
>>>> automatically in the Beam data model, but it would still be nice to be able
>>>> to specify this manually. Has there been any thought to introducing this
>>>> feature?
>>>>
>>>> Thanks,
>>>> Stephan
>>>>
>>>
