JanKaul opened a new pull request, #22035:
URL: https://github.com/apache/datafusion/pull/22035

   ## Summary
   
   This is an **exploratory PR** for the discussion in
   #`<issue-number>` — *"`FairSpillPool` penalises the active operator in a
   pipeline of blocking spillable operators"*.
   
   It introduces `SubPool`, a `MemoryPool` implementation that wraps a parent
   `MemoryPool` and aggregates a single operator's reservations into **one**
   outer slot on the parent. Within the sub-pool, `FairSpillPool`-style
   fair-share semantics are applied across the operator's own internal
   reservations.
   
   `ExternalSorter` is wired up as the first user: every `SortExec` node
   lazily builds one sub-pool on first `execute()` and shares it across all
   of its partitions. From the parent pool's point of view there is now one
   spillable consumer per `SortExec` *node*, not one per partition and not
   one per internal reservation (`ExternalSorter[N]` /
   `ExternalSorterMerge[N]`).
   
   This is the smallest concrete step toward the Velox-style hierarchical
   memory pool direction sketched in the issue. It does not yet solve the
   core "blocking operator penalised for inactive consumers" problem on its
   own — but it does give us the structure (per-operator sub-pools, one
   slot per operator at the parent) on which a richer arbitration policy
   could be built. See *Open questions* below.
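   To make the mechanism concrete, here is a deliberately simplified model of
   the idea (the `Toy*` names and signatures are illustrative only, not the
   actual DataFusion `MemoryPool` API): inner reservations are aggregated into
   one outer slot on the parent, and each inner reservation is capped at its
   fair share of a dynamic capacity.

   ```rust
   /// Toy parent pool: a fixed limit and a single reserved counter.
   struct ToyParent {
       limit: usize,
       reserved: usize,
   }

   impl ToyParent {
       fn try_grow(&mut self, additional: usize) -> bool {
           if self.reserved + additional <= self.limit {
               self.reserved += additional;
               true
           } else {
               false
           }
       }
   }

   /// Toy sub-pool: aggregates inner reservations into one outer slot on the
   /// parent and caps each inner reservation at its fair share.
   struct ToySubPool {
       outer_size: usize,       // what we currently hold on the parent
       num_spill: usize,        // number of spillable inner reservations
       inner_sizes: Vec<usize>, // per-reservation sizes
   }

   impl ToySubPool {
       /// The dynamic capacity described in the PR: re-include our own outer
       /// slot so the sub-pool does not punish itself twice.
       fn capacity(&self, parent: &ToyParent) -> usize {
           parent.limit - parent.reserved + self.outer_size
       }

       fn try_grow(&mut self, parent: &mut ToyParent, idx: usize, additional: usize) -> bool {
           let per_reservation_cap = self.capacity(parent) / self.num_spill.max(1);
           if self.inner_sizes[idx] + additional > per_reservation_cap {
               return false; // would exceed this reservation's fair share
           }
           if !parent.try_grow(additional) {
               return false; // parent itself is out of memory
           }
           self.inner_sizes[idx] += additional;
           self.outer_size += additional;
           true
       }
   }

   fn main() {
       let mut parent = ToyParent { limit: 100, reserved: 0 };
       let mut sub = ToySubPool { outer_size: 0, num_spill: 2, inner_sizes: vec![0, 0] };
       assert!(sub.try_grow(&mut parent, 0, 40));  // within the 50 fair share
       assert!(!sub.try_grow(&mut parent, 0, 20)); // 60 > 50: capped internally
       assert!(sub.try_grow(&mut parent, 1, 50));  // sibling still gets its share
       assert_eq!(parent.reserved, 90);            // one aggregated outer slot
   }
   ```

   The real `SubPool` additionally deals in `MemoryReservation`s, `can_spill`
   flags, and error reporting; the sketch only shows the accounting shape.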
   
   ## What's in the change
   
   * **`datafusion/execution/src/memory_pool/pool.rs`**
     * New `SubPool` type. Holds one outer `MemoryReservation`
       (`can_spill = true`) against the parent, plus a local `num_spill` /
       `spillable` / `unspillable` triple.
     * Internal `try_grow` applies a `FairSpillPool`-style per-reservation
       cap computed from a *dynamic* capacity:
       `parent.memory_limit() - parent.reserved() + self.outer.size()`. The
       `+ outer.size()` term re-includes what we already hold so the
       sub-pool doesn't punish itself twice.
     * Public `sub_pool(parent, outer_name) -> Arc<dyn MemoryPool>`
       convenience constructor.
     * Falls back to `usize::MAX` capacity when the parent reports
       `Infinite` / `Unknown`.
   
   * **`datafusion/execution/src/memory_pool/mod.rs`**
     * Doc-comment on `MemoryPool` describing `SubPool` alongside
       `TrackConsumersPool`, including a recommendation: use
       `GreedyMemoryPool` as the parent for plans that chain multiple
       blocking spillable operators back-to-back, so each sub-pool can grow
       into the full parent pool when it is the only one currently using
       memory.
   
   * **`datafusion/physical-plan/src/sorts/sort.rs`**
     * `SortExec` gains a `shared_pool: OnceLock<Arc<dyn MemoryPool>>`.
       Lazily initialised on the first `execute()` call (the only point at
       which the `RuntimeEnv` is in scope), reused across all partitions
       of that `SortExec` instance.
     * `Clone for SortExec` is custom: the clone gets a fresh `OnceLock`,
       so a cloned `SortExec` is treated as a separate operator instance
       and gets its own sub-pool registration. Same treatment in
       `SortExec::with_fetch`.
     * `ExternalSorter::new` takes the per-`SortExec` sub-pool by
       parameter and registers `ExternalSorter[N]` /
       `ExternalSorterMerge[N]` against it instead of against
       `runtime.memory_pool`.
  * Process-wide `SORT_EXEC_INSTANCE_ID` counter names each sub-pool's
    outer consumer (`SortExec[#k]:subpool`) so entries in
    `TrackConsumersPool` reports remain distinguishable.
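The lazy-init and custom-`Clone` pattern above can be sketched in isolation
(a `String` stands in for `Arc<dyn MemoryPool>`, and `INSTANCE_ID` stands in
for `SORT_EXEC_INSTANCE_ID`, so the sketch stays self-contained):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock};

// Stand-in for the process-wide SORT_EXEC_INSTANCE_ID counter.
static INSTANCE_ID: AtomicUsize = AtomicUsize::new(0);

// A String stands in for Arc<dyn MemoryPool> to keep the sketch runnable.
struct ToySortExec {
    shared_pool: OnceLock<Arc<String>>,
}

impl ToySortExec {
    fn new() -> Self {
        Self { shared_pool: OnceLock::new() }
    }

    /// The first `execute()` builds the sub-pool; later calls from other
    /// partitions of the same instance reuse it.
    fn execute(&self) -> Arc<String> {
        Arc::clone(self.shared_pool.get_or_init(|| {
            let k = INSTANCE_ID.fetch_add(1, Ordering::Relaxed);
            Arc::new(format!("SortExec[#{k}]:subpool"))
        }))
    }
}

// Custom Clone: the clone starts with a fresh OnceLock, so it is treated as
// a separate operator instance and registers its own sub-pool.
impl Clone for ToySortExec {
    fn clone(&self) -> Self {
        Self { shared_pool: OnceLock::new() }
    }
}

fn main() {
    let a = ToySortExec::new();
    let p1 = a.execute();
    let p2 = a.execute();
    assert!(Arc::ptr_eq(&p1, &p2)); // partitions of one instance share a pool
    let b = a.clone();
    let p3 = b.execute();
    assert!(!Arc::ptr_eq(&p1, &p3)); // the clone gets its own pool
    assert_ne!(*p1, *p3);            // and a distinct consumer name
}
```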
   
   ## Behaviour change
   
   For a plan with N `SortExec` nodes against the runtime's pool:
   
    | Before | After |
    | ------ | ----- |
    | 2 spillable consumers per partition | 1 spillable consumer per `SortExec` node |
    | Parent `num_spill` grows with partitions and with internal reservations | Parent `num_spill` grows only with operator count |
    | Per-partition reservations compete directly at the parent | Per-partition reservations compete inside the sub-pool; the sub-pool competes at the parent |
   
   The most interesting effect is when the runtime pool is a
   `GreedyMemoryPool`: each `SortExec`'s sub-pool can grow to the full
   parent pool when it is the only one actively reserving, and shrinks
   back as siblings start reserving. This matches the "operator currently
   doing work gets the headroom it needs" intuition that the original
   issue argues for, *as long as the parent is greedy*. With a
   `FairSpillPool` parent the sub-pool still counts as one consumer there,
   so M coexisting sub-pools cap their outer at `parent / M` — the
   sub-pool's only added value in that case is the internal fair-share
   across partitions.
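    The parent-choice interaction can be made concrete with invented numbers
    (all values below are hypothetical, chosen for illustration only):

    ```rust
    fn main() {
        // Hypothetical numbers (units arbitrary): a parent pool of 1_000
        // with m = 4 coexisting SortExec sub-pools.
        let parent_limit: usize = 1_000;
        let m: usize = 4;

        // GreedyMemoryPool parent, only this sub-pool reserving: the dynamic
        // capacity (limit - reserved + outer) is the whole pool.
        let (reserved, outer) = (0usize, 0usize);
        assert_eq!(parent_limit - reserved + outer, 1_000);

        // FairSpillPool parent: each sub-pool is one spillable consumer
        // there, so its outer slot is capped at parent / m.
        let outer_cap = parent_limit / m;
        assert_eq!(outer_cap, 250);

        // Yet the sub-pool's dynamic capacity formula can report far more
        // headroom than its outer can ever use: with three siblings holding
        // 100 each and us at our 250 cap, capacity = 1_000 - 550 + 250 = 700.
        let (reserved, outer) = (3 * 100 + outer_cap, outer_cap);
        let sub_capacity = parent_limit - reserved + outer;
        assert_eq!(sub_capacity, 700);
        assert!(sub_capacity > outer_cap); // the mismatch raised in open question 5
    }
    ```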
   
   ## Open questions for reviewers
   
   1. **Is per-operator sub-pool the right granularity?** The Velox model
      nests further (query → task → pipeline → operator). Should we expose
      sub-pools at the `ExecutionPlan` level rather than wiring each
      spillable operator individually?
   2. **Recommended parent.** The doc-comment recommends
      `GreedyMemoryPool` as the parent. For multi-tenant deployments that
      currently use `FairSpillPool` as their root, is this acceptable, or
      do we need a hybrid that's fair across queries but greedy within a
      query?
   3. **Other spillable operators.** Only `ExternalSorter` is migrated
      here. Hash aggregation, sort-merge join (which has its own
      `with_can_spill(false)` quirk noted in the issue), repartition, and
      nested-loop join are all candidates. Should those land in this PR
      or as follow-ups?
   4. **Naming.** `SubPool` is descriptive but generic. Other options:
      `OperatorMemoryPool`, `AggregatingPool`, `NestedPool`. Preferences?
   5. **Capacity formula.** The dynamic
      `parent.memory_limit() - parent.reserved() + outer.size()` works for
      `GreedyMemoryPool` parents but interacts oddly with `FairSpillPool`
      parents (the sub-pool sees the full parent capacity even though its
      outer is capped at `parent / M`). Should `MemoryPool` grow a
      `headroom_for(consumer)` method instead?
   
   ## Test plan
   
   - [ ] `cargo test -p datafusion-execution memory_pool`
    - [ ] `cargo test -p datafusion --test core_integration concurrent_sort_unfairness`
         — chain-of-`SortExec` reproducer from the linked issue. The
         bottom-most operator's spill count should drop substantially when
         the parent is `GreedyMemoryPool`.
   - [ ] `cargo test -p datafusion-physical-plan sorts::sort` — sanity-check
         that single-`SortExec` behaviour is unchanged when the parent
         pool has only this one operator.
   - [ ] Manual check on a multi-`SortExec` plan with
         `TrackConsumersPool(GreedyMemoryPool)` as the runtime pool: the
         top-consumers list should now show `SortExec[#k]:subpool` entries
         rather than per-partition `ExternalSorter[N]` entries.
   - [ ] Confirm error messages from `SubPool::try_grow` still carry the
         inner consumer name (the `inner_context()` wrapping) — important
         so failures like "ExternalSorter[3] couldn't grow" don't get
         reported as "SortExec[#0]:subpool couldn't grow".


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]