wirybeaver opened a new issue, #1677:
URL: https://github.com/apache/datafusion-ballista/issues/1677

   ## Background
   
   PR #1676 introduces `ColocatedJoinRule` (skip the shuffle when both join 
inputs are co-bucketed) and `BroadcastSmallSideRule` (promote `Partitioned` to 
`CollectLeft` when one side fits under a threshold). Both rules currently match
only `HashJoinExec`.
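The threshold check behind `BroadcastSmallSideRule` can be pictured as a small decision function. The sketch below is a simplified model, not the code from #1676. `BroadcastDecision`, `decide_broadcast`, and the use of estimated byte sizes are hypothetical. It assumes that the small side is moved to the left input, because `CollectLeft` collects the left (build) side. A real rule would also have to adjust the join type when it swaps inputs, and this sketch ignores that.

```rust
/// Hypothetical outcome of the small-side check (not a Ballista type).
#[derive(Debug, PartialEq)]
enum BroadcastDecision {
    /// Keep the shuffled (`Partitioned`) join.
    KeepPartitioned,
    /// Promote to `CollectLeft`. `swap_inputs` is true when the small side
    /// is currently the right input and must become the left (build) input.
    CollectLeft { swap_inputs: bool },
}

/// Hypothetical helper: promote when the smaller side's estimated size is at
/// most `threshold_bytes`. Missing statistics on either side keep the shuffle.
fn decide_broadcast(
    left_bytes: Option<u64>,
    right_bytes: Option<u64>,
    threshold_bytes: u64,
) -> BroadcastDecision {
    let (left, right) = match (left_bytes, right_bytes) {
        (Some(l), Some(r)) => (l, r),
        _ => return BroadcastDecision::KeepPartitioned,
    };
    // Pick the smaller side; on a tie, keep the current left input.
    let (small, swap_inputs) = if right < left { (right, true) } else { (left, false) };
    if small <= threshold_bytes {
        BroadcastDecision::CollectLeft { swap_inputs }
    } else {
        BroadcastDecision::KeepPartitioned
    }
}
```

Treating missing statistics as "don't broadcast" is the conservative choice here: collecting a side of unknown size onto every task risks running out of memory.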
   
   PR #1651 made `SortMergeJoinExec` the default join in Ballista (issue 
#1648), because DataFusion's hash join has no spill support. Users now opt back 
into hash join via `SET datafusion.optimizer.prefer_hash_join = true`.
   
   The combined effect: under default settings, neither of the rules from #1676 
fires for the planned `SortMergeJoinExec`, so co-bucketed tables still get a 
network shuffle and small-side joins still get repartitioned.
   
   ## Proposed work
   
   1. **`ColocatedJoinRule`** — extend the matcher in 
`try_elide_join_repartitions` to handle `SortMergeJoinExec` in addition to 
`HashJoinExec`. The hash-distribution-satisfaction logic (matching keys, equal 
or divisor-related bucket counts) is the same; only the rewrite step (stripping 
the `RepartitionExec` and/or wrapping a side in `BucketSubPartitionExec`) needs 
the new variant. Note: `SortMergeJoinExec` requires sorted inputs, so the 
`SortExec` above each input must be preserved or re-derived after the 
rewrite. Verify that the ordering isn't lost when the underlying source already 
advertises a compatible sort order.
   2. **`BroadcastSmallSideRule`** — broadcast doesn't apply to sort-merge in 
the same way (there is no `CollectLeft` mode for `SortMergeJoinExec`). Instead, 
when one side fits under the broadcast threshold, consider lowering the join 
directly to `HashJoinExec(CollectLeft)`. This is a join-operator swap, not just 
a partition-mode swap, and needs a small spike to confirm it is sound for 
sort-merge inputs.
   3. **Tests** — add e2e snapshot cases mirroring the three in 
`colocated_join_e2e.rs` (matching → no exchange; divisor → 
`BucketSubPartitionExec`; unbucketed control), but with the default 
`prefer_hash_join = false` so the planned join is `SortMergeJoinExec`.
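The colocation check from item 1 can be modeled independently of the join operator, which is why the extension is mostly a matcher change. The sketch below is hypothetical: `JoinKind`, `Bucketing`, `Side`, `ColocationRewrite`, and `colocation_rewrite` are not Ballista APIs. It assumes each side is bucketed on exactly its join keys, in order, and that buckets are assigned by hash modulo bucket count, so the side with fewer buckets is the one wrapped in `BucketSubPartitionExec`. The only difference for `SortMergeJoinExec` is the `keep_sort` flag, which stands in for the requirement that each input's ordering survive the rewrite.

```rust
/// Hypothetical stand-in for the two physical join operators.
#[derive(Debug, Clone, Copy, PartialEq)]
enum JoinKind {
    Hash,
    SortMerge,
}

/// Hypothetical description of how one join input is bucketed.
#[derive(Debug)]
struct Bucketing {
    keys: Vec<String>,
    buckets: usize,
}

#[derive(Debug, PartialEq)]
enum Side {
    Left,
    Right,
}

#[derive(Debug, PartialEq)]
enum ColocationRewrite {
    /// Equal bucket counts: strip the `RepartitionExec` on both inputs.
    Elide { keep_sort: bool },
    /// Counts related by an integer factor: sub-partition the coarser side.
    SubPartition { side: Side, factor: usize, keep_sort: bool },
}

/// True when `b` is bucketed on exactly `keys`, in the same order.
fn bucketed_on(b: &Bucketing, keys: &[&str]) -> bool {
    b.keys.iter().map(String::as_str).eq(keys.iter().copied())
}

/// Same distribution check for both operators; `None` keeps the shuffle.
fn colocation_rewrite(
    kind: JoinKind,
    left_keys: &[&str],
    right_keys: &[&str],
    left: Option<&Bucketing>,
    right: Option<&Bucketing>,
) -> Option<ColocationRewrite> {
    // An unbucketed input on either side means no colocation.
    let (l, r) = (left?, right?);
    if !bucketed_on(l, left_keys) || !bucketed_on(r, right_keys) || l.buckets == 0 || r.buckets == 0 {
        return None;
    }
    // Sort-merge needs sorted inputs, so the rewrite must keep each input's ordering.
    let keep_sort = kind == JoinKind::SortMerge;
    let (lb, rb) = (l.buckets, r.buckets);
    if lb == rb {
        Some(ColocationRewrite::Elide { keep_sort })
    } else if lb > rb && lb % rb == 0 {
        Some(ColocationRewrite::SubPartition { side: Side::Right, factor: lb / rb, keep_sort })
    } else if rb > lb && rb % lb == 0 {
        Some(ColocationRewrite::SubPartition { side: Side::Left, factor: rb / lb, keep_sort })
    } else {
        None
    }
}
```

The three scenarios in item 3 map onto the three outcomes: equal bucket counts give `Elide`, a divisor relationship gives `SubPartition`, and an unbucketed side gives `None`.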
   
   ## Why this matters
   
   Without this extension, the optimizations from #1676 are effectively gated 
behind a non-default config (`prefer_hash_join = true`). Users running Ballista 
with default settings against pre-bucketed tables won't benefit from colocation 
or small-side broadcast.
   
   ## References
   
   - PR #1676 — Pinot-style colocated-join optimizer
   - PR #1651 / issue #1648 — sort-merge as default join
   - PR #1647 — broadcast threshold config
   
   🤖 Generated with [Claude Code](https://claude.com/claude-code)

