avantgardnerio opened a new issue, #20735:
URL: https://github.com/apache/datafusion/issues/20735

   ### Is your feature request related to a problem or challenge?
   
   When executing query plans like:
   
   ```
   01)ProjectionExec
   02)--SortPreservingMergeExec: [depname@0 ASC NULLS LAST, empno@2 ASC NULLS 
LAST], fetch=3
   03)----ProjectionExec
   04)------BoundedWindowAggExec: wdw=[sum(Int64(1)) PARTITION BY 
[employees.depname] ORDER BY [employees.empno ASC NULLS LAST] ...
   05)--------SortExec: TopK(fetch=3), expr=[depname@0 ASC NULLS LAST, empno@1 
ASC NULLS LAST], 
   06)----------RepartitionExec: partitioning=Hash([depname@0], 4), 
input_partitions=1
   07)------------DataSourceExec: file_groups=lots
   ```
   
   We observed:
   1. lots of data was being shuffled (in ballista)
   2. lots of data was being copied in RepartitionExec
   3. a giant sort took place (OOMing), only to limit to a reasonable amount of 
data
   4. the rest of the query ran
   
   ### Describe the solution you'd like
   
   If the partition key is a subset of the sort key, sort before the 
re-partition, ensuring the minimum data is shuffled (copied in datafusion), and 
avoid an OOM/spill on the big sort. This can be done by copying the SortExec 
below the RepartitionExec. 
   
   I think it's possible to even avoid the SortExec _after_ the 
RepartitionExec, if we do a (local to partition) SPM like thing, but that's 
possibly limited to Ballista and certainly not part of this PR, just noting for 
later.
   
   ### Describe alternatives you've considered
   
   The juice might not be worth the squeeze, but `cargo bench` was showing:
   
   ```
   ┌──────────────────────┬─────────┬──────────┬─────────┐                      
                                                                                
                           
     │       Scenario       │ Enabled │ Disabled │ Speedup │                    
                                                                                
                             
     ├──────────────────────┼─────────┼──────────┼─────────┤                    
                                                                                
                             
     │ Fan-in (32 -> 4)     │ 2.03 ms │ 3.93 ms  │ 48%     │
     ├──────────────────────┼─────────┼──────────┼─────────┤                    
                                                                                
                             
     │ No fan-in (32 -> 32) │ 2.91 ms │ 4.37 ms  │ 33%     │                    
                                                                                
                           
     └──────────────────────┴─────────┴──────────┴─────────┘
   ```
   and
   ```
   ┌─────────┬──────────┬──────────┬─────────┐                                  
                                                                                
                           
     │  LIMIT  │ Enabled  │ Disabled │ Speedup │                                
                                                                                
                             
     ├─────────┼──────────┼──────────┼─────────┤                                
                                                                                
                             
     │ 10      │ 2.03 ms  │ 3.93 ms  │ 48%     │                                
                                                                                
                           
     ├─────────┼──────────┼──────────┼─────────┤                                
                                                                                
                             
     │ 1,000   │ 3.43 ms  │ 6.38 ms  │ 46%     │                                
                                                                                
                             
     ├─────────┼──────────┼──────────┼─────────┤                                
                                                                                
                             
     │ 10,000  │ 15.39 ms │ 16.43 ms │ 6%      │                                
                                                                                
                             
     ├─────────┼──────────┼──────────┼─────────┤                                
                                                                                
                             
     │ 100,000 │ 85.87 ms │ 97.50 ms │ 12%     │                                
                                                                                
                             
     └─────────┴──────────┴──────────┴─────────┘
   ```
   When translated into a distributed plan these gains would be much more 
significant, but I thought I'd offer this upstream first, see if there was 
interest, otherwise just keep it on our fork.
   
   ### Additional context
   
   Draft PR incoming.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to