Dandandan opened a new pull request, #21729:
URL: https://github.com/apache/datafusion/pull/21729

   ## Summary
   
   Wrap `AggregateExec`'s output stream with a small `pin_stream_to_thread` 
helper that drives the inner stream on a dedicated `spawn_blocking` thread 
hosting a `current_thread` tokio runtime. Batches flow back to the caller 
through a bounded `mpsc`; the caller still sees a normal 
`SendableRecordBatchStream`.
   
   Applied to every `AggregateMode` (Partial, PartialReduce, Final, 
FinalPartitioned, Single, SinglePartitioned) since they all hold the same kind 
of persistent grouping state.
   
   ## Motivation
   
   On the default multi-threaded tokio runtime, a `RepartitionExec`-fed 
aggregation migrates **constantly** between worker threads — every channel 
`.await` is a scheduling point and tokio's work-stealer happily moves the task 
to an idle worker. That bounces the partial-agg's hash map + accumulators 
across CPU caches on every poll.
   
   ### Measurement
   
   Added (but `#[ignore]`d) 
`aggregates::tests::measure_partial_agg_thread_migration`. 16 input partitions, 
4 MT workers, input = `TestMemoryExec → RepartitionExec(Hash, 16) → 
PartialAgg`. A `ThreadProbe` wraps the raw partial-agg output stream (*inner*) 
and the forwarding stream returned by `pin_stream_to_thread` (*outer*), 
recording `thread::current().id()` at every `poll_next`.
   
   | | inner (partial-agg build) | outer (mpsc receiver) |
   |---|---|---|
   | **before this change** | 16/16 partitions migrated; 254 switches across 
376 polls (**~68%**) | — |
   | **after this change**  | **0/16 partitions migrated; 0 switches across 
2585 polls** | 7/16 partitions migrated, 7 switches across 43 polls |
   
   The outer (driver-side) task still migrates — that's fine, it only carries 
the mpsc receive. The heavy hash-map state is now pinned for the partition's 
lifetime.
   
   Run with: `cargo test -p datafusion-physical-plan --lib 
aggregates::tests::measure_partial_agg_thread_migration -- --ignored 
--nocapture`
   
   ## Tradeoffs
   
   - **Gain**: aggregation state is cache-sticky; no migrations mid-build.
   - **Loss of work-stealing for the agg task**: a pinned aggregation no longer 
participates in MT work-stealing. For balanced partition counts (≈ CPU count, 
the common DataFusion case) the cache-locality win dominates. For very skewed 
partitions the long-tail partition can't be helped by idle threads mid-flight.
   - **Blocking-pool pressure**: each pinned partition holds one thread from 
the blocking pool (default 512) for the partition's lifetime. Heavy concurrency 
× high partition counts could saturate it; a dedicated thread-per-core executor 
is a cleaner answer for that regime but is out of scope for this PR.
   
   ## What's *not* in scope
   
   ClickBench's 43 queries also exercise `SortExec` / `TopKExec` on the ~10 
`ORDER BY ... LIMIT N` queries — same stateful-build-over-many-awaits shape, 
same potential win. Happy to do a follow-up PR if the idea is welcome; keeping 
this one focused on aggregates.
   
   ## Test plan
   
   - [x] `cargo check -p datafusion-physical-plan`
   - [x] `cargo clippy -p datafusion-physical-plan --all-targets -- -D warnings`
   - [x] `cargo fmt --all -- --check`
   - [x] Local probe shows inner migrations drop from 254 → 0.
   - [ ] ClickBench wall-clock before/after (happy to run and post numbers if 
reviewers want them).
   
   🤖 Generated with [Claude Code](https://claude.com/claude-code)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to