Dandandan opened a new pull request, #21741:
URL: https://github.com/apache/datafusion/pull/21741
## Summary
Ports the experimental morsel-driven scheduler from apache/datafusion#2226
(tustvold, 2022) onto plain `tokio` + `crossbeam-deque` (the original used
`rayon`). Lives in a new workspace crate `datafusion-push-scheduler` and is
opt-in via a new `--push-scheduler` flag on `dfbench`.
- **`Pipeline` trait** (`push` / `close` / `output_partitions` /
`poll_partition`) and `Task` + `TaskWaker` dedup — verbatim port of PR #2226's
`task.rs`.
- **`WorkerPool`**: N OS threads, crossbeam `Injector` + per-worker LIFO
`Worker` deque + thread-local FIFO side-queue (matches rayon's spawn_local /
spawn_local_fifo semantics), parked on a `Condvar`. Each worker enters a
**shared** `tokio::runtime::Handle` so wrapped pull-based operators' internal
`tokio::spawn` calls still target a real multi-thread runtime (this is the key
difference from the existing `morsel-scheduler` branch which uses per-worker
`current_thread` runtimes and deadlocks on nested `tokio::spawn`).
- **`ExecutionPipeline`** wraps the entire `ExecutionPlan` as one push leaf
for v1. `RepartitionPipeline` / `SortPipeline` (hash + round-robin repartition,
in-memory sort via `sort_batch_chunked`) are implemented with the correct
`Pipeline` semantics but not yet wired into the planner — porting PR #2226's
"Inbox" rewiring (to splice scheduler push output into the wrapped subtree's
leaf) is the next step before breaker cuts can be enabled.
- **`dfbench` integration**: `CommonOpt` gains `--push-scheduler`; the
ClickBench runner routes `.collect()` through the scheduler when the flag is
set.
- **Perf work already landed**:
  - `RepartitionPipeline` wakes downstream only on the empty→non-empty
    transition on its output buffers (avoids redundant atomic ops on the hot
    push path).
  - `spawn_local` onto the owner's deque skips the peer-wake notification,
    which preserves cache locality since the owner will LIFO-pop its own push
    on the next iteration.
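
The empty→non-empty wake rule from the perf notes above can be illustrated with a small, dependency-free sketch. This is not the code in this PR: `OutputBuffer`, `push_batch`, `drain`, and `CountingWaker` are hypothetical names, and a `u64` stands in for a `RecordBatch`. It only shows the idea that a producer wakes the consumer when the buffer goes from empty to non-empty and stays silent on later pushes.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

/// Hypothetical waker that only counts how many times it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical per-partition output buffer (assumed shape, not the PR's type).
struct OutputBuffer {
    batches: VecDeque<u64>, // stand-in for RecordBatch
    consumer: Waker,        // waker of the downstream task
}

/// Push a batch and wake the consumer only on the empty -> non-empty edge.
fn push_batch(buffer: &Mutex<OutputBuffer>, batch: u64) {
    let to_wake = {
        let mut guard = buffer.lock().unwrap();
        let was_empty = guard.batches.is_empty();
        guard.batches.push_back(batch);
        let waker = if was_empty {
            Some(guard.consumer.clone())
        } else {
            None // consumer was already notified and has not drained yet
        };
        waker
    };
    // Wake after the lock is released so the consumer never contends on it.
    if let Some(waker) = to_wake {
        waker.wake();
    }
}

/// Consumer side: take everything that is currently buffered.
fn drain(buffer: &Mutex<OutputBuffer>) -> Vec<u64> {
    buffer.lock().unwrap().batches.drain(..).collect()
}
```

In this sketch, three pushes into an empty buffer produce a single wake. The next wake happens only after the consumer has drained the buffer and a new batch arrives.
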
End-to-end SQL round-trip tests cover projection/filter, hash repartition,
sort, top-k with limit, and group-by — all match the default path's output.
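
For reviewers new to the push model, here is a rough sketch of the `Pipeline` contract named in the summary (`push` / `close` / `output_partitions` / `poll_partition`). The signatures are simplified assumptions: the real trait works with `RecordBatch` and `Result`, while this version uses a `Vec<i64>` stand-in and no error handling. `PassThrough` and `PartitionState` are hypothetical and exist only to show how push, close, and poll interact.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;
use std::task::{Context, Poll, Waker};

/// Stand-in for arrow's `RecordBatch`, to keep the sketch dependency-free.
type Batch = Vec<i64>;

/// Simplified shape of the push-based contract (signatures are assumptions).
trait Pipeline: Send + Sync {
    /// Deliver a batch from input `child` to `partition`.
    fn push(&self, input: Batch, child: usize, partition: usize);
    /// Signal that `child` will push no more data to `partition`.
    fn close(&self, child: usize, partition: usize);
    /// Number of output partitions this pipeline produces.
    fn output_partitions(&self) -> usize;
    /// Poll one output partition; `Ready(None)` means it is exhausted.
    fn poll_partition(&self, cx: &mut Context<'_>, partition: usize) -> Poll<Option<Batch>>;
}

#[derive(Default)]
struct PartitionState {
    queue: VecDeque<Batch>,
    closed: bool,
    waker: Option<Waker>,
}

/// Hypothetical pipeline that forwards each input partition unchanged.
struct PassThrough {
    partitions: Vec<Mutex<PartitionState>>,
}

impl PassThrough {
    fn new(n: usize) -> Self {
        Self {
            partitions: (0..n).map(|_| Mutex::new(PartitionState::default())).collect(),
        }
    }
}

impl Pipeline for PassThrough {
    fn push(&self, input: Batch, _child: usize, partition: usize) {
        let waker = {
            let mut state = self.partitions[partition].lock().unwrap();
            state.queue.push_back(input);
            state.waker.take()
        };
        // Wake the parked consumer, if any, after releasing the lock.
        if let Some(w) = waker {
            w.wake();
        }
    }

    fn close(&self, _child: usize, partition: usize) {
        let waker = {
            let mut state = self.partitions[partition].lock().unwrap();
            state.closed = true;
            state.waker.take()
        };
        if let Some(w) = waker {
            w.wake();
        }
    }

    fn output_partitions(&self) -> usize {
        self.partitions.len()
    }

    fn poll_partition(&self, cx: &mut Context<'_>, partition: usize) -> Poll<Option<Batch>> {
        let mut state = self.partitions[partition].lock().unwrap();
        match state.queue.pop_front() {
            Some(batch) => Poll::Ready(Some(batch)),
            None if state.closed => Poll::Ready(None),
            None => {
                // Nothing buffered yet: remember who to wake on the next push.
                state.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}
```

A consumer polling an empty, open partition gets `Poll::Pending` and registers its waker. A later `push` makes the next poll return the batch, and `close` turns an empty partition into `Poll::Ready(None)`.
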
## Test plan
- [x] `cargo test -p datafusion-push-scheduler` — 3 unit + 5 end-to-end
tests pass
- [x] `cargo clippy -p datafusion-push-scheduler --all-targets
--all-features -- -D warnings`
- [x] `cargo clippy -p datafusion-benchmarks -- -D warnings`
- [x] `cargo fmt --all`
- [ ] Run ClickBench with `--push-scheduler` vs. default and compare
wall-clock
- [ ] Port PR #2226's Inbox machinery so the planner can cut pipelines at
`RepartitionPipeline` / `SortPipeline`
🤖 Generated with [Claude Code](https://claude.com/claude-code)