JanKaul opened a new issue, #22090:
URL: https://github.com/apache/datafusion/issues/22090
### Describe the bug
`RepartitionExec`'s distribution channels (`distributor_channels.rs`) only
throttle producers when **every** output channel has at least one buffered item
("Gate A"). With balanced hash partitioning and a single slow consumer, the
slow channel grows linearly per input batch.
### To reproduce
Trace with N=4 partitions, ch3's consumer slower than the others:
1. After the first input batch each channel has ≥1 item → Gate A closes.
2. ch0 drains → Gate A opens.
3. Producer pushes the next input batch's sub-batches: one
`take`-materialized batch per partition. Push to ch0 succeeds (was empty); ch1,
ch2, ch3 already had data, so the empty-counter never decrements during the
round.
4. After the round all channels are non-empty again → Gate A re-closes. ch3
has gained 1.
Each cycle adds another batch to ch3 — linear growth in the number of input
batches, until OOM (or until the per-partition `SharedMemoryReservation`
triggers spilling).
### Cause
Gate A's invariant is "no live channel is empty." A single fast channel
oscillating empty/non-empty keeps the gate open forever, and every gate-open
window lets the producer push to **every** partition — including the slow one.
Skewed input data is *not* required; even hash distribution + one lagging
consumer is enough.
### Expected behavior
The producer should be throttled when total buffered memory crosses a
configured threshold, regardless of how many channels are technically non-empty.
### Proposed fix
Add a second gate condition — total buffered bytes across all channels ≥ a
configured budget — and close the gate when **either** condition fires (`A ||
B`). Gate A still gives O(1) per-channel depth for balanced workloads with even
consumer rates; Gate B caps total memory whenever some channel never drains.
Velox's `LocalExchangeMemoryManager` is the design template.
I'll send a PR shortly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]