metegenez opened a new pull request, #1684:
URL: https://github.com/apache/datafusion-ballista/pull/1684
Adds a new AQE physical-optimizer rule that, after upstream shuffle stages
finalize, attaches a per-stage **coalesce decision** to every leaf
`ExchangeExec` feeding the next stage. Each downstream `ShuffleReaderExec`
is then built with `K < M` partitions instead of `M`, eliminating wasted
scheduler/task overhead on near-empty shuffle outputs.
Built on top of @milenkovicm's lazy AQE (#1649, merged 2026-05-09). The
rule implements DataFusion's standard `PhysicalOptimizerRule` and slots
into `AdaptivePlanner::default_optimizers()` — no new trait, no
upstream DataFusion changes, no executor data-path changes.
## How it works
For each `replan_stages()` pass:
1. Collect every leaf `ExchangeExec` in the current stage subtree — this
is the **alignment group**. All members share the upstream partition
count `M` and were hash-partitioned on the same join key upstream, so
partition `i` means the same logical hash bucket on every leaf.
2. Sum per-partition byte stats element-wise across the group.
3. Bin-pack the summed sizes via Spark's `ShufflePartitionsUtil` algorithm
(`split_size_list_by_target_size`, with small/merged factors).
4. If `1 < K < M`, attach the *same* `CoalescePlan` (with K partition
groups) to every leaf via `set_coalesce(..)`.
Sharing one `CoalescePlan` across the whole alignment group is what makes
the rule safe for hash-Partitioned joins and SortMergeJoin: hash buckets
that were aligned at M stay aligned at K, so DataFusion's
`EnforceDistribution` doesn't insert remediation repartitions.
## SF=100 TPC-H results (4-way A/B)
16 partitions/table, 32 GB memory pool, 16 concurrent task slots,
`target_partition_bytes=64 MB`, warm = median of iters 1-4.
| Variant | Total warm | vs baseline | Wins (≥5%) | Losses (≥5%) |
|---|---:|---:|---:|---:|
| H+AQE (hash join, no coalesce) | 239.7 s | baseline | — | — |
| **H+coal** (hash join + rule) | **230.6 s** | **-3.8%** | 7 | 3 |
| S+AQE (sort-merge join, no coalesce) | 425.5 s | baseline | — | — |
| **S+coal** (sort-merge join + rule) | **419.5 s** | **-1.4%** | 9 | 4 |
### Wins explained by coalesce activity
These are the queries where the rule *fired* (`C` column = coalesce events
across all stages). The pattern is clear: queries with multi-leg shuffles
benefit most because the rule reduces the downstream task fan-out across
*all* legs of the alignment group simultaneously.
| q | shape | C(hash) | Δ hash | C(sort) | Δ sort | why |
|---:|:---|---:|---:|---:|---:|:---|
| q2 | correlated subquery | 45 | **-10.9%** | 45 | +4.3% | most
coalesce events of any query; each subquery iteration's shuffle gets packed |
| q5 | 5-way join chain | 30 | **-15.5%** | 30 | **-5.6%** | 5
alignment-group members all get the same K — task count drops at every join |
| q7 | 4-way join | 20 | -2.6% | 20 | **-12.5%** |
sort-merge benefits more here — fewer sort inputs at K |
| q8 | nation/region 3-way join | 20 | **-6.9%** | 20 | **-8.3%** |
balanced wins on both join modes |
| q9 | 5-chained inner joins | 10 | **-9.7%** | 10 | +5.8% | hash
wins clean; sort has a small regression we haven't dug into |
| q10 | customer-orders agg join | 10 | **-7.5%** | 10 | **-5.5%** | win
in both modes — typical \"build side near-empty after filter\" case |
| q11 | two parallel multi-way joins | 20 | +1.8% | 20 | **-5.3%** | |
| q18 | GROUP BY o_orderkey | 0 | **-16.8%** | 0 | -4.9% | C=0 win
— pure timing variance, not coalesce-attributable |
### Wins on `C=0` queries (q14, q18, q22, q3-sort, q19-sort)
These are not real coalesce wins — they're queries where the bin-pack
returned `K=1` or `K=M` (degenerate, rule no-ops) but the timing still
moved. Cause is timing variance on sub-3-second queries, not the rule.
Listed for completeness; not part of the headline story.
### Losses
`C=0` regressions (q3 hash +32.8%, q17 +17.4%, q4 +10.9%, q20-sort +15.7%)
trace to stage-pipeline overhead, not the rule's logic: the AQE pipeline
runs the full DataFusion optimizer per stage capture. Bigger plan trees
take longer to re-optimize per pass. This is independent of whether
coalesce is enabled and tracked separately.
## Correctness
- **21/22 queries pass** at SF=100 with `coalesce.enabled=true` (one
manual re-run with all 22 also clean — see local sanity run).
- Compared to an earlier walker-based prototype: q2, q7, q8 went from
\`partition count mismatch X!=Y\` panics → pass. q15 went from hang → pass.
- All queries return identical row counts across enabled / disabled
variants.
## What's in this PR
| Area | Change |
|---|---|
| **Rule** | New `CoalescePartitionsRule` in
`ballista/scheduler/src/state/aqe/optimizer_rule/coalesce_partitions.rs`.
Single transform-up walk; one bin-pack decision per alignment group; same
`CoalescePlan` attached to every leaf. |
| **Carrier** | `ExchangeExec` gains an
`Arc<Mutex<Option<Arc<CoalescePlan>>>>` slot with `set_coalesce` / `coalesce`
accessors. |
| **Adapter** | `BallistaAdapter::adapt_to_ballista` checks
`exchange.coalesce()`; on `Some(cp)` it builds a K-partition
`ShuffleReaderExec` via `try_new_coalesced` instead of the default `try_new`. |
| **Config** | New keys: `ballista.coalesce.enabled` (default `true`),
`target_partition_bytes` (default 64 MB), `small_partition_factor` (default
0.2), `merged_partition_factor` (default 1.2). |
| **Algorithm** | Pure-CPU bin-pack helpers in
`state/aqe/coalesce/algorithm.rs` — `split_size_list_by_target_size` +
`start_indices_to_partition_groups`, with rstest cases for each Spark-parity
edge case (small-tail folding, merged-factor early flush, strict-overshoot
boundary). |
| **Proto** | `CoalescePlan` + `PartitionGroup` round-trip via
`BallistaPhysicalExtensionCodec`. |
## Test plan
- [x] All 22 TPC-H queries pass at SF=100 with `coalesce.enabled=true`
(single iteration, all `rc=0`, zero panics, zero `ERROR`-level lines in
scheduler/executor logs).
- [x] 4-way SF=100 A/B benchmark (hash×coalesce × sort×coalesce, N=5 each) →
table above.
- [x] 29 scheduler-side unit tests, including 6 new functional tests in
`state::aqe::test::coalesce_rule` that snapshot displayed plan trees and assert
on the `coalesce=K of M` field for: happy path, disabled rule, K=M degenerate,
single hash join, two hash joins, sort-merge join.
- [x] Algorithm-level rstest cases for bin-pack edge cases.
## Why draft
Sequencing — wanted the structural refactor (legacy v1 removed, broadcast
config dropped, walker types deleted) and the SF=100 evidence visible in
one place before treating the rule as a final design.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]