andygrove opened a new pull request, #1691:
URL: https://github.com/apache/datafusion-ballista/pull/1691
# Which issue does this PR close?
Closes #1681
# Rationale for this change
TPC-H Q2 with `prefer_hash_join=true` fails on latest `main` with:
```
Invalid HashJoinExec, the output partition count of the left child must be 1
in CollectLeft mode, consider using CoalescePartitionsExec or the
EnforceDistribution rule
```
The Ballista-customized `JoinSelection` rule in
`ballista/scheduler/src/physical_optimizer/join_selection.rs` calls
`try_collect_left(..., ignore_threshold=true, 0, 0)` on already-resolved
`HashJoinExec(CollectLeft)` joins. When `should_swap_join_order(left, right)`
returns `true` (the left side's estimated byte size is larger than the right's), the `(true, true)` arm
calls `hash_join.swap_inputs(PartitionMode::CollectLeft)`. `swap_inputs`
keeps `mode = CollectLeft` and swaps the children, so if the old right was a
multi-partition shuffle reader (e.g. `Hash([…], 14)`), the new left ends up
with 14 partitions while still in `CollectLeft` mode. Ballista has no
post-`JoinSelection` `EnforceDistribution` pass to reshape this, so the
invalid plan is serialized to the executor and the assertion fires at
`HashJoinExec::execute`.
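The failure mode can be reproduced in a minimal, self-contained Rust sketch. It assumes a hypothetical toy model (`Mode`, `Input`, `Join`) rather than DataFusion's real `HashJoinExec`. Here `swap_inputs` only exchanges the children and keeps the requested mode, and `validate` stands in for the check that fires in `HashJoinExec::execute`. The real `swap_inputs` also restores the output column order and adjusts the join type. Those details are omitted.

```rust
/// Hypothetical, simplified join mode (not DataFusion's PartitionMode).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Mode {
    CollectLeft,
    Partitioned,
}

/// One child of the join: only the properties relevant to this bug.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Input {
    pub partitions: usize,
    pub bytes: u64,
}

/// Hypothetical, simplified hash join node.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Join {
    pub mode: Mode,
    pub left: Input,  // build side
    pub right: Input, // probe side
}

impl Join {
    /// Swaps the children but keeps the requested mode unchanged,
    /// matching the behaviour described for `swap_inputs(CollectLeft)`.
    pub fn swap_inputs(&self, mode: Mode) -> Join {
        Join { mode, left: self.right, right: self.left }
    }

    /// Stand-in for the executor-side CollectLeft assertion.
    pub fn validate(&self) -> Result<(), String> {
        if self.mode == Mode::CollectLeft && self.left.partitions != 1 {
            return Err(format!(
                "left child must have 1 partition in CollectLeft mode (got {})",
                self.left.partitions
            ));
        }
        Ok(())
    }
}
```

With a 1-partition build side and a 14-partition probe side, the original join validates, while the swapped join fails in the same way stage 11 does.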
The bug is reachable today because PR #1647's broadcast-style hash join
lowering produces `HashJoinExec(CollectLeft)` joins where the build side
is a broadcast `ShuffleReaderExec` (1 logical partition). On TPC-H Q2 SF1,
stage 11's broadcast reader (full supplier rows, 7 columns × 10K rows ≈
1 MB) happens to be heavier in bytes than its hash probe reader
(post-join 3K rows × 4 columns ≈ 230 KB), so the swap triggers and the
plan becomes invalid. Stage 22 has the same join shape but a smaller
broadcast side, so it doesn't swap and runs fine. The existing comment in
`execution_stage.rs:386–392` already flags this category of risk:
```rust
// ballista specific JoinSelection, as datafusion rule can't be used here.
// Datafusion JoinSelection may produce plans which need change of partitions
// in order to be valid.
```
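Stage 11 swaps because the swap heuristic is purely size-based. The sketch below is a simplified, hypothetical stand-in for that heuristic. `SideStats` and `should_swap` are illustrative names, not DataFusion APIs, and the sketch compares only estimated byte sizes. DataFusion's actual `should_swap_join_order` works from plan statistics and may fall back to other estimates. The sketch is only meant to show that partition counts do not enter the decision.

```rust
/// Hypothetical per-side statistics for the simplified heuristic below.
#[derive(Clone, Copy, Debug)]
pub struct SideStats {
    pub bytes: u64,
    pub partitions: usize,
}

/// Simplified swap heuristic: swap when the build (left) side is estimated
/// to be larger in bytes than the probe (right) side. Note that
/// `partitions` is never consulted, which is exactly the gap the PR closes.
pub fn should_swap(left: SideStats, right: SideStats) -> bool {
    left.bytes > right.bytes
}
```

Using the approximate stage-11 sizes quoted above (≈ 1 MB broadcast side vs ≈ 230 KB probe side), the function returns `true`, regardless of how many partitions the probe side has.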
# What changes are included in this PR?
- `try_collect_left`: guard the `(true, true)` and `(false, true)` swap
arms with `right.output_partitioning().partition_count() == 1`. A swap
into `CollectLeft` is only safe when the would-be new left will have a
single partition. The non-swap rebuild paths are untouched.
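The guarded decision can be sketched as follows. This is a hypothetical, simplified model, not Ballista's actual `try_collect_left` signature. `Side`, `Decision`, and the boolean parameters are illustrative, and the byte comparison stands in for `should_swap_join_order`. The sketch also assumes that when the guard rejects a `(false, true)` swap, the join is left unchanged.

```rust
/// Hypothetical per-side properties used by the guarded decision.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Side {
    pub partitions: usize,
    pub bytes: u64,
}

/// Outcome of the simplified rule.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Decision {
    /// Swap children and keep CollectLeft (old right becomes the build side).
    SwapToCollectLeft,
    /// Rebuild as CollectLeft with the current child order.
    KeepCollectLeft,
    /// Leave the join unchanged.
    NoChange,
}

pub fn try_collect_left(
    left_can_collect: bool,
    right_can_collect: bool,
    supports_swap: bool,
    left: Side,
    right: Side,
) -> Decision {
    // Simplified stand-in for should_swap_join_order.
    let should_swap = left.bytes > right.bytes;
    // The new guard: after a swap the old right becomes the CollectLeft
    // build side, so it must already have exactly one partition.
    let right_is_single = right.partitions == 1;

    match (left_can_collect, right_can_collect) {
        (true, true) if supports_swap && should_swap && right_is_single => {
            Decision::SwapToCollectLeft
        }
        // Non-swap rebuild path, unchanged by the guard.
        (true, true) | (true, false) => Decision::KeepCollectLeft,
        (false, true) if supports_swap && right_is_single => Decision::SwapToCollectLeft,
        // Assumption: a rejected swap leaves the join as-is.
        (false, true) | (false, false) => Decision::NoChange,
    }
}
```

With a heavier 1-partition left side and a lighter multi-partition right side, the `(true, true)` arm now keeps the original `CollectLeft` shape instead of swapping the multi-partition side into the build position.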
- Regression test
`collect_left_swap_preserves_one_partition_build`: builds a
`HashJoinExec(CollectLeft)` with a 1-partition, heavier left and a
2-partition, lighter right, runs `JoinSelection::optimize`, and asserts
the resulting join still has a 1-partition build side. The test fails
on `main` and passes with this change.
Verified end-to-end: TPC-H Q2 SF1 with `prefer_hash_join=true` and
`target_partitions=14` now returns 100 rows instead of failing at stage 11.
Other queries in a sanity sweep (Q1, Q3, Q5, Q7, Q9, Q18, Q21 under the
default sort-merge join) continue to pass. There is a separate, pre-existing
correctness issue where Q21 with `prefer_hash_join=true` returns 0 rows; it
is independent of this assertion bug and out of scope here.
# Are there any user-facing changes?
No public API changes. Plans that previously crashed at the executor with
the `CollectLeft` assertion will now keep the original (1-partition build)
shape instead of being swapped into an invalid form.
--
This is an automated message from the Apache Git Service.