wirybeaver opened a new issue, #1677: URL: https://github.com/apache/datafusion-ballista/issues/1677
## Background

PR #1676 introduces `ColocatedJoinRule` (skip the shuffle when both join inputs are co-bucketed) and `BroadcastSmallSideRule` (promote `Partitioned` to `CollectLeft` when one side fits under a threshold). Both rules currently match only `HashJoinExec`.

PR #1651 made `SortMergeJoinExec` the default join in Ballista (issue #1648), because DataFusion's hash join has no spill support. Users now opt back into hash join via `SET datafusion.optimizer.prefer_hash_join = true`.

The combined effect: under default settings, neither rule from #1676 fires for the planned `SortMergeJoinExec`. Co-bucketed tables still incur a network shuffle, and small-side joins are still repartitioned.

## Proposed work

1. **`ColocatedJoinRule`**: extend the matcher in `try_elide_join_repartitions` to handle `SortMergeJoinExec` in addition to `HashJoinExec`. The hash-distribution-satisfaction logic (matching keys, equal or divisor-related bucket counts) is the same; only the rewrite step (stripping the `RepartitionExec` and/or wrapping a side in `BucketSubPartitionExec`) needs the new variant. Note: `SortMergeJoinExec` requires sorted inputs, so the `SortExec` above each input must be preserved or re-derived after the rewrite. Verify that the required ordering is not lost when the underlying source already advertises a compatible sort order.
2. **`BroadcastSmallSideRule`**: broadcast does not apply to sort-merge in the same way, because `SortMergeJoinExec` has no `CollectLeft` mode. Instead, when one side is small enough to fit under the threshold, consider lowering directly to `HashJoinExec(CollectLeft)`. That is a swap of the join operator itself, not just of the partition mode, so it needs a small spike to confirm it is sound for sort-merge inputs.
3. **Tests**: add e2e snapshot cases mirroring the three in `colocated_join_e2e.rs` (matching bucket counts → no exchange; divisor-related counts → `BucketSubPartitionExec`; unbucketed control), but with the default `prefer_hash_join = false` so that the planned join is `SortMergeJoinExec`.
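The key and bucket-count check from item 1 does not depend on the join operator, which is why it can be shared between `HashJoinExec` and `SortMergeJoinExec`. The std-only Rust sketch below illustrates that decision plus the extra sort-merge concern. `Bucketing`, `ColocationPlan`, `plan_colocation` and `sort_still_required` are hypothetical names, not the API from #1676. It assumes bucket ids are `hash(keys) % num_buckets` with the same hash on both sides, so when one count divides the other, the side with fewer buckets is the one that must be split (assumed here to be the job of `BucketSubPartitionExec`). The sort check simplifies ordering satisfaction to a column-name prefix test and ignores sort direction, null ordering and equivalence classes.

```rust
/// Hypothetical summary of how one join input is bucketed by its source.
#[derive(Debug, Clone)]
pub struct Bucketing {
    /// Columns the data is hash-bucketed by, in order.
    pub keys: Vec<String>,
    /// Number of buckets the source exposes as output partitions.
    pub num_buckets: usize,
}

/// What the (hypothetical) rule would do to the two join inputs.
#[derive(Debug, PartialEq, Eq)]
pub enum ColocationPlan {
    /// Same keys and bucket count: drop both RepartitionExecs.
    ElideBoth,
    /// Left has fewer buckets: split each left bucket `factor` ways.
    SubPartitionLeft { factor: usize },
    /// Right has fewer buckets: split each right bucket `factor` ways.
    SubPartitionRight { factor: usize },
    /// Not co-bucketed: keep the shuffle.
    KeepShuffle,
}

/// True if the input is bucketed on exactly its join keys, in the same order.
fn keys_match(join_keys: &[&str], b: &Bucketing) -> bool {
    join_keys.len() == b.keys.len()
        && join_keys.iter().zip(&b.keys).all(|(j, k)| *j == k.as_str())
}

/// Operator-independent check: matching keys plus equal or
/// divisor-related bucket counts.
pub fn plan_colocation(
    left_join_keys: &[&str],
    right_join_keys: &[&str],
    left: &Bucketing,
    right: &Bucketing,
) -> ColocationPlan {
    if !keys_match(left_join_keys, left) || !keys_match(right_join_keys, right) {
        return ColocationPlan::KeepShuffle;
    }
    let (l, r) = (left.num_buckets, right.num_buckets);
    if l == 0 || r == 0 {
        ColocationPlan::KeepShuffle
    } else if l == r {
        ColocationPlan::ElideBoth
    } else if l > r && l % r == 0 {
        // Each right bucket covers l / r left buckets, so split the right side.
        ColocationPlan::SubPartitionRight { factor: l / r }
    } else if r > l && r % l == 0 {
        ColocationPlan::SubPartitionLeft { factor: r / l }
    } else {
        ColocationPlan::KeepShuffle
    }
}

/// Sort-merge specific: after the rewrite, a SortExec on an input is only
/// redundant if the input already advertises an ordering that starts with
/// the join keys. Otherwise it must be kept or re-added.
pub fn sort_still_required(advertised_ordering: &[&str], join_keys: &[&str]) -> bool {
    !advertised_ordering.starts_with(join_keys)
}
```

For example, an 8-bucket left input joined on `id` with a 4-bucket right input yields `SubPartitionRight { factor: 2 }`, and an input advertising ordering `[id, ts]` would not need an extra sort for a join on `id`.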
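The join-operator swap proposed in item 2 can be sketched as a pure decision function. This is a minimal std-only Rust sketch under stated assumptions, not the behavior of `BroadcastSmallSideRule`: `JoinType` is a local stand-in (not DataFusion's enum), and `JoinLowering` and `choose_lowering` are hypothetical. As a conservative assumption it lowers only inner joins, treats missing size statistics as "not small", and builds on the smaller side when both inputs fit. Whether outer joins are safe under broadcast is exactly the kind of question the spike should answer.

```rust
/// Local stand-in for the join types relevant here (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinType {
    Inner,
    Left,
    Right,
    Full,
}

/// Possible outcomes of the (hypothetical) lowering decision.
#[derive(Debug, PartialEq, Eq)]
pub enum JoinLowering {
    /// Leave the SortMergeJoinExec in place.
    KeepSortMerge,
    /// Replace it with HashJoinExec(CollectLeft). If `swap_inputs` is true,
    /// the original right input becomes the build (left) side.
    HashCollectLeft { swap_inputs: bool },
}

pub fn choose_lowering(
    join_type: JoinType,
    left_bytes: Option<u64>,
    right_bytes: Option<u64>,
    threshold_bytes: u64,
) -> JoinLowering {
    // Conservative assumption: only inner joins are lowered.
    if join_type != JoinType::Inner {
        return JoinLowering::KeepSortMerge;
    }
    // Unknown statistics never count as "small enough".
    let fits = |b: Option<u64>| matches!(b, Some(n) if n <= threshold_bytes);
    match (fits(left_bytes), fits(right_bytes)) {
        // Both fit: collect the smaller side.
        (true, true) => JoinLowering::HashCollectLeft {
            swap_inputs: right_bytes < left_bytes,
        },
        (true, false) => JoinLowering::HashCollectLeft { swap_inputs: false },
        (false, true) => JoinLowering::HashCollectLeft { swap_inputs: true },
        (false, false) => JoinLowering::KeepSortMerge,
    }
}
```

Two points the sketch leaves out: swapping inputs changes the output column order (left columns then right columns), so a real rewrite would need a projection to restore it, and any parent operator that relied on the sort-merge join's output ordering may need its own `SortExec` after the swap.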
## Why this matters

Without this extension, the optimizations from #1676 are effectively gated behind a non-default config (`prefer_hash_join = true`). Users running Ballista with default settings against pre-bucketed tables won't benefit from colocation or small-side broadcast.

## References

- PR #1676: Pinot-style colocated-join optimizer
- PR #1651 / issue #1648: sort-merge as the default join
- PR #1647: broadcast threshold config

🤖 Generated with [Claude Code](https://claude.com/claude-code)
