andygrove opened a new pull request, #1691:
URL: https://github.com/apache/datafusion-ballista/pull/1691

   # Which issue does this PR close?
   
   Closes #1681
   
   # Rationale for this change
   
   TPC-H Q2 with `prefer_hash_join=true` fails on latest `main` with:
   
   ```
   Invalid HashJoinExec, the output partition count of the left child must be 1
   in CollectLeft mode, consider using CoalescePartitionsExec or the
   EnforceDistribution rule
   ```
   
   The Ballista-customized `JoinSelection` rule in
   `ballista/scheduler/src/physical_optimizer/join_selection.rs` calls
   `try_collect_left(..., ignore_threshold=true, 0, 0)` on already-resolved
   `HashJoinExec(CollectLeft)` joins. When `should_swap_join_order(left, right)`
   returns `true` (the left input is larger in bytes than the right), the `(true, true)` arm
   calls `hash_join.swap_inputs(PartitionMode::CollectLeft)`. `swap_inputs`
   keeps `mode = CollectLeft` and swaps the children, so if the old right was a
   multi-partition shuffle reader (e.g. `Hash([…], 14)`), the new left ends up
   with 14 partitions while still in `CollectLeft` mode. Ballista has no
   post-`JoinSelection` `EnforceDistribution` pass to reshape this, so the
   invalid plan is serialized to the executor and the assertion fires at
   `HashJoinExec::execute`.
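   
   The failure mode can be reproduced in a toy model. This is a minimal sketch,
   not Ballista or DataFusion code: `Mode`, `Input`, `Join`, `should_swap`, and
   `q2_like_join` are hypothetical stand-ins, and the byte sizes are the rough
   figures quoted for Q2 stage 11. It only shows that swapping children while
   keeping `CollectLeft` can leave a multi-partition build side.
   
   ```rust
   // Toy model only: hypothetical stand-ins, not the real DataFusion types.
   #[allow(dead_code)]
   #[derive(Debug, Clone, Copy, PartialEq)]
   enum Mode {
       CollectLeft,
       Partitioned,
   }
   
   #[allow(dead_code)]
   #[derive(Debug, Clone, Copy, PartialEq)]
   struct Input {
       name: &'static str,
       partitions: usize,
       bytes: u64,
   }
   
   #[derive(Debug, Clone, Copy, PartialEq)]
   struct Join {
       mode: Mode,
       left: Input,
       right: Input,
   }
   
   impl Join {
       /// Swaps the children and uses the mode passed in, as described above.
       fn swap_inputs(&self, mode: Mode) -> Join {
           Join { mode, left: self.right, right: self.left }
       }
   
       /// The invariant from the error message: a CollectLeft build side
       /// must have exactly one output partition.
       fn is_valid(&self) -> bool {
           self.mode != Mode::CollectLeft || self.left.partitions == 1
       }
   }
   
   /// Simplified swap heuristic: swap when the left input is larger in bytes.
   fn should_swap(left: &Input, right: &Input) -> bool {
       left.bytes > right.bytes
   }
   
   /// A join shaped like the one described for stage 11 (sizes approximate).
   fn q2_like_join() -> Join {
       Join {
           mode: Mode::CollectLeft,
           left: Input { name: "broadcast reader", partitions: 1, bytes: 1_000_000 },
           right: Input { name: "hash probe reader", partitions: 14, bytes: 230_000 },
       }
   }
   ```
   
   In this model, `q2_like_join()` starts out valid, `should_swap` returns
   `true` because the build side is heavier, and
   `swap_inputs(Mode::CollectLeft)` yields a join whose left side has 14
   partitions, so `is_valid()` returns `false`.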
   
   The bug is reachable today because PR #1647's broadcast-style hash join
   lowering produces `HashJoinExec(CollectLeft)` joins where the build side
   is a broadcast `ShuffleReaderExec` (1 logical partition). On TPC-H Q2 SF1,
   stage 11's broadcast reader (full supplier row, 7 columns × 10K rows ≈
   1 MB) happens to be heavier in bytes than its hash probe reader
   (post-join 3K rows × 4 columns ≈ 230 KB), so the swap triggers and the
   plan becomes invalid. Stage 22 has the same join shape but a smaller
   broadcast side, so it doesn't swap and runs fine. The existing comment in
   `execution_stage.rs:386–392` already flags this category of risk:
   
   ```rust
   // ballista specific JoinSelection, as datafusion rule can't be used here.
   // Datafusion JoinSelection may produce plans which need change of partitions
   // in order to be valid.
   ```
   
   # What changes are included in this PR?
   
   - `try_collect_left`: guard the `(true, true)` and `(false, true)` swap
     arms with `right.output_partitioning().partition_count() == 1`. A swap
     into `CollectLeft` is only safe when the would-be new left will have a
     single partition. The non-swap rebuild paths are untouched.
   - Regression test
     `collect_left_swap_preserves_one_partition_build`: builds a
     `HashJoinExec(CollectLeft)` with a 1-partition, heavier left and a
     2-partition, lighter right, runs `JoinSelection::optimize`, and asserts
     the resulting join still has a 1-partition build side. The test fails
     on `main` and passes with this change.
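   
   The shape of the guard can be sketched as follows. This is a simplified
   illustration under stated assumptions, not the actual `try_collect_left`
   code: `Side` and `guarded_collect_left` are hypothetical names, and the
   size comparison stands in for `should_swap_join_order`.
   
   ```rust
   // Hypothetical, simplified stand-in for one input of a physical plan.
   #[derive(Debug, Clone, Copy, PartialEq)]
   struct Side {
       partitions: usize,
       bytes: u64,
   }
   
   /// Returns the (left, right) children of a CollectLeft join after the
   /// guarded swap decision. The swap happens only when the heuristic asks
   /// for it AND the would-be new left (the current right) has exactly one
   /// partition; otherwise the original order is kept.
   fn guarded_collect_left(left: Side, right: Side) -> (Side, Side) {
       let wants_swap = left.bytes > right.bytes;
       let swap_is_safe = right.partitions == 1;
       if wants_swap && swap_is_safe {
           (right, left)
       } else {
           (left, right)
       }
   }
   ```
   
   With a 1-partition, heavier left and a multi-partition, lighter right (the
   shape used by the regression test), this sketch keeps the original order,
   so the build side stays at one partition. When both sides have a single
   partition, the size-based swap still goes through.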
   
   Verified end-to-end: TPC-H Q2 SF1 with `prefer_hash_join=true` and
   `target_partitions=14` now returns 100 rows instead of failing at stage 11.
   Other queries in a sanity sweep (Q1, Q3, Q5, Q7, Q9, Q18, Q21, run with the
   default sort-merge join) continue to pass. There is a separate, pre-existing
   correctness issue where Q21 with `prefer_hash_join=true` returns 0 rows; it
   is independent of this assertion bug and out of scope here.
   
   # Are there any user-facing changes?
   
   No public API changes. Plans that previously crashed at the executor with
   the `CollectLeft` assertion will now keep the original (1-partition build)
   shape instead of being swapped into an invalid form.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

