Re: Hierarchical fanout with Beam combiners?

2023-05-30 Thread Robert Bradshaw via dev
On Tue, May 30, 2023 at 10:37 AM Kenneth Knowles  wrote:

>
> On Sat, May 27, 2023 at 4:20 PM Stephan Hoyer via dev 
> wrote:
>
>> On Fri, May 26, 2023 at 2:59 PM Robert Bradshaw 
>> wrote:
>>
>>> Yes, with_hot_key_fanout only performs a single level of fanout. I don't
>>> think fanning out more than this has been explored, but I would imagine
>>> that for most cases the increased IO would negate most if not all of the
>>> benefits.
>>>
>>
>> My reasoning for multi-level fanout would be that the total amount of IO
>> converges as a geometric series: at each level, the amount of
>> data is reduced by a factor of 1/fanout. So even if fanout=2 at each level,
>> the total amount of IO is twice the IO of not using fanout at all. The
>> general IO overhead would be a factor of "fanout / (fanout - 1)".
>>
>>
>>> In particular, note that we already do "combiner lifting" to do as much
>>> combining as we can on the mapper side, e.g. suppose we have M elements and
>>> N workers. Each worker will (to a first order of approximation) combine M/N
>>> elements down to a single element, leaving N elements total to be combined
>>> by a worker in the subsequent stage. If N is large (or the combining
>>> computation expensive) one can use with_hot_key_fanout to add an
>>> intermediate step and let the N workers each combine M/N elements into
>>> sqrt(N) partial aggregates, and the subsequent worker only needs to combine
>>> the sqrt(N) partial aggregates. Generally N (the number of workers, not the
>>> number of elements) is small enough that multiple levels are not needed.
>>>
>>
>> Thanks for clarifying Robert. I did not realize that "combiner lifting"
>> was a thing! We had been operating under the assumption that we should set
>> the fanout to sqrt(M), which could indeed be bigger than sqrt(N). In general
>> "with_hot_key_fanout" could use documentation to explain exactly what the
>> parameter means and to indicate suggested usage (e.g., set it to sqrt(N)).
>>
>> I will say that one other concern for us is memory usage. We typically
>> work with large, image-like data coming out of weather simulations, which
>> we try to divide into 10-100 MB chunks. With 1000 workers, this would
>> suggest fanout=30, which means combining up to 3 GB of data on a single
>> machine. This is probably fine but doesn't leave a large margin for error.
>>
>
> One thing in Robert's "to a first approximation" is that the pre-shuffle
> combine does flush things to shuffle to avoid running out of memory. This
> logic is per-SDK (because of how the combiner is invoked and also how to
> measure memory footprint, which caching data structures are performant, etc.).
> So if this is working as intended, the impact of doing a large amount of
> pre-combining on a machine is just a lesser benefit because of having to
> flush more than one result per key, not excessive memory pressure. I'm sure
> it is inexact in specific cases, though.
>
>>
+1.

Also, the general CombineFn only combines a single element into the
aggregate at a time--no need to store all the inputs in memory at the same
time. (The flushing is primarily a concern for the many-keys situation, in
which case many aggregates are stored, one per key.)
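
To make that concrete, here is a minimal sketch (not from the thread) of a
mean CombineFn in the Python SDK: add_input folds one element at a time into
a small (sum, count) accumulator, so the full input set never sits in memory.

    import apache_beam as beam

    class MeanFn(beam.CombineFn):
        """Running mean; the accumulator is just a (sum, count) pair."""

        def create_accumulator(self):
            return 0.0, 0

        def add_input(self, accumulator, element):
            # One element folded in at a time; nothing else is buffered.
            total, count = accumulator
            return total + element, count + 1

        def merge_accumulators(self, accumulators):
            totals, counts = zip(*accumulators)
            return sum(totals), sum(counts)

        def extract_output(self, accumulator):
            total, count = accumulator
            return total / count if count else float('nan')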

>>> On Fri, May 26, 2023 at 1:57 PM Stephan Hoyer via dev 
>>> wrote:
>>>
 We have some use-cases where we are combining over very large sets
 (e.g., computing the average of 1e5 to 1e6 elements, corresponding to
 hourly weather observations over the past 50 years).

 "with_hot_key_fanout" seems to be rather essential for performing these
 calculations, but as far as I can tell it only performs a single level of
 fanout, i.e., instead of summing up 1e6 elements on a single node, you sum
 1e3 elements 1e3 times, and then sum those 1e3 results together.

 My guess is that such combiners could be much more efficient if this
 was performed in a hierarchical/multi-stage fashion proportional to
 log(element_count), e.g., summing 100 elements with 3 stages, or maybe
 summing 10 elements with 6 stages. Dask uses such a "tree reduction"
 strategy as controlled by the "split_every" parameter:
 https://github.com/dask/dask/blob/453bd7031828f72e4602498c1a1f776280794bea/dask/array/reductions.py#L109

 I understand that the number of fanout stages could not be computed
 automatically in the Beam data model, but it would still be nice to be able
 to specify this manually. Has there been any thought to introducing this
 feature?

 Thanks,
 Stephan

>>>


Re: Hierarchical fanout with Beam combiners?

2023-05-30 Thread Kenneth Knowles
On Sat, May 27, 2023 at 4:20 PM Stephan Hoyer via dev 
wrote:

> On Fri, May 26, 2023 at 2:59 PM Robert Bradshaw 
> wrote:
>
>> Yes, with_hot_key_fanout only performs a single level of fanout. I don't
>> think fanning out more than this has been explored, but I would imagine
>> that for most cases the increased IO would negate most if not all of the
>> benefits.
>>
>
> My reasoning for multi-level fanout would be that the total amount of IO
> converges as a geometric series: at each level, the amount of
> data is reduced by a factor of 1/fanout. So even if fanout=2 at each level,
> the total amount of IO is twice the IO of not using fanout at all. The
> general IO overhead would be a factor of "fanout / (fanout - 1)".
>
>
>> In particular, note that we already do "combiner lifting" to do as much
>> combining as we can on the mapper side, e.g. suppose we have M elements and
>> N workers. Each worker will (to a first order of approximation) combine M/N
>> elements down to a single element, leaving N elements total to be combined
>> by a worker in the subsequent stage. If N is large (or the combining
>> computation expensive) one can use with_hot_key_fanout to add an
>> intermediate step and let the N workers each combine M/N elements into
>> sqrt(N) partial aggregates, and the subsequent worker only needs to combine
>> the sqrt(N) partial aggregates. Generally N (the number of workers, not the
>> number of elements) is small enough that multiple levels are not needed.
>>
>
> Thanks for clarifying Robert. I did not realize that "combiner lifting"
> was a thing! We had been operating under the assumption that we should set
> the fanout to sqrt(M), which could indeed be bigger than sqrt(N). In general
> "with_hot_key_fanout" could use documentation to explain exactly what the
> parameter means and to indicate suggested usage (e.g., set it to sqrt(N)).
>
> I will say that one other concern for us is memory usage. We typically
> work with large, image-like data coming out of weather simulations, which
> we try to divide into 10-100 MB chunks. With 1000 workers, this would
> suggest fanout=30, which means combining up to 3 GB of data on a single
> machine. This is probably fine but doesn't leave a large margin for error.
>

One thing in Robert's "to a first approximation" is that the pre-shuffle
combine does flush things to shuffle to avoid running out of memory. This
logic is per-SDK (because of how the combiner is invoked and also how to
measure memory footprint, which caching data structures are performant, etc.).
So if this is working as intended, the impact of doing a large amount of
pre-combining on a machine is just a lesser benefit because of having to
flush more than one result per key, not excessive memory pressure. I'm sure
it is inexact in specific cases, though.
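
For intuition only, a toy sketch of that flushing idea (not Beam's actual
per-SDK implementation, and the names are made up): keep a bounded table of
per-key accumulators and flush partial results to the shuffle when it fills
up, trading "exactly one output per key" for bounded memory.

    MAX_CACHED_KEYS = 10_000  # hypothetical limit

    class PreShuffleCombiner:
        def __init__(self, combine_fn, emit):
            self.combine_fn = combine_fn
            self.emit = emit  # sends (key, accumulator) to the shuffle
            self.cache = {}

        def process(self, key, value):
            acc = self.cache.get(key)
            if acc is None:
                if len(self.cache) >= MAX_CACHED_KEYS:
                    # May emit more than one partial result per key overall.
                    self.flush()
                acc = self.combine_fn.create_accumulator()
            self.cache[key] = self.combine_fn.add_input(acc, value)

        def flush(self):
            for key, acc in self.cache.items():
                self.emit(key, acc)
            self.cache.clear()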

Kenn


>
>
>>  On Fri, May 26, 2023 at 1:57 PM Stephan Hoyer via dev <
>> dev@beam.apache.org> wrote:
>>
>>> We have some use-cases where we are combining over very large sets
>>> (e.g., computing the average of 1e5 to 1e6 elements, corresponding to
>>> hourly weather observations over the past 50 years).
>>>
>>> "with_hot_key_fanout" seems to be rather essential for performing these
>>> calculations, but as far as I can tell it only performs a single level of
>>> fanout, i.e., instead of summing up 1e6 elements on a single node, you sum
>>> 1e3 elements 1e3 times, and then sum those 1e3 results together.
>>>
>>> My guess is that such combiners could be much more efficient if this was
>>> performed in a hierarchical/multi-stage fashion proportional to
>>> log(element_count), e.g., summing 100 elements with 3 stages, or maybe
>>> summing 10 elements with 6 stages. Dask uses such a "tree reduction"
>>> strategy as controlled by the "split_every" parameter:
>>> https://github.com/dask/dask/blob/453bd7031828f72e4602498c1a1f776280794bea/dask/array/reductions.py#L109
>>>
>>> I understand that the number of fanout stages could not be computed
>>> automatically in the Beam data model, but it would still be nice to be able
>>> to specify this manually. Has there been any thought to introducing this
>>> feature?
>>>
>>> Thanks,
>>> Stephan
>>>
>>


Re: Hierarchical fanout with Beam combiners?

2023-05-27 Thread Stephan Hoyer via dev
On Fri, May 26, 2023 at 2:59 PM Robert Bradshaw  wrote:

> Yes, with_hot_key_fanout only performs a single level of fanout. I don't
> think fanning out more than this has been explored, but I would imagine
> that for most cases the increased IO would negate most if not all of the
> benefits.
>

My reasoning for multi-level fanout would be that the total amount of IO
converges as a geometric series: at each level, the amount of data
is reduced by a factor of 1/fanout. So even if fanout=2 at each level, the
total amount of IO is twice the IO of not using fanout at all. The general
IO overhead would be a factor of "fanout / (fanout - 1)".
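
(As a quick check on that arithmetic, a small sketch:)

    # Relative IO across levels, where each level shrinks the data by 1/f:
    #   1 + 1/f + 1/f**2 + ... = f / (f - 1)
    def total_io_factor(fanout, levels=None):
        if levels is None:
            return fanout / (fanout - 1)  # limit for infinitely many levels
        return sum(fanout ** -k for k in range(levels + 1))

    print(total_io_factor(2))   # 2.0  -> twice the IO of no fanout at all
    print(total_io_factor(10))  # ~1.11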


> In particular, note that we already do "combiner lifting" to do as much
> combining as we can on the mapper side, e.g. suppose we have M elements and
> N workers. Each worker will (to a first order of approximation) combine M/N
> elements down to a single element, leaving N elements total to be combined
> by a worker in the subsequent stage. If N is large (or the combining
> computation expensive) one can use with_hot_key_fanout to add an
> intermediate step and let the N workers each combine M/N elements into
> sqrt(N) partial aggregates, and the subsequent worker only needs to combine
> the sqrt(N) partial aggregates. Generally N (the number of workers, not the
> number of elements) is small enough that multiple levels are not needed.
>

Thanks for clarifying Robert. I did not realize that "combiner lifting" was
a thing! We had been operating under the assumption that we should set the
fanout to sqrt(M), which could indeed be bigger than sqrt(N). In general
"with_hot_key_fanout" could use documentation to explain exactly what the
parameter means and to indicate suggested usage (e.g., set it to sqrt(N)).
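
(For instance, something along these lines -- a sketch with an assumed
worker count, not official guidance from the Beam docs:)

    import apache_beam as beam

    num_workers = 1_000                  # assumed cluster size
    fanout = int(num_workers ** 0.5)     # ~32 intermediate sub-keys

    per_key_mean = beam.CombinePerKey(
        beam.combiners.MeanCombineFn()).with_hot_key_fanout(fanout)

    # For a single global aggregate, CombineGlobally(...).with_fanout(fanout)
    # is the corresponding knob.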

I will say that one other concern for us is memory usage. We typically work
with large, image-like data coming out of weather simulations, which we try
to divide into 10-100 MB chunks. With 1000 workers, this would suggest
fanout=30, which means combining up to 3 GB of data on a single machine.
This is probably fine but doesn't leave a large margin for error.


>  On Fri, May 26, 2023 at 1:57 PM Stephan Hoyer via dev <
> dev@beam.apache.org> wrote:
>
>> We have some use-cases where we are combining over very large sets (e.g.,
>> computing the average of 1e5 to 1e6 elements, corresponding to hourly
>> weather observations over the past 50 years).
>>
>> "with_hot_key_fanout" seems to be rather essential for performing these
>> calculations, but as far as I can tell it only performs a single level of
>> fanout, i.e., instead of summing up 1e6 elements on a single node, you sum
>> 1e3 elements 1e3 times, and then sum those 1e3 results together.
>>
>> My guess is that such combiners could be much more efficient if this was
>> performed in a hierarchical/multi-stage fashion proportional to
>> log(element_count), e.g., summing 100 elements with 3 stages, or maybe
>> summing 10 elements with 6 stages. Dask uses such a "tree reduction"
>> strategy as controlled by the "split_every" parameter:
>> https://github.com/dask/dask/blob/453bd7031828f72e4602498c1a1f776280794bea/dask/array/reductions.py#L109
>>
>> I understand that the number of fanout stages could not be computed
>> automatically in the Beam data model, but it would still be nice to be able
>> to specify this manually. Has there been any thought to introducing this
>> feature?
>>
>> Thanks,
>> Stephan
>>
>


Re: Hierarchical fanout with Beam combiners?

2023-05-26 Thread Robert Bradshaw via dev
Yes, with_hot_key_fanout only performs a single level of fanout. I don't
think fanning out more than this has been explored, but I would imagine
that for most cases the increased IO would negate most if not all of the
benefits.

In particular, note that we already do "combiner lifting" to do as much
combining as we can on the mapper side, e.g. suppose we have M elements and
N workers. Each worker will (to a first order of approximation) combine M/N
elements down to a single element, leaving N elements total to be combined
by a worker in the subsequent stage. If N is large (or the combining
computation expensive) one can use with_hot_key_fanout to add an
intermediate step and let the N workers each combine M/N elements into
sqrt(N) partial aggregates, and the subsequent worker only needs to combine
the sqrt(N) partial aggregates. Generally N (the number of workers, not the
number of elements) is small enough that multiple levels are not needed.
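
(Plugging in assumed numbers to illustrate the stages -- say M = 1e9
elements and N = 1000 workers:)

    import math

    M = 10**9                    # elements (assumed for illustration)
    N = 1_000                    # workers
    fanout = math.isqrt(N)       # ~31 intermediate sub-keys

    per_mapper = M // N          # ~1e6 elements pre-combined per worker
    per_intermediate = N         # each sub-key merges ~N mapper partials
    at_final_worker = fanout     # ~31 aggregates, vs. N = 1000 without fanout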


On Fri, May 26, 2023 at 1:57 PM Stephan Hoyer via dev 
wrote:

> We have some use-cases where we are combining over very large sets (e.g.,
> computing the average of 1e5 to 1e6 elements, corresponding to hourly
> weather observations over the past 50 years).
>
> "with_hot_key_fanout" seems to be rather essential for performing these
> calculations, but as far as I can tell it only performs a single level of
> fanout, i.e., instead of summing up 1e6 elements on a single node, you sum
> 1e3 elements 1e3 times, and then sum those 1e3 results together.
>
> My guess is that such combiners could be much more efficient if this was
> performed in a hierarchical/multi-stage fashion proportional to
> log(element_count), e.g., summing 100 elements with 3 stages, or maybe
> summing 10 elements with 6 stages. Dask uses such a "tree reduction"
> strategy as controlled by the "split_every" parameter:
> https://github.com/dask/dask/blob/453bd7031828f72e4602498c1a1f776280794bea/dask/array/reductions.py#L109
>
> I understand that the number of fanout stages could not be computed
> automatically in the Beam data model, but it would still be nice to be able
> to specify this manually. Has there been any thought to introducing this
> feature?
>
> Thanks,
> Stephan
>


Hierarchical fanout with Beam combiners?

2023-05-26 Thread Stephan Hoyer via dev
We have some use-cases where we are combining over very large sets (e.g.,
computing the average of 1e5 to 1e6 elements, corresponding to hourly
weather observations over the past 50 years).

"with_hot_key_fanout" seems to be rather essential for performing these
calculations, but as far as I can tell it only performs a single level of
fanout, i.e., instead of summing up 1e6 elements on a single node, you sum
1e3 elements 1e3 times, and then sum those 1e3 results together.

My guess is that such combiners could be much more efficient if this was
performed in a hierarchical/multi-stage fashion proportional to
log(element_count), e.g., summing 100 elements with 3 stages, or maybe
summing 10 elements with 6 stages. Dask uses such a "tree reduction"
strategy as controlled by the "split_every" parameter:
https://github.com/dask/dask/blob/453bd7031828f72e4602498c1a1f776280794bea/dask/array/reductions.py#L109
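
(For comparison, a small Dask sketch with assumed sizes; split_every bounds
how many intermediate results are merged at each step of the reduction tree:)

    import dask.array as da

    x = da.random.random((1_000_000,), chunks=1_000)  # 1000 chunks
    result = x.mean(split_every=10)   # ~log_10(1000) = 3 reduction stages
    print(result.compute())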

I understand that the number of fanout stages could not be computed
automatically in the Beam data model, but it would still be nice to be able
to specify this manually. Has there been any thought to introducing this
feature?
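
(For concreteness, here is a rough sketch of what hand-rolled multi-level
combining could look like in the Python SDK today -- helper names like
tree_mean and merge_partials are made up, and Beam's combiner lifting would
still apply within each round:)

    import random
    import apache_beam as beam

    def merge_partials(partials):
        # Each partial is a (sum, count) pair; merging is element-wise sum.
        total, count = 0.0, 0
        for s, c in partials:
            total += s
            count += c
        return total, count

    def tree_mean(pcoll, fanout=10, levels=3):
        """Mean of a PCollection of numbers via `levels` sharded combine
        rounds plus a final merge."""
        num_shards = fanout ** levels
        keyed = pcoll | 'ShardAndPair' >> beam.Map(
            lambda x: (random.randrange(num_shards), (float(x), 1)))
        for level in range(levels):
            keyed = (
                keyed
                | f'Combine{level}' >> beam.CombinePerKey(merge_partials)
                # Coarsen the shard key so the next round merges `fanout`
                # results from this one.
                | f'Coarsen{level}' >> beam.Map(
                    lambda kv, f=fanout: (kv[0] // f, kv[1]))
            )
        # After `levels` rounds every shard key has collapsed to 0.
        return (
            keyed
            | 'FinalCombine' >> beam.CombinePerKey(merge_partials)
            | 'ToMean' >> beam.Map(lambda kv: kv[1][0] / kv[1][1])
        )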

Thanks,
Stephan