Samyak2 opened a new issue, #20847:
URL: https://github.com/apache/datafusion/issues/20847
### Is your feature request related to a problem or challenge?
The problem shows up even in our `tpcds_sf1` benchmarks. Here's a simple
demonstration.
Running TPC-DS Query 99 with this command:
```bash
cargo run --profile release-nonlto --bin dfbench tpcds --query 99 \
  --iterations 3 --path benchmarks/data/tpcds_sf1 --query_path \
  datafusion/core/tests/tpc-ds --prefer_hash_join true
```
shows these numbers:
```
Query 99 iteration 0 took 5514.4 ms and returned 90 rows
Query 99 iteration 1 took 5303.4 ms and returned 90 rows
Query 99 iteration 2 took 5388.4 ms and returned 90 rows
```
Now, forcing *all* joins to be partitioned joins by setting
`DATAFUSION_OPTIMIZER_REPARTITION_JOINS=true
DATAFUSION_OPTIMIZER_HASH_JOIN_SINGLE_PARTITION_THRESHOLD=0
DATAFUSION_OPTIMIZER_HASH_JOIN_SINGLE_PARTITION_THRESHOLD_ROWS=0` on the same
command gives me:
```
Query 99 iteration 0 took 204.3 ms and returned 90 rows
Query 99 iteration 1 took 106.5 ms and returned 90 rows
Query 99 iteration 2 took 103.8 ms and returned 90 rows
```
Nearly **52x** speedup!
This query has 4 joins. The default optimizer makes all of them
`CollectLeft` joins; the env vars above force all of them to be `Partitioned`
hash joins. Forcing every join is less than ideal, but it shows that the
optimizer doesn't always choose the right join mode.
### Overall numbers
I found these queries benefiting from always partitioning in TPC-DS:
```
1. tpcds q99 join_under_partitioned 27.971x (5785.86ms -> 206.85ms)
2. tpcds q62 join_under_partitioned 4.204x (626.49ms -> 149.01ms)
3. tpcds q6 join_under_partitioned 3.809x (840.52ms -> 220.69ms)
4. tpcds q68 join_under_partitioned 3.394x (341.73ms -> 100.68ms)
5. tpcds q27 join_under_partitioned 3.020x (399.90ms -> 132.41ms)
6. tpcds q36 join_under_partitioned 3.005x (303.32ms -> 100.94ms)
7. tpcds q46 join_under_partitioned 2.878x (299.31ms -> 104.00ms)
8. tpcds q37 join_under_partitioned 2.675x (253.45ms -> 94.74ms)
9. tpcds q61 join_under_partitioned 2.629x (273.10ms -> 103.88ms)
10. tpcds q4 agg_over_partitioned 1.955x (1197.97ms -> 612.73ms)
```
And these modest improvements in TPC-H:
```
- tpch q7 join_over_partitioned 1.204x (169.83ms -> 141.06ms)
- tpch q5 join_over_partitioned 1.165x (158.94ms -> 136.48ms)
- tpch q10 join_over_partitioned 1.106x (182.66ms -> 165.09ms)
- tpch q3 agg_over_partitioned 1.102x (140.40ms -> 127.46ms)
```
Note: these numbers are from a script that Codex CLI wrote. I have only
manually confirmed the numbers for TPC-DS Query 99; the rest are unconfirmed.
The optimizer can and will get this wrong: even with a sophisticated
cost-based join selection algorithm, it would still make the wrong choice in
some cases. We should instead make execution robust to the optimizer's plans.
### Describe the solution you'd like
I propose that both `HashJoinExec` and `AggregateExec` should *dynamically*
switch from `CollectLeft`/`Final` to `Partitioned`/`FinalPartitioned` mode
based on certain metrics.
### Hash join
- `HashJoinExec` would always ask for an input distribution of
`[SinglePartition, UnspecifiedDistribution]` (same as `CollectLeft` currently).
At planning time, we do not insert any repartitions.
- Once we have more than 1M rows (configurable) collected on the build-side
([relevant
code](https://github.com/apache/datafusion/blob/23b88fbed70309926851f6ceea1a490527850e10/datafusion/physical-plan/src/joins/hash_join/exec.rs#L1921-L1940)),
we switch the mode.
- Internally, the `HashJoinExec` would need to repartition (hash-based) both
the build and probe side inputs and set the mode to `Partitioned` for the
stream.
- The repartitioning can be done in one of two ways:
- Literally insert `RepartitionExec`s internal to HashJoinExec (so
they won't show up in the tree, but the streams from below are wrapped)
- Abstract out the [relevant
parts](https://github.com/apache/datafusion/blob/23b88fbed70309926851f6ceea1a490527850e10/datafusion/physical-plan/src/repartition/mod.rs#L568-L640)
in `RepartitionExec` and use them here.
- One optimization we could do is to start re-partitioning as soon as the
threshold is breached, instead of waiting for all build-side rows to be
accumulated.
- The output partitioning of this tree would always be
`UnknownPartitioning(N)`. Even when it switches to partitioned mode, the parent
operators cannot make use of this information at planning time. So they may try
to repartition the data again. This is one trade-off compared to deciding the
partitioning at planning time.
### Aggregation
- `AggregateExec` would always ask for an input distribution of
`[SinglePartition]` (same as `Final` mode currently). At planning time, we do
not insert any repartitions.
- This dynamic switching would only be done for agg with group-bys.
- Once we have accumulated more than 200k groups (configurable), we switch.
- Switching is more complex here since we need to do it mid-stream; there is
no neat switch-over point like the build side finishing in a join.
- Switching would be done in two steps:
- Extract out the partial results from the `GroupedHashAggregateStream`.
Similar behavior as calling the `emit` fn with `spilling = true`
[here](https://github.com/apache/datafusion/blob/23b88fbed70309926851f6ceea1a490527850e10/datafusion/physical-plan/src/aggregates/row_hash.rs#L1098).
We need to ensure that we get the intermediate state, and not the final
results.
- For all subsequent inputs and for the partial state we got from above,
we need to repartition it. Same as in join, this can be done either by
instantiating a `RepartitionExec` internally in the stream, or by abstracting
out the hash repartitioning code and re-using it.
- Note that we also need to repartition the intermediate state from
above, before re-accumulating it.
- We also need to instantiate `target_partitions` number of
`GroupedHashAggregateStream`s here.
- Outputs from all streams would need to be coalesced into one
output stream. This is because we currently cannot change the output
partitioning of a tree mid-execution.
- The output partitioning of this operator would always be single
partitioned. Even when it switches to partitioned mode, the parent operators
cannot make use of this information at planning time. So they may try to
repartition the data again. This is one trade-off compared to deciding the
partitioning at planning time.
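The key step above, redistributing the extracted intermediate state by group-key hash so each new stream re-accumulates its own slice, can be sketched as follows. This is an illustrative sketch only: the function name is hypothetical, the state is modeled as a simple `group key -> partial count` map rather than the real `GroupedHashAggregateStream` state, and real code would move Arrow batches between streams.

```rust
use std::collections::HashMap;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Redistribute intermediate aggregation state across `n` partitions by
/// hashing the group key, so every occurrence of a group lands in exactly
/// one partition and can be re-accumulated there.
fn repartition_intermediate_state(
    state: HashMap<String, i64>, // group key -> partial count (intermediate, not final)
    n: usize,
) -> Vec<HashMap<String, i64>> {
    let mut parts = vec![HashMap::new(); n];
    for (key, partial) in state {
        let mut h = DefaultHasher::new();
        key.hash(&mut h);
        let p = (h.finish() as usize) % n;
        // Merging partials with `+` is valid here because the state is an
        // intermediate count; final results could not be merged this way.
        *parts[p].entry(key).or_insert(0) += partial;
    }
    parts
}
```

Subsequent input batches would be routed with the same hash function, which is why extracting the *intermediate* state (rather than final results) matters: each partition's stream can keep folding new rows into the partials it received.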
### Describe alternatives you've considered
- Make the optimizer better instead of handling this at execution time. This
will not always work, but the optimizer can still be made better than it
currently is.
### Additional context
Some large changes in agg
(https://github.com/apache/datafusion/issues/20773) and join
(https://github.com/apache/datafusion/issues/19789) may affect this.