gruuya commented on code in PR #7180:
URL: https://github.com/apache/arrow-datafusion/pull/7180#discussion_r1287280089


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -639,15 +614,47 @@ fn write_sorted(
     Ok(())
 }
 
-fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
+/// Stream batches from spill files.
+///
+/// Each spill file has one or more batches. Intra-batch order is guaranteed (each one is sorted),
+/// but the inter-batch ordering is not guaranteed, hence why we need to convert each batch from the
+/// spill to a separate input stream for the merge-sort procedure.

Review Comment:
   FWIW, I revised the approach to read the entire spill as a single stream
   when inter-batch order is guaranteed (which also meant keeping track of
   whether that guarantee holds). I also added a hard cut-off of 100 rows for
   eager batching.
   
   <details><summary>sorting benchmarks</summary>

   ```text
   ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
   ┃ Query                                ┃       main ┃ top-k-eager-sorting ┃        Change ┃
   ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
   │ Qsort utf8 fetch None                │ 28233.61ms │          29251.16ms │     no change │
   │ Qsort int fetch None                 │ 36739.73ms │          36870.39ms │     no change │
   │ Qsort decimal fetch None             │ 29974.54ms │          30480.98ms │     no change │
   │ Qsort integer tuple fetch None       │ 40214.81ms │          40804.29ms │     no change │
   │ Qsort utf8 tuple fetch None          │ 29429.65ms │          29875.64ms │     no change │
   │ Qsort mixed tuple fetch None         │ 34435.12ms │          34415.10ms │     no change │
   │ Qsort utf8 fetch Some(1)             │  1709.85ms │           1829.22ms │  1.07x slower │
   │ Qsort int fetch Some(1)              │  1842.45ms │           1749.96ms │ +1.05x faster │
   │ Qsort decimal fetch Some(1)          │  1680.46ms │           1677.47ms │     no change │
   │ Qsort integer tuple fetch Some(1)    │  1745.56ms │           1871.46ms │  1.07x slower │
   │ Qsort utf8 tuple fetch Some(1)       │  1965.73ms │           2331.78ms │  1.19x slower │
   │ Qsort mixed tuple fetch Some(1)      │  1813.14ms │           2061.60ms │  1.14x slower │
   │ Qsort utf8 fetch Some(10)            │  1712.66ms │           1829.49ms │  1.07x slower │
   │ Qsort int fetch Some(10)             │  1832.73ms │           1759.00ms │     no change │
   │ Qsort decimal fetch Some(10)         │  1699.42ms │           1686.28ms │     no change │
   │ Qsort integer tuple fetch Some(10)   │  1762.86ms │           1877.71ms │  1.07x slower │
   │ Qsort utf8 tuple fetch Some(10)      │  1974.07ms │           2337.96ms │  1.18x slower │
   │ Qsort mixed tuple fetch Some(10)     │  1854.64ms │           2078.75ms │  1.12x slower │
   │ Qsort utf8 fetch Some(100)           │  1733.28ms │           1853.74ms │  1.07x slower │
   │ Qsort int fetch Some(100)            │  1845.74ms │           1797.14ms │     no change │
   │ Qsort decimal fetch Some(100)        │  1685.38ms │           1712.97ms │     no change │
   │ Qsort integer tuple fetch Some(100)  │  1799.54ms │           1907.62ms │  1.06x slower │
   │ Qsort utf8 tuple fetch Some(100)     │  2008.06ms │           2371.06ms │  1.18x slower │
   │ Qsort mixed tuple fetch Some(100)    │  1855.62ms │           2110.55ms │  1.14x slower │
   │ Qsort utf8 fetch Some(101)           │  1728.54ms │           1760.71ms │     no change │
   │ Qsort int fetch Some(101)            │  1867.64ms │           1902.02ms │     no change │
   │ Qsort decimal fetch Some(101)        │  1691.19ms │           1708.74ms │     no change │
   │ Qsort integer tuple fetch Some(101)  │  1784.79ms │           1830.82ms │     no change │
   │ Qsort utf8 tuple fetch Some(101)     │  1999.16ms │           2034.22ms │     no change │
   │ Qsort mixed tuple fetch Some(101)    │  1869.82ms │           1914.20ms │     no change │
   │ Qsort utf8 fetch Some(1000)          │  1774.79ms │           1814.19ms │     no change │
   │ Qsort int fetch Some(1000)           │  1959.51ms │           1999.51ms │     no change │
   │ Qsort decimal fetch Some(1000)       │  1714.06ms │           1730.36ms │     no change │
   │ Qsort integer tuple fetch Some(1000) │  1940.85ms │           1969.82ms │     no change │
   │ Qsort utf8 tuple fetch Some(1000)    │  2103.61ms │           2132.59ms │     no change │
   │ Qsort mixed tuple fetch Some(1000)   │  2032.56ms │           2087.27ms │     no change │
   └──────────────────────────────────────┴────────────┴─────────────────────┴───────────────┘
   ```
   </details>
   
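   Overall the timings are not that impressive: most queries show no change,
   and several with a fetch of 100 or fewer run 1.06x to 1.19x slower. The
   implementation is also quite convoluted at this point, so I wouldn't merge
   this either.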
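
To make the stream-splitting idea in the comment above concrete, here is a minimal sketch in plain Rust. It is not the code from the PR: `Batch`, `Stream`, `spill_to_streams`, and `merge_streams` are hypothetical names, a `Vec<i64>` stands in for a sorted `RecordBatch`, and the 100-row eager-batching cut-off is not modeled. It only illustrates that a spill with guaranteed inter-batch order can be fed to the merge as one input, while a spill without that guarantee needs one input per batch.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Stand-in for a sorted `RecordBatch`: keys in ascending order (assumption).
type Batch = Vec<i64>;
/// Stand-in for one merge input: batches that are sorted end to end.
type Stream = Vec<Batch>;

/// Hypothetical helper: decide how many merge inputs one spill file becomes.
/// With inter-batch order guaranteed, the whole spill is a single input;
/// otherwise every batch becomes its own input.
fn spill_to_streams(spill: Vec<Batch>, inter_batch_ordered: bool) -> Vec<Stream> {
    if inter_batch_ordered {
        vec![spill]
    } else {
        spill.into_iter().map(|batch| vec![batch]).collect()
    }
}

/// K-way merge over sorted inputs, stopping after `fetch` rows if given.
fn merge_streams(streams: Vec<Stream>, fetch: Option<usize>) -> Vec<i64> {
    // Flatten each stream into a single sorted iterator of rows.
    let mut inputs: Vec<_> = streams
        .into_iter()
        .map(|stream| stream.into_iter().flatten())
        .collect();

    // Min-heap of (next value, input index), seeded with each input's head.
    let mut heap = BinaryHeap::new();
    for (idx, input) in inputs.iter_mut().enumerate() {
        if let Some(value) = input.next() {
            heap.push(Reverse((value, idx)));
        }
    }

    let limit = fetch.unwrap_or(usize::MAX);
    let mut out = Vec::new();
    while out.len() < limit {
        match heap.pop() {
            Some(Reverse((value, idx))) => {
                out.push(value);
                // Refill the heap from the input that just produced a row.
                if let Some(next) = inputs[idx].next() {
                    heap.push(Reverse((next, idx)));
                }
            }
            None => break,
        }
    }
    out
}
```

In this sketch the number of merge inputs grows with the number of batches when order is not guaranteed, which is the cost the single-stream path avoids. Passing `Some(k)` as `fetch` stops the merge after `k` rows, mirroring a top-k query.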



