JanKaul commented on issue #22090: URL: https://github.com/apache/datafusion/issues/22090#issuecomment-4415793356
Thanks for the quick reply! Let me try to defend my position for the sake of
exploring the solution space.
With my proposal, the buffered bytes are capped at a certain value, but
every time a record batch is consumed, its bytes are released, freeing up
headroom. This essentially means the RepartitionExec stays "open" as long as it
receives and releases memory at roughly the same rate. So yes, it can
theoretically lead to more deadlocks.
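
To make the proposal a bit more concrete, here is a minimal sketch of the cap in Rust. `ByteBudget`, `try_reserve` and `release` are hypothetical names used for illustration only; they are not part of DataFusion's API, and a real implementation would have to be wired into the existing memory pool and the channel wake-up logic.

```rust
/// Hypothetical byte budget for the batches buffered by one repartition
/// operator. Illustration only, not DataFusion's actual implementation.
struct ByteBudget {
    /// Upper bound on buffered bytes.
    cap: usize,
    /// Bytes currently held by buffered, not yet consumed batches.
    buffered: usize,
}

impl ByteBudget {
    fn new(cap: usize) -> Self {
        Self { cap, buffered: 0 }
    }

    /// Producer side: admit a batch only if it still fits under the cap.
    /// Returns `false` when the producer has to wait for headroom.
    fn try_reserve(&mut self, bytes: usize) -> bool {
        if self.buffered.saturating_add(bytes) > self.cap {
            return false;
        }
        self.buffered += bytes;
        true
    }

    /// Consumer side: a consumed batch gives its bytes back,
    /// which frees up headroom for the producer.
    fn release(&mut self, bytes: usize) {
        self.buffered = self.buffered.saturating_sub(bytes);
    }
}
```

A producer that gets `false` back would have to wait until a consumer calls `release`. That wait is where the backpressure comes from, and it is also the point where the deadlock risk comes in.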
That said, the situation under which it "locks" is generally an undesirable
one to begin with: it's the case where a RepartitionExec continuously receives
more bytes than it can release. This can of course be legitimately required by
certain downstream consumers, as your example shows. But there is typically a
limit to how much data downstream consumers need in order to make progress, and
that's what this cap is meant to enforce. The cap could be rather large,
allowing for the scenarios you mentioned. I'd argue that if the unhealthy state
of receiving more than is being released persists for too long, it eventually
leads to OOM, which isn't much better than a deadlock. The problem is that the
higher memory consumption of RepartitionExec slows downstream consumers down
even further, because they are forced to spill, and that makes the effect
self-amplifying.
Again, thanks a lot for taking the time to explain. I'll try to dig deeper
into it and see if there are other possibilities to apply more backpressure,
as you suggested.
A bit of context on what I'm trying to achieve: I'm trying to run TPC-H
query 17 at sf=100 in a memory-constrained environment with either 4 GB or
8 GB of memory. I'm using SortMergeJoin as the join algorithm, and I'm
experimenting with different (also custom) memory pools.
The query is:
```sql
select
    sum(l_extendedprice) / 7.0 as avg_yearly
from
    embucket.tpch.lineitem,
    embucket.tpch.part
where
    p_partkey = l_partkey
    and p_brand = 'Brand#42'
    and p_container = 'LG BAG'
    and l_quantity < (
        select
            0.2 * avg(l_quantity)
        from
            embucket.tpch.lineitem
        where
            l_partkey = p_partkey
    );
```
I run into the problem that only one ExternalSorter partition keeps making
progress while 5.3 GB of my 8 GB memory pool is stuck in the RepartitionExec
partitions.
```
[trace] tc.grow name="ExternalSorter[13]" tid=ThreadId(13) +393216 ->
reserved 254017536 (was 253624320) state=0
[trace] tc.grow name="RepartitionExec[14]" tid=ThreadId(7) +12408 ->
reserved 389708208 (was 389695800) state=0
[trace] tc.grow name="RepartitionExec[15]" tid=ThreadId(7) +12384 ->
reserved 175256112 (was 175243728) state=0
[trace] tc.grow name="RepartitionExec[1]" tid=ThreadId(3) +13512 ->
reserved 513931128 (was 513917616) state=0
[trace] tc.grow name="RepartitionExec[2]" tid=ThreadId(3) +13632 ->
reserved 512652240 (was 512638608) state=0
[trace] tc.grow name="RepartitionExec[3]" tid=ThreadId(3) +12720 ->
reserved 357972600 (was 357959880) state=0
[trace] tc.grow name="RepartitionExec[4]" tid=ThreadId(3) +12360 ->
reserved 512861352 (was 512848992) state=0
[trace] tc.grow name="RepartitionExec[5]" tid=ThreadId(3) +11688 ->
reserved 358234632 (was 358222944) state=0
[trace] tc.grow name="RepartitionExec[6]" tid=ThreadId(3) +11904 ->
reserved 358406016 (was 358394112) state=0
[trace] tc.grow name="RepartitionExec[7]" tid=ThreadId(3) +13392 ->
reserved 378418200 (was 378404808) state=0
[trace] tc.grow name="RepartitionExec[8]" tid=ThreadId(3) +12552 ->
reserved 513046896 (was 513034344) state=0
[trace] tc.grow name="RepartitionExec[9]" tid=ThreadId(3) +11664 ->
reserved 481272456 (was 481260792) state=0
[trace] tc.grow name="RepartitionExec[10]" tid=ThreadId(3) +13128 ->
reserved 513369552 (was 513356424) state=0
[trace] tc.grow name="RepartitionExec[11]" tid=ThreadId(3) +11832 ->
reserved 11832 (was 0) state=0
[trace] tc.grow name="RepartitionExec[12]" tid=ThreadId(3) +12816 ->
reserved 175500096 (was 175487280) state=0
[trace] tc.grow name="RepartitionExec[1]" tid=ThreadId(5) +12600 ->
reserved 513956712 (was 513944112) state=0
[trace] tc.grow name="RepartitionExec[1]" tid=ThreadId(16) +12984 ->
reserved 513904992 (was 513892008) state=0
[trace] tc.shrink name="RepartitionExec[11]" tid=ThreadId(4) -12096 ->
reserved 0 (was 12096) state=0
[trace] tc.grow name="RepartitionExec[1]" tid=ThreadId(14) +12624 ->
reserved 513917616 (was 513904992) state=0
[trace] tc.grow name="RepartitionExec[2]" tid=ThreadId(14) +14112 ->
reserved 512678784 (was 512664672) state=0
[trace] tc.grow name="RepartitionExec[3]" tid=ThreadId(14) +12960 ->
reserved 357985560 (was 357972600) state=0
[trace] tc.grow name="RepartitionExec[4]" tid=ThreadId(14) +12960 ->
reserved 512874312 (was 512861352) state=0
[trace] tc.grow name="RepartitionExec[14]" tid=ThreadId(11) +12360 ->
reserved 389720568 (was 389708208) state=0
[trace] tc.grow name="RepartitionExec[0]" tid=ThreadId(6) +12624 ->
reserved 378636912 (was 378624288) state=0
[trace] tc.grow name="RepartitionExec[2]" tid=ThreadId(2) +11424 ->
reserved 512638608 (was 512627184) state=0
[trace] tc.grow name="RepartitionExec[14]" tid=ThreadId(9) +13176 ->
reserved 389695800 (was 389682624) state=0
[trace] tc.grow name="RepartitionExec[1]" tid=ThreadId(17) +12984 ->
reserved 513944112 (was 513931128) state=0
[trace] tc.grow name="RepartitionExec[14]" tid=ThreadId(8) +12408 ->
reserved 389732976 (was 389720568) state=0
[trace] tc.shrink name="RepartitionExec[11]" tid=ThreadId(4) -11832 ->
reserved 0 (was 11832) state=0
[trace] tc.grow name="RepartitionExec[13]" tid=ThreadId(16) +12960 ->
reserved 36493536 (was 36480576) state=0
[trace] tc.grow name="RepartitionExec[14]" tid=ThreadId(5) +11736 ->
reserved 389744712 (was 389732976) state=0
[trace] tc.grow name="RepartitionExec[2]" tid=ThreadId(3) +12432 ->
reserved 512664672 (was 512652240) state=0
[trace] tc.grow name="RepartitionExec[13]" tid=ThreadId(15) +12840 ->
reserved 36480576 (was 36467736) state=0
[trace] tc.grow name="RepartitionExec[0]" tid=ThreadId(7) +12336 ->
reserved 378649248 (was 378636912) state=0
[trace] tc.grow name="RepartitionExec[4]" tid=ThreadId(12) +12264 ->
reserved 512848992 (was 512836728) state=0
[trace] tc.grow name="RepartitionExec[4]" tid=ThreadId(10) +12264 ->
reserved 512836728 (was 512824464) state=0
[trace] tc.shrink name="ExternalSorter[13]" tid=ThreadId(13) -254017536 ->
reserved 0 (was 254017536) state=0
[trace] tc.grow name="ExternalSorter[13]" tid=ThreadId(13) +393216 ->
reserved 393216 (was 0) state=0
[trace] tc.grow name="ExternalSorter[13]" tid=ThreadId(13) +393216 ->
reserved 786432 (was 393216) state=0
[trace] tc.grow name="ExternalSorter[13]" tid=ThreadId(13) +393216 ->
reserved 1179648 (was 786432) state=0
[trace] tc.grow name="ExternalSorter[13]" tid=ThreadId(13) +393216 ->
reserved 1572864 (was 1179648) state=0
[trace] tc.grow name="ExternalSorter[13]" tid=ThreadId(13) +393216 ->
reserved 1966080 (was 1572864) state=0
[trace] tc.grow name="ExternalSorter[13]" tid=ThreadId(13) +393216 ->
reserved 2359296 (was 1966080) state=0
[trace] tc.grow name="ExternalSorter[13]" tid=ThreadId(13) +393216 ->
reserved 2752512 (was 2359296) state=0
[trace] tc.grow name="ExternalSorter[13]" tid=ThreadId(13) +393216 ->
reserved 3145728 (was 2752512) state=0
[trace] tc.grow name="ExternalSorter[13]" tid=ThreadId(13) +393216 ->
reserved 3538944 (was 3145728) state=0
[trace] tc.grow name="ExternalSorter[13]" tid=ThreadId(13) +393216 ->
reserved 3932160 (was 3538944) state=0
[trace] tc.grow name="ExternalSorter[13]" tid=ThreadId(13) +393216 ->
reserved 4325376 (was 3932160) state=0
[trace] tc.grow name="ExternalSorter[13]" tid=ThreadId(13) +393216 ->
reserved 4718592 (was 4325376) state=0
[trace] tc.grow name="ExternalSorter[13]" tid=ThreadId(13) +393216 ->
reserved 5111808 (was 4718592) state=0
```
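
For anyone who wants to check the total against the trace, here is a rough tally sketch in Rust. It assumes that the largest `reserved` value seen per consumer approximates what that consumer currently holds (the events from different threads are interleaved, so the last line per consumer is not necessarily the latest). `peak_reserved` and `total_for` are helper names made up for this example.

```rust
use std::collections::BTreeMap;

/// Largest `reserved` value seen per consumer name in a trace dump.
fn peak_reserved(trace: &str) -> BTreeMap<String, u64> {
    let mut peaks = BTreeMap::new();
    // Split on the event prefix instead of on newlines, because every
    // event in the mail is wrapped across two lines.
    for event in trace.split("[trace]") {
        // Consumer name is the quoted value after `name=`.
        let name = event
            .split("name=\"")
            .nth(1)
            .and_then(|rest| rest.split('"').next());
        // New reservation is the first number after `reserved `.
        let reserved = event
            .split("reserved ")
            .nth(1)
            .and_then(|rest| rest.split_whitespace().next())
            .and_then(|num| num.parse::<u64>().ok());
        if let (Some(name), Some(reserved)) = (name, reserved) {
            let peak = peaks.entry(name.to_string()).or_insert(0u64);
            *peak = (*peak).max(reserved);
        }
    }
    peaks
}

/// Sum of the peaks for every consumer whose name starts with `prefix`.
fn total_for(peaks: &BTreeMap<String, u64>, prefix: &str) -> u64 {
    peaks
        .iter()
        .filter(|(name, _)| name.starts_with(prefix))
        .map(|(_, bytes)| *bytes)
        .sum()
}
```

Calling `total_for(&peak_reserved(trace), "RepartitionExec")` on the excerpt above should come out at roughly 5.27 GiB, which is in line with the roughly 5.3 GB mentioned earlier.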
