wirybeaver opened a new pull request, #1676:
URL: https://github.com/apache/datafusion-ballista/pull/1676
## Summary
Brings three Pinot V2 Physical Optimizer ideas to Ballista so queries
against pre-bucketed tables can avoid the network shuffle on every join:
1. **Colocated joins** — when both join inputs declare the same hash key,
hash function, and bucket count, the join's required hash distribution
is satisfied directly and the `RepartitionExec` above each input is
stripped before `DistributedExchangeRule` would promote it to an
`ExchangeExec` stage boundary.
2. **Sub-partitioning** for divisor bucket counts (e.g. 16 vs 8). The
larger side is locally coalesced via a new `BucketSubPartitionExec`
(pure stream-chaining, no network) — `(hash(k) % 16) % 8 == hash(k) % 8`.
3. **Small-side broadcast** — opt-in via
`ballista.optimizer.broadcast_threshold_bytes` (default 0). When one
side's `total_byte_size` is below the threshold, the join is promoted
to `CollectLeft`, swapping inputs if the small side is on the right.
Colocation (including sub-partitioning) and broadcast are both opt-in:
tables only get colocation behavior if the user wraps them with the new
`PartitionedTableProvider`, and broadcast only fires when the threshold is
set above 0. Existing snapshot tests are unchanged.
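As a rough illustration of the broadcast opt-in, the decision can be sketched as a pure function over the two sides' byte-size estimates. This is not the PR's code: `BroadcastDecision` and `decide_broadcast` are hypothetical names, and two behaviors are assumptions the description does not spell out (a side with unknown size never counts as small, and when both sides are under the threshold the smaller one is broadcast).

```rust
/// Outcome of the (hypothetical) broadcast check for one join.
#[derive(Debug, PartialEq, Eq)]
enum BroadcastDecision {
    /// Leave the partitioned join as it is.
    KeepPartitioned,
    /// Promote to CollectLeft; `swap_inputs` is true when the small
    /// side was on the right and must be moved to the left.
    CollectLeft { swap_inputs: bool },
}

/// `threshold` plays the role of `ballista.optimizer.broadcast_threshold_bytes`;
/// 0 (the default) disables the rule.
fn decide_broadcast(
    left_bytes: Option<u64>,
    right_bytes: Option<u64>,
    threshold: u64,
) -> BroadcastDecision {
    if threshold == 0 {
        return BroadcastDecision::KeepPartitioned;
    }
    // Assumption: unknown statistics (None) are treated as "not small".
    let small = |b: Option<u64>| matches!(b, Some(n) if n < threshold);
    match (small(left_bytes), small(right_bytes)) {
        (true, false) => BroadcastDecision::CollectLeft { swap_inputs: false },
        (false, true) => BroadcastDecision::CollectLeft { swap_inputs: true },
        // Assumption: with two small sides, broadcast the smaller one.
        (true, true) => BroadcastDecision::CollectLeft {
            swap_inputs: right_bytes < left_bytes,
        },
        (false, false) => BroadcastDecision::KeepPartitioned,
    }
}
```

With the default threshold of 0 the function always returns `KeepPartitioned`, which matches the opt-in behavior described above.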
## Architecture
```
DataFusion physical planner
  → DataFusion optimizer rules (incl. EnforceDistribution, JoinSelection)
  → ColocatedJoinRule          ← new: strip redundant RepartitionExec
                                 (handles divisor sub-partitioning)
  → BroadcastSmallSideRule     ← new: replace partitioned join with CollectLeft
  → DistributedExchangeRule    ← existing: maps remaining repartitions to ExchangeExec
  → DefaultDistributedPlanner  ← existing: cuts stages at ExchangeExec
```
No new IR. The metadata path is a small trait
(`BallistaPartitionMetadata`) that any `TableProvider` can implement (or
get for free via the `PartitionedTableProvider` wrapper); the optimizer
rules are pure tree rewrites over `Arc<dyn ExecutionPlan>`. The broadcast
rule is restricted to `JoinType::Inner` pending issue
[#1055](https://github.com/apache/datafusion-ballista/issues/1055).
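To make the metadata check concrete, here is a minimal standalone sketch of how two declared distributions might be classified. `DeclaredDistribution`, `Side`, `Colocation`, `dist`, and `classify` are hypothetical stand-ins, not the PR's `HashDistribution` or `BallistaPartitionMetadata` API, and the rule that each side's declared keys must equal that side's join keys in order is an assumption.

```rust
/// Hypothetical stand-in for the per-table metadata; the field layout of
/// the PR's real `HashDistribution` is not shown in the description.
#[derive(Debug, Clone, PartialEq, Eq)]
struct DeclaredDistribution {
    keys: Vec<String>,
    hash_fn: String,
    buckets: usize,
}

#[derive(Debug, PartialEq, Eq)]
enum Side {
    Left,
    Right,
}

#[derive(Debug, PartialEq, Eq)]
enum Colocation {
    /// Same bucket count: both repartitions can be dropped.
    Exact,
    /// One count divides the other: fold the larger side locally.
    SubPartition { larger: Side, out_buckets: usize, factor: usize },
    /// Keep the shuffle.
    NotColocated,
}

/// Convenience constructor for a single-column distribution.
fn dist(key: &str, hash_fn: &str, buckets: usize) -> DeclaredDistribution {
    DeclaredDistribution { keys: vec![key.to_string()], hash_fn: hash_fn.to_string(), buckets }
}

fn classify(
    left: &DeclaredDistribution,
    left_join_keys: &[&str],
    right: &DeclaredDistribution,
    right_join_keys: &[&str],
) -> Colocation {
    // Assumption: declared keys must match the join keys exactly, in order.
    let keyed_on = |d: &DeclaredDistribution, join_keys: &[&str]| {
        d.keys.len() == join_keys.len()
            && d.keys.iter().zip(join_keys).all(|(a, b)| a.as_str() == *b)
    };
    if !keyed_on(left, left_join_keys) || !keyed_on(right, right_join_keys) {
        return Colocation::NotColocated;
    }
    if left.hash_fn != right.hash_fn || left.buckets == 0 || right.buckets == 0 {
        return Colocation::NotColocated;
    }
    let (l, r) = (left.buckets, right.buckets);
    if l == r {
        Colocation::Exact
    } else if l > r && l % r == 0 {
        Colocation::SubPartition { larger: Side::Left, out_buckets: r, factor: l / r }
    } else if r > l && r % l == 0 {
        Colocation::SubPartition { larger: Side::Right, out_buckets: l, factor: r / l }
    } else {
        Colocation::NotColocated
    }
}
```

An 8-bucket table joined to a 4-bucket table classifies as `SubPartition { larger: Side::Left, out_buckets: 4, factor: 2 }`, which lines up with the `BucketSubPartitionExec(out=4, factor=2)` snapshot listed in the test plan.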
## Commits (stacked, reviewable independently)
| Commit | Phase | Notes |
|---|---|---|
| `dcb3b56` | Metadata foundation | `BallistaPartitionMetadata` trait, `HashDistribution`, `PartitionedTableProvider`, `HashDistributedScanExec` adapter |
| `afd248c` | `ColocatedJoinRule` | Elides redundant repartitions for matching bucket counts |
| `265ac76` | Sub-partitioning | `BucketSubPartitionExec` + divisor-case logic in the rule |
| `7f9426b` | `BroadcastSmallSideRule` | Opt-in via `broadcast_threshold_bytes` config |
| `bacbc7e` | E2E verification | Three plan-snapshot tests through `AdaptivePlanner` |

`default_optimizers` now takes the `SessionConfig` so the broadcast
threshold flows through. `ColocatedJoinRule` runs before
`BroadcastSmallSideRule` so colocation wins when both could apply.
## Test plan
- [x] `cargo test -p ballista-core partitioning::` — 86 passed
(trait, wrapper, scan adapter, `BucketSubPartitionExec` correctness +
rejection of non-divisor)
- [x] `cargo test -p ballista-scheduler` — 84 passed (5 colocated, 4
divisor, 5 broadcast, 3 E2E + all pre-existing)
- [x] `cargo check -p ballista-scheduler --tests` — clean
- [x] E2E plan snapshots cover three cases:
- matching bucket counts → no `ExchangeExec`
- 8/4 divisor → `BucketSubPartitionExec(out=4, factor=2)`, no shuffle
- plain `MemTable` → `ExchangeExec` retained, optimizer is silent
- [ ] TPC-H Q5 against bucketed customer/orders/lineitem — out of scope
for this PR (env-dependent); to be done as a follow-up benchmark
validation
## Risks
- **Bucket-to-file alignment** relies on a filename convention in
  `PartitionedTableProvider`. The convention is documented, but there is no
  manifest format yet; a follow-up could add one.
- **Wrong declaration**: the declaration is trusted and the requirement is
  documented. If a user declares `BUCKETS=N` but the data isn't actually
  bucketed that way, results may be wrong. We chose trust over verify to
  keep the metadata layer lightweight.
- **AQE replan loop** re-runs the optimizer after each stage; the new rules
  (`ColocatedJoinRule`, including its sub-partitioning path, and
  `BroadcastSmallSideRule`) are stateless tree rewrites, so they're
  idempotent.
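
The idempotency point can be illustrated on a toy plan tree. This is a sketch only: `Plan`, `is_bucketed_scan`, and `strip_redundant` are hypothetical and far simpler than a rewrite over `Arc<dyn ExecutionPlan>`, but they show why a rewrite whose output no longer matches its own trigger pattern is safe to re-run in a replan loop.

```rust
/// Toy plan tree (hypothetical; not DataFusion's ExecutionPlan).
#[derive(Debug, Clone, PartialEq, Eq)]
enum Plan {
    Scan { bucketed: bool },
    Repartition(Box<Plan>),
    Join(Box<Plan>, Box<Plan>),
}

fn is_bucketed_scan(p: &Plan) -> bool {
    matches!(p, Plan::Scan { bucketed: true })
}

/// Stateless bottom-up rewrite: under a join, drop the repartition above
/// each input when both inputs are bucketed scans.
fn strip_redundant(plan: Plan) -> Plan {
    match plan {
        Plan::Join(l, r) => {
            let l = strip_redundant(*l);
            let r = strip_redundant(*r);
            match (l, r) {
                (Plan::Repartition(li), Plan::Repartition(ri))
                    if is_bucketed_scan(&li) && is_bucketed_scan(&ri) =>
                {
                    Plan::Join(li, ri)
                }
                // Anything else is rebuilt unchanged.
                (l, r) => Plan::Join(Box::new(l), Box::new(r)),
            }
        }
        Plan::Repartition(child) => Plan::Repartition(Box::new(strip_redundant(*child))),
        scan => scan,
    }
}
```

Applying `strip_redundant` to its own output returns the same tree, because a join over bare scans no longer matches the repartition pattern.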
🤖 Generated with [Claude Code](https://claude.com/claude-code)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]