andygrove opened a new pull request, #1647:
URL: https://github.com/apache/datafusion-ballista/pull/1647

   # Which issue does this PR close?
   
   Closes #.
   
   # Rationale for this change
   
   Today, when DataFusion plans a hash join with a small side, Ballista's 
distributed planner relies on `CoalescePartitionsExec` to satisfy 
`CollectLeft`'s single-partition requirement. That collapses build-side 
parallelism onto a single executor, which becomes a hot spot both for 
producing the build shuffle file and for serving it to every downstream join 
task. To work around this, Ballista's default session sets DataFusion's 
`hash_join_single_partition_threshold` to 0. That avoids the hot spot but also 
gives up the small-side (`CollectLeft`) optimization entirely.
   
   This PR adds a true broadcast-style hash join. For small-side joins, the 
build subtree keeps its natural M-partition layout and writes M shuffle files; 
each downstream join task fetches all M files in parallel and builds its hash 
table locally. Build data flows executor-to-executor; the scheduler is not on 
the data path.
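
The read pattern above can be illustrated with a toy model. The sketch is written in Rust but is not Ballista code: `Location`, `partitioned_shuffle`, `broadcast_shuffle`, `partitioned_locations`, and `broadcast_locations` are hypothetical names, and the one-file-per-build-task layout is an assumption drawn from the description. It contrasts a regular hash-partitioned read, where downstream task `i` fetches only output partition `i` from every upstream task, with a broadcast read, where every task fetches every build file.

```rust
// Toy model only: `Location` is a hypothetical stand-in, not Ballista's
// real shuffle location type.
#[derive(Debug, Clone, PartialEq)]
struct Location {
    map_partition: usize,    // upstream task that wrote the file
    output_partition: usize, // output partition stored in the file
    executor_id: String,     // executor that serves the file
}

/// Regular hash-partitioned shuffle: each of `m` map tasks writes `n` files.
fn partitioned_shuffle(m: usize, n: usize) -> Vec<Location> {
    let mut out = Vec::with_capacity(m * n);
    for map in 0..m {
        for part in 0..n {
            out.push(Location {
                map_partition: map,
                output_partition: part,
                executor_id: format!("exec-{}", map),
            });
        }
    }
    out
}

/// Assumed broadcast build layout: each of `m` build tasks writes one file.
fn broadcast_shuffle(m: usize) -> Vec<Location> {
    (0..m)
        .map(|map| Location {
            map_partition: map,
            output_partition: 0,
            executor_id: format!("exec-{}", map),
        })
        .collect()
}

/// Partitioned read: task `task` fetches its own output partition from
/// every map task.
fn partitioned_locations(all: &[Location], task: usize) -> Vec<Location> {
    all.iter()
        .filter(|loc| loc.output_partition == task)
        .cloned()
        .collect()
}

/// Broadcast read: the task index is ignored and every file is fetched.
fn broadcast_locations(all: &[Location], _task: usize) -> Vec<Location> {
    all.to_vec()
}

fn main() {
    let hashed = partitioned_shuffle(3, 4); // M = 3 map tasks, N = 4 partitions
    let build = broadcast_shuffle(3); // M = 3 build files on 3 executors

    for task in 0..4 {
        let slices = partitioned_locations(&hashed, task);
        let files: Vec<String> = broadcast_locations(&build, task)
            .iter()
            .map(|l| format!("m{}@{}", l.map_partition, l.executor_id))
            .collect();
        println!(
            "task {}: {} partitioned slices, broadcast files {:?}",
            task,
            slices.len(),
            files
        );
    }
}
```

In this model, reading the broadcast layout with partitioned semantics would give all M build files to task 0 and nothing to the other tasks. That is one plausible reason the reader needs an explicit `broadcast` flag and an upstream partition count that differs from its own partition count.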
   
   # What changes are included in this PR?
   
   - New session config `ballista.optimizer.broadcast_join_threshold_bytes` 
(default 10 MB, `0` disables).
   - `UnresolvedShuffleExec` and `ShuffleReaderExec` get a `broadcast` flag and 
an explicit `upstream_partition_count`. The protobuf definitions and codec 
round-trip the new fields.
   - When `broadcast` is set, `ShuffleReaderExec::execute` and 
`partition_statistics` aggregate across all upstream locations.
   - `DefaultDistributedPlanner` adds:
     - `maybe_promote_to_broadcast`, which promotes an eligible 
`HashJoinExec(Partitioned)` to `CollectLeft` when one of its inputs is under 
the threshold.
     - A `HashJoinExec(CollectLeft)` lowering branch that strips the 
`CoalescePartitionsExec` above the build side, writes an M-partition build 
shuffle, and inserts a broadcast `UnresolvedShuffleExec`.
     - A broadcast branch in `remove_unresolved_shuffles` that flattens 
upstream locations.
     - A broadcast branch in `rollback_resolved_shuffles` that preserves 
`broadcast` and `upstream_partition_count` on retry.
   - Unit tests in the scheduler planner (broadcast enabled and disabled, and a 
build side with M > 1 partitions).
   - Codec round-trip tests for the broadcast variants.
   - End-to-end correctness check in `examples/`.
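
The PR does not spell out what makes a join eligible for promotion. The following is a hypothetical, simplified version of such a check, written in Rust but not taken from the PR: `JoinType` is a cut-down stand-in for DataFusion's enum, `decide` and `Decision` are invented names, and sizes are plain byte estimates rather than DataFusion statistics. It encodes three assumptions: a threshold of `0` disables promotion, an unknown size never qualifies, and, because each probe task builds its own hash table and sees only its own probe rows, only join types that never need to emit or deduplicate build-side rows across tasks are broadcast (swapping inputs when the small side is on the right).

```rust
// Hypothetical model of a promotion check; NOT the PR's
// `maybe_promote_to_broadcast`, which works on DataFusion plan nodes.
#[derive(Debug, Clone, Copy, PartialEq)]
enum JoinType {
    Inner,
    Left,
    Right,
    Full,
    LeftSemi,
    LeftAnti,
    RightSemi,
    RightAnti,
}

impl JoinType {
    /// The equivalent join type after swapping the two inputs.
    fn swap(self) -> JoinType {
        match self {
            JoinType::Inner => JoinType::Inner,
            JoinType::Left => JoinType::Right,
            JoinType::Right => JoinType::Left,
            JoinType::Full => JoinType::Full,
            JoinType::LeftSemi => JoinType::RightSemi,
            JoinType::LeftAnti => JoinType::RightAnti,
            JoinType::RightSemi => JoinType::LeftSemi,
            JoinType::RightAnti => JoinType::LeftAnti,
        }
    }

    /// Assumption: with the left (build) side broadcast, joins whose output
    /// depends on unmatched or deduplicated build rows need cross-task
    /// coordination, so only probe-side-driven join types are safe.
    fn broadcast_left_safe(self) -> bool {
        matches!(
            self,
            JoinType::Inner | JoinType::Right | JoinType::RightSemi | JoinType::RightAnti
        )
    }
}

#[derive(Debug, PartialEq)]
enum Decision {
    KeepPartitioned,
    CollectLeft { swapped: bool },
}

/// `threshold_bytes == 0` disables promotion; `None` means "size unknown".
fn decide(
    join_type: JoinType,
    left_bytes: Option<u64>,
    right_bytes: Option<u64>,
    threshold_bytes: u64,
) -> Decision {
    if threshold_bytes == 0 {
        return Decision::KeepPartitioned;
    }
    // "Under the threshold" is read as strictly less than (an assumption).
    let fits = |b: Option<u64>| matches!(b, Some(n) if n < threshold_bytes);
    let left_ok = fits(left_bytes) && join_type.broadcast_left_safe();
    let right_ok = fits(right_bytes) && join_type.swap().broadcast_left_safe();
    match (left_ok, right_ok) {
        // Both sides qualify: build on the smaller one.
        (true, true) if right_bytes < left_bytes => Decision::CollectLeft { swapped: true },
        (true, _) => Decision::CollectLeft { swapped: false },
        (false, true) => Decision::CollectLeft { swapped: true },
        (false, false) => Decision::KeepPartitioned,
    }
}

fn main() {
    let threshold = 10 * 1024 * 1024; // one possible reading of "10 MB"
    println!("{:?}", decide(JoinType::Inner, Some(1 << 20), Some(1 << 30), threshold));
    println!("{:?}", decide(JoinType::Left, Some(1 << 20), Some(1 << 30), threshold));
    println!("{:?}", decide(JoinType::Inner, Some(1 << 20), None, 0));
}
```

Under these rules an inner join can broadcast whichever side is small, while a left outer join can only broadcast its right side, since emitting unmatched left rows would require coordination across probe tasks.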
   
   Adaptive query execution (AQE) support is deferred: the `AdaptivePlanner` 
does not yet apply this lowering. The workaround that sets 
`hash_join_single_partition_threshold` to 0 is left in place.
   
   # Are there any user-facing changes?
   
   Yes. There is a new session config, 
`ballista.optimizer.broadcast_join_threshold_bytes` (default 10 MB). When the 
build side of a hash join fits under the threshold, the executor task topology 
changes: the build side stays distributed, and each probe task fetches all 
build files in parallel. SQL semantics do not change.
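
A back-of-envelope comparison helps explain both why the build side stays distributed and why a size threshold still matters. The hypothetical Rust sketch assumes the build side is split evenly over M files on M distinct executors and that all N probe tasks read it; `Load`, `coalesced`, and `broadcast` are invented names. It models only the serving of build data (the extra work of the single coalescing task is not counted), and the numbers are illustrative, not measurements from this PR.

```rust
/// Hypothetical model of build-side serving load.
#[derive(Debug)]
struct Load {
    total_bytes_fetched: u64,
    max_bytes_served_by_one_executor: u64,
}

/// Coalesced build: one file on one executor, fetched by all `n` probe tasks.
fn coalesced(build_bytes: u64, n: u64) -> Load {
    Load {
        total_bytes_fetched: n * build_bytes,
        max_bytes_served_by_one_executor: n * build_bytes,
    }
}

/// Broadcast build: `m` files of `build_bytes / m` each (assumed even split,
/// one per executor); every one of the `n` probe tasks fetches all of them.
fn broadcast(build_bytes: u64, m: u64, n: u64) -> Load {
    assert!(m > 0, "need at least one build file");
    let per_file = build_bytes / m;
    Load {
        total_bytes_fetched: n * per_file * m,
        // Each executor serves its own file once per probe task.
        max_bytes_served_by_one_executor: n * per_file,
    }
}

fn main() {
    const MIB: u64 = 1024 * 1024;
    let (build_bytes, m, n) = (8 * MIB, 4, 16);
    let c = coalesced(build_bytes, n);
    let b = broadcast(build_bytes, m, n);
    println!(
        "coalesced: total {} MiB, busiest executor {} MiB",
        c.total_bytes_fetched / MIB,
        c.max_bytes_served_by_one_executor / MIB
    );
    println!(
        "broadcast: total {} MiB, busiest executor {} MiB",
        b.total_bytes_fetched / MIB,
        b.max_bytes_served_by_one_executor / MIB
    );
}
```

For an 8 MiB build side with M = 4 and N = 16, both layouts move 128 MiB in total, but the busiest executor serves 128 MiB when coalesced versus 32 MiB with broadcast. Total transfer still grows as N times the build size, which is presumably why promotion is limited to build sides under the threshold.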


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
