alamb opened a new issue, #7181:
URL: https://github.com/apache/arrow-datafusion/issues/7181

   ### Is your feature request related to a problem or challenge?
   
   While working on https://github.com/apache/arrow-datafusion/pull/7179 I noticed a potential improvement.
   
   The key observation is that merging `K` sorted streams containing a total of `N` rows:
   1. takes time proportional to `O(N*K)`
   2. is a single-threaded operation
   
   `K` is often called the "fan-in" of the merge.
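To make the `O(N*K)` figure concrete, here is a minimal Rust sketch of a K-way merge that, for every output row, scans the current head of each of the `K` input runs to find the smallest one. That is up to `K - 1` comparisons per row, and the whole loop runs on one thread. It is a deliberately naive illustration over plain `Vec<i32>` runs, not DataFusion's actual merge code; the function name `naive_k_way_merge` is hypothetical.

```rust
/// Hypothetical naive K-way merge: for every output row, scan the head of
/// each of the K runs and take the smallest. That is up to K - 1 comparisons
/// per row, i.e. O(N * K) work for N rows in total, all on one thread.
/// Returns the merged rows and the number of comparisons performed.
fn naive_k_way_merge(runs: &[Vec<i32>]) -> (Vec<i32>, usize) {
    let mut cursors = vec![0usize; runs.len()]; // next unread row per run
    let total: usize = runs.iter().map(|r| r.len()).sum();
    let mut out = Vec::with_capacity(total);
    let mut comparisons = 0;

    while out.len() < total {
        let mut best: Option<usize> = None;
        for (i, run) in runs.iter().enumerate() {
            if cursors[i] == run.len() {
                continue; // this run is exhausted
            }
            match best {
                None => best = Some(i),
                Some(b) => {
                    comparisons += 1;
                    if run[cursors[i]] < runs[b][cursors[b]] {
                        best = Some(i);
                    }
                }
            }
        }
        let b = best.expect("a non-exhausted run must exist");
        out.push(runs[b][cursors[b]]);
        cursors[b] += 1;
    }
    (out, comparisons)
}

fn main() {
    // The example batches from the diagram, after each one is sorted.
    let runs = vec![vec![1, 2, 2, 3, 4], vec![1, 1, 4], vec![1, 3], vec![3, 4]];
    let (merged, comparisons) = naive_k_way_merge(&runs);
    // prints [1, 1, 1, 1, 2, 2, 3, 3, 3, 4, 4, 4] (29 comparisons)
    println!("{:?} ({} comparisons)", merged, comparisons);
}
```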
   
   The implementation of [ExternalSorter::in_mem_sort_stream](https://github.com/apache/arrow-datafusion/blob/1e9816b07edd8e73bb45ababa20a7c22de492d96/datafusion/core/src/physical_plan/sorts/sort.rs#L259-L297) effectively merges all the buffered batches at once, as shown below. This can be a very large fan-in: 100s or 1000s of RecordBatches.
   
   ```text
      ┌─────┐                ┌─────┐
      │  2  │                │  1  │
      │  3  │                │  2  │
      │  1  │─ ─▶  sort  ─ ─▶│  2  │─ ─ ─ ─ ─ ─ ┐
      │  4  │                │  3  │
      │  2  │                │  4  │            │
      └─────┘                └─────┘
      ┌─────┐                ┌─────┐            │
      │  1  │                │  1  │
      │  4  │─ ▶  sort  ─ ─ ▶│  1  ├ ─ ┐        │
      │  1  │                │  4  │
      └─────┘                └─────┘   │        │
        ...                   ...               ▼
                                       │
         Could be 100s depending on     ─ ─▶ merge  ─ ─ ─ ─ ─▶  sorted output
         the data being sorted                                     stream
                                                ▲
        ...                   ...
      ┌─────┐                ┌─────┐            │
      │  3  │                │  1  │
      │  1  │─ ▶  sort  ─ ─ ▶│  3  │─ ─ ─ ─ ─ ─ ┤
      └─────┘                └─────┘
      ┌─────┐                ┌─────┐            │
      │  4  │                │  3  │
      │  3  │─ ▶  sort  ─ ─ ▶│  4  │─ ─ ─ ─ ─ ─ ┘
      └─────┘                └─────┘

   in_mem_batches
   ```
   
   ### Describe the solution you'd like
   
   A classical approach to such sorts is to use a "cascaded merge", which uses a series of merge operations, each with a limited fan-in (e.g. 10):
   ```text
      ┌─────┐                ┌─────┐
      │  2  │                │  1  │
      │  3  │                │  2  │
      │  1  │─ ─▶  sort  ─ ─▶│  2  │─ ─ ─ ─ ─ ─ ─ ─ ┐
      │  4  │                │  3  │
      │  2  │                │  4  │                │
      └─────┘                └─────┘
      ┌─────┐                ┌─────┐                ▼
      │  1  │                │  1  │
      │  4  │─ ▶  sort  ─ ─ ▶│  1  ├ ─ ─ ─ ─ ─ ▶ merge  ─ ─ ─ ─
      │  1  │                │  4  │                           │
      └─────┘                └─────┘
        ...                   ...                ...           ▼

                                                            merge  ─ ─ ─ ─ ─ ─ ▶ sorted output
                                                                                     stream
                                                               ▲
        ...                   ...                ...           │
      ┌─────┐                ┌─────┐
      │  3  │                │  1  │                           │
      │  1  │─ ▶  sort  ─ ─ ▶│  3  │─ ─ ─ ─ ─ ─▶ merge  ─ ─ ─ ─
      └─────┘                └─────┘
      ┌─────┐                ┌─────┐                ▲
      │  4  │                │  3  │
      │  3  │─ ▶  sort  ─ ─ ▶│  4  │─ ─ ─ ─ ─ ─ ─ ─ ┘
      └─────┘                └─────┘

   in_mem_batches                      do a series of merges that
                                       each has a limited fan-in
                                       (number of inputs)
   ```
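A minimal Rust sketch of the cascaded merge in the diagram, assuming each sorted batch is a plain `Vec<i32>` (real batches would be Arrow `RecordBatch`es). The hypothetical `cascaded_merge` splits the runs into groups of at most `fan_in`, merges each group with a min-heap, and repeats until one run remains. The merges within a level are independent, so this sketch runs each one on a scoped thread (`std::thread::scope`, Rust 1.63+). A real implementation would more likely use a bounded pool of workers than one OS thread per group.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::thread;

/// Merge one group of sorted runs using a min-heap of (value, run, row).
fn merge_group(group: &[Vec<i32>]) -> Vec<i32> {
    let mut heap = BinaryHeap::new();
    // Seed the heap with the first row of every non-empty run.
    for (run, rows) in group.iter().enumerate() {
        if let Some(&v) = rows.first() {
            heap.push(Reverse((v, run, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(group.iter().map(|r| r.len()).sum());
    while let Some(Reverse((v, run, row))) = heap.pop() {
        out.push(v);
        // Refill the heap from the run we just consumed a row from.
        if let Some(&next) = group[run].get(row + 1) {
            heap.push(Reverse((next, run, row + 1)));
        }
    }
    out
}

/// Hypothetical cascaded merge: merge groups of at most `fan_in` runs per
/// level until one run is left. Returns the output and the number of levels.
fn cascaded_merge(mut runs: Vec<Vec<i32>>, fan_in: usize) -> (Vec<i32>, usize) {
    assert!(fan_in >= 2, "fan_in must be at least 2 to make progress");
    let mut levels = 0;
    while runs.len() > 1 {
        // Merges within a level are independent, so each group gets its own
        // scoped thread. The last level has a single group (single threaded).
        let next: Vec<Vec<i32>> = thread::scope(|s| {
            let handles: Vec<_> = runs
                .chunks(fan_in)
                .map(|group| s.spawn(move || merge_group(group)))
                .collect();
            handles.into_iter().map(|h| h.join().unwrap()).collect()
        });
        runs = next;
        levels += 1;
    }
    (runs.pop().unwrap_or_default(), levels)
}

fn main() {
    // The example batches after sorting, merged with a tiny fan-in of 2.
    let runs = vec![vec![1, 2, 2, 3, 4], vec![1, 1, 4], vec![1, 3], vec![3, 4]];
    let (sorted, levels) = cascaded_merge(runs, 2);
    // prints [1, 1, 1, 1, 2, 2, 3, 3, 3, 4, 4, 4] after 2 levels
    println!("{:?} after {} levels", sorted, levels);
}
```

Choosing `fan_in` trades the number of levels, each of which moves every row once more, against the size of each individual merge.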
   
   This is often better because:
   1. It is `O(N*ln(N)*ln(K))`, although there is some additional overhead of `ln(N)` because the same row must now be compared several times.
   2. The intermediate merges can be run in parallel on multiple cores (though the final one is still single-threaded).
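A small worked example of point 2, using a hypothetical helper `merges_per_level`. With `K` runs and a maximum fan-in `F`, each level replaces every group of up to `F` runs with one run, so the cascade has `ceil(log_F(K))` levels. Every level except the last contains several independent merges that could be spread across cores.

```rust
/// Number of independent merge operations at each level of a cascaded merge
/// of `k` sorted runs with the given maximum fan-in.
fn merges_per_level(mut k: usize, fan_in: usize) -> Vec<usize> {
    assert!(fan_in >= 2, "fan_in must be at least 2");
    let mut levels = Vec::new();
    while k > 1 {
        // Each merge consumes up to `fan_in` runs and produces one run,
        // so the next level has ceil(k / fan_in) runs.
        k = (k + fan_in - 1) / fan_in;
        levels.push(k);
    }
    levels
}

fn main() {
    // e.g. 1000 buffered batches merged with a fan-in of 10
    println!("{:?}", merges_per_level(1000, 10)); // prints [100, 10, 1]
}
```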
   
   
   It would be awesome if someone wanted to:
   
   1. Verify the theory that there is a large fan-in for large sorts
   2. Implement a cascaded merge and measure whether it improves performance
   
   The sort [benchmark](https://github.com/apache/arrow-datafusion/tree/main/benchmarks) (TODO) (thanks @jaylmiller!) may be interesting:
   
   ```shell
   cargo run --release --bin parquet -- sort --path ./data --scale-factor 1.0
   ```
   
   
   ### Describe alternatives you've considered
   
   Another potential variation might be to get more cores involved in the merging by parallelizing the merge itself, as described in the [Morsel-Driven Parallelism paper](https://db.in.tum.de/~leis/papers/morsels.pdf).
   
   <img width="763" alt="Screenshot 2023-08-02 at 8 56 46 AM" src="https://github.com/apache/arrow-datafusion/assets/490673/bbc3d166-146c-4248-83c7-31d2d70bbbe6">
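One way to parallelize a single large merge is a range-partitioned merge. It is sketched here as a common technique, not necessarily the paper's exact algorithm. The steps:

1. Choose splitter keys.
2. Cut every sorted run at those keys with a binary search (`slice::partition_point`).
3. Merge each key range independently on its own thread.
4. Concatenate the results in key order.

The function names and the sampling rule used to pick splitters are assumptions for illustration. Poorly chosen splitters only unbalance the work across threads; they do not affect correctness.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::thread;

/// K-way merge of sorted slices using a min-heap of (value, slice, row).
fn merge_slices(slices: &[&[i32]]) -> Vec<i32> {
    let mut heap: BinaryHeap<Reverse<(i32, usize, usize)>> = slices
        .iter()
        .enumerate()
        .filter_map(|(i, s)| s.first().map(|&v| Reverse((v, i, 0))))
        .collect();
    let mut out = Vec::with_capacity(slices.iter().map(|s| s.len()).sum());
    while let Some(Reverse((v, i, j))) = heap.pop() {
        out.push(v);
        if let Some(&next) = slices[i].get(j + 1) {
            heap.push(Reverse((next, i, j + 1)));
        }
    }
    out
}

/// Hypothetical range-partitioned parallel merge of sorted `runs`.
fn parallel_range_merge(runs: &[Vec<i32>], parts: usize) -> Vec<i32> {
    // Assumed splitter rule: sort a few evenly spaced samples from each run
    // and take `parts - 1` evenly spaced values from that sample.
    let mut samples: Vec<i32> = runs
        .iter()
        .flat_map(|r| (0..parts).filter_map(move |p| r.get(p * r.len() / parts).copied()))
        .collect();
    samples.sort_unstable();
    let splitters: Vec<i32> = (1..parts)
        .filter_map(|p| samples.get(p * samples.len() / parts).copied())
        .collect();

    // For each run, the row offsets where every key range starts and ends.
    let bounds: Vec<Vec<usize>> = runs
        .iter()
        .map(|r| {
            let mut b = vec![0];
            b.extend(splitters.iter().map(|&s| r.partition_point(|&x| x < s)));
            b.push(r.len());
            b
        })
        .collect();

    thread::scope(|s| {
        let handles: Vec<_> = (0..=splitters.len())
            .map(|p| {
                // The slice of every run that falls into key range `p`.
                let slices: Vec<&[i32]> = runs
                    .iter()
                    .zip(&bounds)
                    .map(|(r, b)| &r[b[p]..b[p + 1]])
                    .collect();
                s.spawn(move || merge_slices(&slices))
            })
            .collect();
        // Every key in range p is below every key in range p + 1, so
        // concatenating the partitions in order is globally sorted.
        handles
            .into_iter()
            .flat_map(|h| h.join().unwrap())
            .collect::<Vec<i32>>()
    })
}

fn main() {
    let runs = vec![vec![1, 2, 2, 3, 4], vec![1, 1, 4], vec![1, 3], vec![3, 4]];
    // prints [1, 1, 1, 1, 2, 2, 3, 3, 3, 4, 4, 4]
    println!("{:?}", parallel_range_merge(&runs, 2));
}
```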
   
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
