avantgardnerio opened a new issue, #20735:
URL: https://github.com/apache/datafusion/issues/20735
### Is your feature request related to a problem or challenge?
When executing query plans like:
```
01)ProjectionExec
02)--SortPreservingMergeExec: [depname@0 ASC NULLS LAST, empno@2 ASC NULLS LAST], fetch=3
03)----ProjectionExec
04)------BoundedWindowAggExec: wdw=[sum(Int64(1)) PARTITION BY [employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ...
05)--------SortExec: TopK(fetch=3), expr=[depname@0 ASC NULLS LAST, empno@1 ASC NULLS LAST],
06)----------RepartitionExec: partitioning=Hash([depname@0], 4), input_partitions=1
07)------------DataSourceExec: file_groups=lots
```
We observed:
1. a large amount of data was shuffled (in Ballista)
2. a large amount of data was copied in `RepartitionExec`
3. a giant sort took place (OOMing), only for the result to be limited to a small amount of data
4. the rest of the query ran
### Describe the solution you'd like
If the partition key is a subset of the sort key, sort before the repartition. This ensures the minimum amount of data is shuffled (copied, in DataFusion) and avoids an OOM/spill on the big sort. The change amounts to copying the `SortExec` below the `RepartitionExec`.

I think it's even possible to avoid the `SortExec` _after_ the `RepartitionExec` by doing a (local-to-partition) SPM-like merge, but that's possibly limited to Ballista and certainly not part of this PR; just noting it for later.
### Describe alternatives you've considered
The juice might not be worth the squeeze, but `cargo bench` was showing:
```
┌──────────────────────┬─────────┬──────────┬─────────┐
│ Scenario │ Enabled │ Disabled │ Speedup │
├──────────────────────┼─────────┼──────────┼─────────┤
│ Fan-in (32 -> 4) │ 2.03 ms │ 3.93 ms │ 48% │
├──────────────────────┼─────────┼──────────┼─────────┤
│ No fan-in (32 -> 32) │ 2.91 ms │ 4.37 ms │ 33% │
└──────────────────────┴─────────┴──────────┴─────────┘
```
and
```
┌─────────┬──────────┬──────────┬─────────┐
│ LIMIT │ Enabled │ Disabled │ Speedup │
├─────────┼──────────┼──────────┼─────────┤
│ 10 │ 2.03 ms │ 3.93 ms │ 48% │
├─────────┼──────────┼──────────┼─────────┤
│ 1,000 │ 3.43 ms │ 6.38 ms │ 46% │
├─────────┼──────────┼──────────┼─────────┤
│ 10,000 │ 15.39 ms │ 16.43 ms │ 6% │
├─────────┼──────────┼──────────┼─────────┤
│ 100,000 │ 85.87 ms │ 97.50 ms │ 12% │
└─────────┴──────────┴──────────┴─────────┘
```
When translated into a distributed plan, these gains would be much more significant. I thought I'd offer this upstream first to see if there is interest; otherwise we'll just keep it on our fork.
### Additional context
Draft PR incoming.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]