wirybeaver opened a new pull request, #1676:
URL: https://github.com/apache/datafusion-ballista/pull/1676

   ## Summary
   
   Brings three Pinot V2 Physical Optimizer ideas to Ballista so queries
   against pre-bucketed tables can avoid the network shuffle on every join:
   
   1. **Colocated joins** — when both join inputs declare the same hash key,
      hash function, and bucket count, the join's required hash distribution
      is satisfied directly and the `RepartitionExec` above each input is
      stripped before `DistributedExchangeRule` would promote it to an
      `ExchangeExec` stage boundary.
   2. **Sub-partitioning** for divisor bucket counts (e.g. 16 vs 8). The
      larger side is locally coalesced via a new `BucketSubPartitionExec`
      (pure stream-chaining, no network) — `(hash(k) % 16) % 8 == hash(k) % 8`.
   3. **Small-side broadcast** — opt-in via
      `ballista.optimizer.broadcast_threshold_bytes` (default 0). When one
      side's `total_byte_size` is below the threshold, the join is promoted
      to `CollectLeft`, swapping inputs if the small side is on the right.
   
   Both features are opt-in: tables only get colocation behavior if the user
   wraps them with the new `PartitionedTableProvider`, and broadcast only
   fires when the threshold is set above 0. Existing snapshot tests are
   unchanged.
   
   ## Architecture
   
   ```
   DataFusion physical planner
     → DataFusion optimizer rules (incl. EnforceDistribution, JoinSelection)
     → ColocatedJoinRule           ← new: strip redundant RepartitionExec
                                          (handles divisor sub-partitioning)
     → BroadcastSmallSideRule      ← new: replace partitioned join with 
CollectLeft
     → DistributedExchangeRule     ← existing: maps remaining repartitions to 
ExchangeExec
     → DefaultDistributedPlanner   ← existing: cuts stages at ExchangeExec
   ```
   
   No new IR. The metadata path is a small trait
   (`BallistaPartitionMetadata`) that any `TableProvider` can implement (or
   get for free via the `PartitionedTableProvider` wrapper); the optimizer
   rules are pure tree rewrites over `Arc<dyn ExecutionPlan>`. Restricted to
   `JoinType::Inner` for the broadcast rule pending issue
   [#1055](https://github.com/apache/datafusion-ballista/issues/1055).
   
   ## Commits (stacked, reviewable independently)
   
   | Commit | Phase | Notes |
   |---|---|---|
   | `dcb3b56` | Metadata foundation | `BallistaPartitionMetadata` trait, 
`HashDistribution`, `PartitionedTableProvider`, `HashDistributedScanExec` 
adapter |
   | `afd248c` | `ColocatedJoinRule` | Elides redundant repartitions for 
matching bucket counts |
   | `265ac76` | Sub-partitioning | `BucketSubPartitionExec` + divisor-case 
logic in the rule |
   | `7f9426b` | `BroadcastSmallSideRule` | Opt-in via 
`broadcast_threshold_bytes` config |
   | `bacbc7e` | E2E verification | Three plan-snapshot tests through 
`AdaptivePlanner` |
   
   `default_optimizers` now takes the `SessionConfig` so the broadcast
   threshold flows through. `ColocatedJoinRule` runs before
   `BroadcastSmallSideRule` so colocation wins when both could apply.
   
   ## Test plan
   
   - [x] `cargo test -p ballista-core partitioning::` — 86 passed
     (trait, wrapper, scan adapter, `BucketSubPartitionExec` correctness +
     rejection of non-divisor)
   - [x] `cargo test -p ballista-scheduler` — 84 passed (5 colocated, 4
     divisor, 5 broadcast, 3 E2E + all pre-existing)
   - [x] `cargo check -p ballista-scheduler --tests` — clean
   - [x] E2E plan snapshots cover three cases:
     - matching bucket counts → no `ExchangeExec`
     - 8/4 divisor → `BucketSubPartitionExec(out=4, factor=2)`, no shuffle
     - plain `MemTable` → `ExchangeExec` retained, optimizer is silent
   - [ ] TPC-H Q5 against bucketed customer/orders/lineitem — out of scope
     for this PR (env-dependent); to be done as a follow-up benchmark
     validation
   
   ## Risks
   
   - **Bucket-to-file alignment** is by filename convention in
     `PartitionedTableProvider`; documented but no manifest format yet — a
     follow-up could add one.
   - **Wrong declaration**: trust + document. If a user declares `BUCKETS=N`
     but data isn't actually bucketed, results may be wrong. We chose trust
     over verify to keep the metadata layer lightweight.
   - **AQE replan loop** re-runs the optimizer after each stage; all three
     new rules are stateless tree rewrites, so they're idempotent.
   
   🤖 Generated with [Claude Code](https://claude.com/claude-code)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to