andygrove opened a new pull request, #1647:
URL: https://github.com/apache/datafusion-ballista/pull/1647
# Which issue does this PR close?
Closes #.
# Rationale for this change
Today, when DataFusion plans a hash join with a small side, Ballista's
distributed planner relies on `CoalescePartitionsExec` to satisfy
`CollectLeft`'s single-partition requirement. That collapses build-side
parallelism onto a single executor and creates a hot spot for both producing
and serving the build shuffle file. To work around this, Ballista's default
session sets DataFusion's `hash_join_single_partition_threshold` to 0, which
avoids the hot spot but also gives up small-side optimization entirely.
This PR adds a true broadcast-style hash join. For small-side joins, the
build subtree keeps its natural M-partition layout and writes M shuffle files;
each downstream join task fetches all M files in parallel and builds its hash
table locally. Build data flows executor-to-executor; the scheduler is not on
the data path.
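As a rough illustration of the data flow described above (not the code in this PR), the sketch below models build shuffle files as in-memory vectors. Each probe task builds its own hash table from all M build partitions and then probes locally. The names `BuildPartition`, `build_hash_table`, `probe`, and `broadcast_join_demo` are hypothetical and exist only for this example.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for one build-side shuffle file: (join key, payload) rows.
type BuildPartition = Vec<(u32, &'static str)>;

/// Build a hash table from all M build partitions, as each join task would
/// after fetching every build shuffle file.
fn build_hash_table(build_partitions: &[BuildPartition]) -> HashMap<u32, Vec<&'static str>> {
    let mut table: HashMap<u32, Vec<&'static str>> = HashMap::new();
    for partition in build_partitions {
        for (key, value) in partition {
            table.entry(*key).or_default().push(*value);
        }
    }
    table
}

/// Probe one probe-side partition against the locally built table (inner join).
fn probe(
    table: &HashMap<u32, Vec<&'static str>>,
    probe_partition: &[(u32, &'static str)],
) -> Vec<(u32, &'static str, &'static str)> {
    let mut out = Vec::new();
    for (key, probe_value) in probe_partition {
        if let Some(build_values) = table.get(key) {
            for build_value in build_values {
                out.push((*key, *build_value, *probe_value));
            }
        }
    }
    out
}

/// Run the join with M = 2 build partitions and two probe tasks.
fn broadcast_join_demo() -> Vec<(u32, &'static str, &'static str)> {
    // The build side keeps its natural two-partition layout.
    let build = vec![vec![(1, "a"), (2, "b")], vec![(3, "c")]];
    // One probe partition per downstream join task.
    let probe_parts = vec![vec![(1, "x"), (3, "y")], vec![(2, "z"), (4, "w")]];
    let mut rows = Vec::new();
    for part in &probe_parts {
        // Each task builds its own copy of the table from all M partitions.
        let table = build_hash_table(&build);
        rows.extend(probe(&table, part));
    }
    rows
}
```

In this model no single task ever holds the only copy of the build data, which is the property that removes the single-executor hot spot.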
# What changes are included in this PR?
- New session config `ballista.optimizer.broadcast_join_threshold_bytes`
(default 10 MB, `0` disables).
- `UnresolvedShuffleExec` and `ShuffleReaderExec` get a `broadcast` flag and
an explicit `upstream_partition_count`. Proto/codec round-trip the new fields.
- `ShuffleReaderExec::execute` and `partition_statistics` aggregate across
all upstream locations when broadcast.
- `DefaultDistributedPlanner` adds:
  - `maybe_promote_to_broadcast`, which promotes an eligible
    `HashJoinExec(Partitioned)` to `CollectLeft` when one side is under the
    threshold.
  - A `HashJoinExec(CollectLeft)` lowering branch that strips
    `CoalescePartitionsExec` over the build, writes an M-partition build
    shuffle, and inserts a broadcast `UnresolvedShuffleExec`.
  - A broadcast branch in `remove_unresolved_shuffles` that flattens
    upstream locations.
  - A broadcast branch in `rollback_resolved_shuffles` that preserves
    `broadcast` and `upstream_partition_count` on retry.
- Unit tests in the scheduler planner (broadcast on / off, M>1 build).
- Codec round-trip tests for the broadcast variants.
- End-to-end correctness check in `examples/`.
AQE is deferred: the `AdaptivePlanner` does not yet apply this lowering. The
workaround that sets `hash_join_single_partition_threshold` to 0 is left in
place.
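The threshold check can be pictured with the sketch below. It is a simplified model, not the logic of `maybe_promote_to_broadcast` itself: the `JoinMode` enum, the `choose_join_mode` helper, and the constant name are hypothetical, and it assumes that "10 MB" means 10 * 1024 * 1024 bytes, that a size exactly at the threshold still qualifies, and that a missing size estimate never promotes. Choosing which side becomes the build side is out of scope here.

```rust
/// Hypothetical join modes mirroring the two partition modes named above.
#[derive(Debug, PartialEq, Clone, Copy)]
enum JoinMode {
    Partitioned,
    CollectLeft,
}

/// Assumed default: 10 MB interpreted as 10 * 1024 * 1024 bytes.
const DEFAULT_BROADCAST_JOIN_THRESHOLD_BYTES: u64 = 10 * 1024 * 1024;

/// Hypothetical decision helper: promote only when the threshold is non-zero
/// (0 disables the feature) and the build side has a known size estimate at
/// or below the threshold.
fn choose_join_mode(build_bytes: Option<u64>, threshold_bytes: u64) -> JoinMode {
    match build_bytes {
        Some(bytes) if threshold_bytes > 0 && bytes <= threshold_bytes => JoinMode::CollectLeft,
        _ => JoinMode::Partitioned,
    }
}
```

For example, `choose_join_mode(Some(1024), DEFAULT_BROADCAST_JOIN_THRESHOLD_BYTES)` yields `JoinMode::CollectLeft`, while any call with a threshold of `0` stays `JoinMode::Partitioned`.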
# Are there any user-facing changes?
Yes. New session config `ballista.optimizer.broadcast_join_threshold_bytes`
(default 10 MB). When the build side of a hash join fits under the threshold,
the executor task topology changes: the build side stays distributed and probe
tasks fetch all build files in parallel. SQL semantics are unchanged.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]