JanKaul opened a new pull request, #22035:
URL: https://github.com/apache/datafusion/pull/22035
## Summary
This is an **exploratory PR** for the discussion in #`<issue-number>` —
*"`FairSpillPool` penalises the active operator in a pipeline of blocking
spillable operators"*.
It introduces `SubPool`, a `MemoryPool` implementation that wraps a parent
`MemoryPool` and aggregates a single operator's reservations into **one**
outer slot on the parent. Within the sub-pool, `FairSpillPool`-style
fair-share semantics are applied across the operator's own internal
reservations.
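As a rough sketch of the aggregation idea (the types and names below are illustrative stand-ins, not the actual `MemoryPool` / `SubPool` API in this PR): every inner grow is forwarded to the parent through a single `outer` slot, so the parent only ever sees one consumer.

```rust
use std::sync::Mutex;

// Hypothetical stand-in for the parent pool; the real parent is a
// DataFusion `MemoryPool` trait object.
struct ParentPool {
    limit: usize,
    reserved: Mutex<usize>,
}

impl ParentPool {
    fn try_grow(&self, additional: usize) -> Result<(), String> {
        let mut reserved = self.reserved.lock().unwrap();
        if *reserved + additional > self.limit {
            return Err(format!(
                "parent pool exhausted ({} + {} > {})",
                *reserved, additional, self.limit
            ));
        }
        *reserved += additional;
        Ok(())
    }
}

// Sketch of the sub-pool: all inner reservations are summed into one
// `outer` slot on the parent, so the parent sees a single consumer.
struct SubPool<'a> {
    parent: &'a ParentPool,
    outer: Mutex<usize>, // what this sub-pool currently holds on the parent
}

impl<'a> SubPool<'a> {
    fn try_grow_inner(&self, additional: usize) -> Result<(), String> {
        // Forward the growth to the parent as one aggregated consumer.
        self.parent.try_grow(additional)?;
        *self.outer.lock().unwrap() += additional;
        Ok(())
    }
}

fn main() {
    let parent = ParentPool { limit: 100, reserved: Mutex::new(0) };
    let sub = SubPool { parent: &parent, outer: Mutex::new(0) };
    sub.try_grow_inner(30).unwrap();
    sub.try_grow_inner(40).unwrap();
    // The parent sees one 70-byte consumer, not two separate ones.
    assert_eq!(*sub.outer.lock().unwrap(), 70);
    assert!(sub.try_grow_inner(50).is_err()); // 70 + 50 > 100
    println!("outer = {}", *sub.outer.lock().unwrap());
}
```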
`ExternalSorter` is wired up as the first user: every `SortExec` node
lazily builds one sub-pool on first `execute()` and shares it across all
of its partitions. From the parent pool's point of view there is now one
spillable consumer per `SortExec` *node*, not one per partition and not
one per internal reservation (`ExternalSorter[N]` /
`ExternalSorterMerge[N]`).
This is the smallest concrete step toward the Velox-style hierarchical
memory pool direction sketched in the issue. It does not yet solve the
core "blocking operator penalised for inactive consumers" problem on its
own — but it does give us the structure (per-operator sub-pools, one
slot per operator at the parent) on which a richer arbitration policy
could be built. See *Open questions* below.
## What's in the change
* **`datafusion/execution/src/memory_pool/pool.rs`**
* New `SubPool` type. Holds one outer `MemoryReservation`
(`can_spill = true`) against the parent, plus a local `num_spill` /
`spillable` / `unspillable` triple.
* Internal `try_grow` applies a `FairSpillPool`-style per-reservation
cap computed from a *dynamic* capacity:
`parent.memory_limit() - parent.reserved() + self.outer.size()`. The
`+ outer.size()` term re-includes what we already hold so the
sub-pool doesn't punish itself twice.
* Public `sub_pool(parent, outer_name) -> Arc<dyn MemoryPool>`
convenience constructor.
* Falls back to `usize::MAX` capacity when the parent reports
`Infinite` / `Unknown`.
* **`datafusion/execution/src/memory_pool/mod.rs`**
* Doc-comment on `MemoryPool` describing `SubPool` alongside
`TrackConsumersPool`, including a recommendation: use
`GreedyMemoryPool` as the parent for plans that chain multiple
blocking spillable operators back-to-back, so each sub-pool can grow
into the full parent pool when it is the only one currently using
memory.
* **`datafusion/physical-plan/src/sorts/sort.rs`**
* `SortExec` gains a `shared_pool: OnceLock<Arc<dyn MemoryPool>>`.
Lazily initialised on the first `execute()` call (the only point at
which the `RuntimeEnv` is in scope), reused across all partitions
of that `SortExec` instance.
* `Clone for SortExec` is custom: the clone gets a fresh `OnceLock`,
so a cloned `SortExec` is treated as a separate operator instance
and gets its own sub-pool registration. Same treatment in
`SortExec::with_fetch`.
* `ExternalSorter::new` takes the per-`SortExec` sub-pool by
parameter and registers `ExternalSorter[N]` /
`ExternalSorterMerge[N]` against it instead of against
`runtime.memory_pool`.
* Process-wide `SORT_EXEC_INSTANCE_ID` counter names each sub-pool's
outer consumer (`SortExec[#k]:subpool`) so entries in
`TrackConsumersPool` reports remain distinguishable.
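The dynamic-capacity and per-reservation cap arithmetic described for `pool.rs` above can be sketched as pure functions (names are illustrative, not the PR's exact API):

```rust
/// Dynamic capacity: headroom left on the parent, plus what this
/// sub-pool's outer slot already holds (re-included so the sub-pool
/// doesn't count its own usage against itself twice).
fn dynamic_capacity(parent_limit: usize, parent_reserved: usize, outer_size: usize) -> usize {
    parent_limit.saturating_sub(parent_reserved) + outer_size
}

/// `FairSpillPool`-style cap inside the sub-pool: spillable
/// reservations share whatever is left after unspillable usage.
fn per_spillable_cap(capacity: usize, unspillable: usize, num_spill: usize) -> usize {
    if num_spill == 0 {
        return 0;
    }
    capacity.saturating_sub(unspillable) / num_spill
}

fn main() {
    // Made-up numbers: parent limit 1000, 300 reserved overall, of
    // which this sub-pool's outer slot holds 200.
    let cap = dynamic_capacity(1000, 300, 200);
    assert_eq!(cap, 900); // 1000 - 300 + 200
    // Two spillable reservations, 100 bytes unspillable: each may
    // grow up to (900 - 100) / 2 = 400.
    assert_eq!(per_spillable_cap(cap, 100, 2), 400);
    println!("capacity = {cap}");
}
```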
## Behaviour change
For a plan with N `SortExec` nodes against the runtime's pool:
| Before | After |
| ------ | ----- |
| 2 spillable consumers per partition | 1 spillable consumer per `SortExec` node |
| Parent `num_spill` grows with partitions and with internal reservations | Parent `num_spill` grows only with operator count |
| Per-partition reservations compete directly at the parent | Per-partition reservations compete inside the sub-pool; the sub-pool competes at the parent |
The most interesting effect is when the runtime pool is a
`GreedyMemoryPool`: each `SortExec`'s sub-pool can grow to the full
parent pool when it is the only one actively reserving, and shrinks
back as siblings start reserving. This matches the "operator currently
doing work gets the headroom it needs" intuition that the original
issue argues for, *as long as the parent is greedy*. With a
`FairSpillPool` parent the sub-pool still counts as one consumer there,
so M coexisting sub-pools cap their outer at `parent / M` — the
sub-pool's only added value in that case is the internal fair-share
across partitions.
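The greedy-vs-fair contrast is just arithmetic; with made-up numbers:

```rust
/// Under a `FairSpillPool`-style parent, each of M coexisting
/// sub-pools' outer slots is capped at parent / M.
fn fair_outer_cap(parent_limit: usize, m: usize) -> usize {
    parent_limit / m.max(1)
}

fn main() {
    let parent_limit = 1200usize;
    // Greedy parent: the only active sub-pool may take the whole pool.
    let greedy_cap = parent_limit;
    // Fair parent with M = 3 coexisting sub-pools: each outer slot is
    // capped at parent / M regardless of which one is doing work.
    let fair_cap = fair_outer_cap(parent_limit, 3);
    assert_eq!(greedy_cap, 1200);
    assert_eq!(fair_cap, 400);
    println!("greedy: {greedy_cap}, fair: {fair_cap}");
}
```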
## Open questions for reviewers
1. **Is per-operator sub-pool the right granularity?** The Velox model
nests further (query → task → pipeline → operator). Should we expose
sub-pools at the `ExecutionPlan` level rather than wiring each
spillable operator individually?
2. **Recommended parent.** The doc-comment recommends
`GreedyMemoryPool` as the parent. For multi-tenant deployments that
currently use `FairSpillPool` as their root, is this acceptable, or
do we need a hybrid that's fair across queries but greedy within a
query?
3. **Other spillable operators.** Only `ExternalSorter` is migrated
here. Hash aggregation, sort-merge join (which has its own
`with_can_spill(false)` quirk noted in the issue), repartition, and
nested-loop join are all candidates. Should those land in this PR
or as follow-ups?
4. **Naming.** `SubPool` is descriptive but generic. Other options:
`OperatorMemoryPool`, `AggregatingPool`, `NestedPool`. Preferences?
5. **Capacity formula.** The dynamic
`parent.memory_limit() - parent.reserved() + outer.size()` works for
`GreedyMemoryPool` parents but interacts oddly with `FairSpillPool`
parents (the sub-pool sees the full parent capacity even though its
outer is capped at `parent / M`). Should `MemoryPool` grow a
`headroom_for(consumer)` method instead?
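For question 5, a purely speculative sketch of what a `headroom_for` method might look like (nothing like this exists in the PR or in DataFusion today; all names are hypothetical): the parent reports how much a *specific* consumer may still grow, so a sub-pool under a fair parent would see `parent / M` minus its own size rather than the full parent headroom.

```rust
// Speculative trait sketch; not DataFusion's `MemoryPool`.
trait Pool {
    fn memory_limit(&self) -> usize;
    fn reserved(&self) -> usize;
    /// Default: plain remaining headroom (greedy semantics).
    fn headroom_for(&self, _consumer_size: usize) -> usize {
        self.memory_limit().saturating_sub(self.reserved())
    }
}

/// A fair parent would override the default with a per-consumer cap.
struct FairParent {
    limit: usize,
    reserved: usize,
    num_spill: usize,
}

impl Pool for FairParent {
    fn memory_limit(&self) -> usize { self.limit }
    fn reserved(&self) -> usize { self.reserved }
    fn headroom_for(&self, consumer_size: usize) -> usize {
        // Per-consumer cap is limit / M; subtract what is already held.
        (self.limit / self.num_spill.max(1)).saturating_sub(consumer_size)
    }
}

fn main() {
    let parent = FairParent { limit: 900, reserved: 300, num_spill: 3 };
    // A consumer already holding 100 bytes may grow by at most
    // 900 / 3 - 100 = 200 under fair semantics.
    assert_eq!(parent.headroom_for(100), 200);
    println!("{}", parent.headroom_for(100));
}
```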
## Test plan
- [ ] `cargo test -p datafusion-execution memory_pool`
- [ ] `cargo test -p datafusion --test core_integration
concurrent_sort_unfairness`
— chain-of-`SortExec` reproducer from the linked issue. The
bottom-most operator's spill count should drop substantially when
the parent is `GreedyMemoryPool`.
- [ ] `cargo test -p datafusion-physical-plan sorts::sort` — sanity-check
that single-`SortExec` behaviour is unchanged when the parent
pool has only this one operator.
- [ ] Manual check on a multi-`SortExec` plan with
`TrackConsumersPool(GreedyMemoryPool)` as the runtime pool: the
top-consumers list should now show `SortExec[#k]:subpool` entries
rather than per-partition `ExternalSorter[N]` entries.
- [ ] Confirm error messages from `SubPool::try_grow` still carry the
inner consumer name (the `inner_context()` wrapping) — important
so failures like "ExternalSorter[3] couldn't grow" don't get
reported as "SortExec[#0]:subpool couldn't grow".