alamb opened a new issue, #7181: URL: https://github.com/apache/arrow-datafusion/issues/7181
### Is your feature request related to a problem or challenge?

While working on https://github.com/apache/arrow-datafusion/pull/7179 I noticed a potential improvement.

The key observation is that merging `K` sorted streams with `N` total rows:
1. takes time proportional to `O(N*K)`
2. is a single-threaded operation

`K` is often called the "fan-in" of the merge.

The implementation of [ExternalSorter::in_mem_sort_stream](https://github.com/apache/arrow-datafusion/blob/1e9816b07edd8e73bb45ababa20a7c22de492d96/datafusion/core/src/physical_plan/sorts/sort.rs#L259-L297) effectively merges all the buffered batches at once, as shown below. This can be a very large fan-in: 100s or 1000s of RecordBatches.

```text
   ┌─────┐                ┌─────┐
   │  2  │                │  1  │
   │  3  │                │  2  │
   │  1  │─ ─▶  sort  ─ ─▶│  2  │─ ─ ─ ─ ─ ─ ┐
   │  4  │                │  3  │
   │  2  │                │  4  │            │
   └─────┘                └─────┘
   ┌─────┐                ┌─────┐            │
   │  1  │                │  1  │
   │  4  │─ ▶  sort  ─ ─ ▶│  1  ├ ─ ┐        │
   │  1  │                │  4  │
   └─────┘                └─────┘   │        │
     ...                   ...      ▼        │
 Could be 100s depending on         ─ ─▶ merge ─ ─ ─ ─ ─▶ sorted output
 the data being sorted                                       stream
                                             ▲
     ...                   ...
   ┌─────┐                ┌─────┐            │
   │  3  │                │  3  │
   │  1  │─ ▶  sort  ─ ─ ▶│  1  │─ ─ ─ ─ ─ ─ ┤
   └─────┘                └─────┘
   ┌─────┐                ┌─────┐            │
   │  4  │                │  3  │
   │  3  │─ ▶  sort  ─ ─ ▶│  4  │─ ─ ─ ─ ─ ─ ┘
   └─────┘                └─────┘

   in_mem_batches
```

### Describe the solution you'd like

A classical approach to such sorts is to use a "cascaded merge", which uses a series of merge operations, each with a limited fan-in (e.g. 10).

```
   ┌─────┐                ┌─────┐
   │  2  │                │  1  │
   │  3  │                │  2  │
   │  1  │─ ─▶  sort  ─ ─▶│  2  │─ ─ ─ ─ ─ ─ ─ ─ ┐
   │  4  │                │  3  │
   │  2  │                │  4  │                │
   └─────┘                └─────┘
   ┌─────┐                ┌─────┐                ▼
   │  1  │                │  1  │
   │  4  │─ ▶  sort  ─ ─ ▶│  1  ├ ─ ─ ─ ─ ─ ▶  merge ─ ─ ─ ─
   │  1  │                │  4  │                            │
   └─────┘                └─────┘
     ...                   ...               ...          ▼
                                                           merge ─ ─ ─ ─ ─ ─ ▶ sorted output
                                                                                  stream
                                                             ▲
     ...                   ...               ...
                                                             │
   ┌─────┐                ┌─────┐
   │  3  │                │  3  │                            │
   │  1  │─ ▶  sort  ─ ─ ▶│  1  │─ ─ ─ ─ ─ ─▶  merge ─ ─ ─ ─
   └─────┘                └─────┘
   ┌─────┐                ┌─────┐                ▲
   │  4  │                │  3  │
   │  3  │─ ▶  sort  ─ ─ ▶│  4  │─ ─ ─ ─ ─ ─ ─ ─ ┘
   └─────┘                └─────┘

   in_mem_batches                   do a series of merges that
                                    each has a limited fan-in
                                    (number of inputs)
```

This is often better because:
1. It is `O(N*ln(N)*ln(K))`; there is some additional overhead of `ln(N)` because the same row must now be compared several times
2. The intermediate merges can run in parallel on multiple cores (though the final one is still single-threaded)

It would be awesome if someone wanted to:
1. Verify the theory that there is a large fan-in for large sorts
2. Implement a cascaded merge and measure whether it improves performance

The sort [benchmark](https://github.com/apache/arrow-datafusion/tree/main/benchmarks) (TODO) (thanks @jaylmiller!) may be interesting:

```
cargo run --release --bin parquet -- sort --path ./data --scale-factor 1.0
```

### Describe alternatives you've considered

Another potential variation might be to get more cores involved in the merging by parallelizing the merge itself, as described in the [Morsel-Driven Parallelism paper](https://db.in.tum.de/~leis/papers/morsels.pdf)

<img width="763" alt="Screenshot 2023-08-02 at 8 56 46 AM" src="https://github.com/apache/arrow-datafusion/assets/490673/bbc3d166-146c-4248-83c7-31d2d70bbbe6">

### Additional context

_No response_

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
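
To make the cascaded merge under "Describe the solution you'd like" more concrete, here is a minimal Rust sketch of the idea. It is an illustration only, not DataFusion code: `merge_runs`, `cascaded_merge` and `max_fan_in` are hypothetical names, the inputs are plain `Vec<i32>` runs rather than streams of RecordBatches, and each individual merge uses a binary heap, which is an assumption of this sketch and not necessarily how the existing merge works.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge already-sorted runs into one sorted run (a single k-way merge).
/// Hypothetical helper for illustration; not part of DataFusion.
fn merge_runs(runs: &[Vec<i32>]) -> Vec<i32> {
    let total: usize = runs.iter().map(|r| r.len()).sum();
    let mut out = Vec::with_capacity(total);
    // Min-heap of (value, run index, position in run); `Reverse` turns
    // the std max-heap into a min-heap.
    let mut heap = BinaryHeap::new();
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some(&value) = run.first() {
            heap.push(Reverse((value, run_idx, 0usize)));
        }
    }
    while let Some(Reverse((value, run_idx, pos))) = heap.pop() {
        out.push(value);
        // Refill the heap from the run that just produced a value.
        if let Some(&next) = runs[run_idx].get(pos + 1) {
            heap.push(Reverse((next, run_idx, pos + 1)));
        }
    }
    out
}

/// Cascaded merge: repeatedly merge groups of at most `max_fan_in` runs
/// until a single sorted run remains.
fn cascaded_merge(mut runs: Vec<Vec<i32>>, max_fan_in: usize) -> Vec<i32> {
    assert!(max_fan_in >= 2, "fan-in must be at least 2 to make progress");
    while runs.len() > 1 {
        // The groups within one level are independent of each other, so a
        // real implementation could merge them in parallel. Only the last
        // level (a single group) is inherently sequential.
        runs = runs.chunks(max_fan_in).map(merge_runs).collect();
    }
    runs.pop().unwrap_or_default()
}
```

With the four sorted batches from the diagrams, `cascaded_merge(vec![vec![1, 2, 2, 3, 4], vec![1, 1, 4], vec![1, 3], vec![3, 4]], 2)` first merges two pairs and then merges the two intermediate results, producing the same output as a single merge of all four runs. Whether this is actually faster for real sorts is exactly what the issue asks someone to measure.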
