cyrankiewicz commented on issue #47832:
URL: https://github.com/apache/arrow/issues/47832#issuecomment-3410638781

   Here is an example:
   ```python
   from typing import Tuple
   
   import numpy as np
   import pyarrow as pa
   import pyarrow.acero as ac
   
   import contextlib
   import resource
   import time
   
   
   def generate_data(N: int):
       return pa.Table.from_pydict({'col': np.random.rand(N)})
   
   
    def parallel_sort(table: pa.Table, sort_keys: list[Tuple[str, str]]) -> pa.Table:
       num_producers = pa.cpu_count()
       num_rows = table.num_rows
       slice_sz = (num_rows + num_producers - 1) // num_producers
       if num_producers > 1:
           return ac.Declaration.from_sequence(
               [
                   ac.Declaration(
                       "union",
                       ac.ExecNodeOptions(),
                       inputs=[
                           ac.Declaration.from_sequence(
                               [
                                   ac.Declaration(
                                       "table_source",
                                       ac.TableSourceNodeOptions(
                                           table.slice(k * slice_sz, slice_sz)
                                       ),
                                   ),
                                   ac.Declaration(
                                        "order_by",
                                        ac.OrderByNodeOptions(sort_keys=sort_keys),
                                   ),
                               ]
                           )
                           for k in range(num_producers)
                       ],
                   ),
                ac.Declaration("order_by", ac.OrderByNodeOptions(sort_keys=sort_keys)),
               ]
           ).to_table()
       else:
           return table.sort_by(sort_keys=sort_keys)
   
   @contextlib.contextmanager
   def timing(title: str):
       start_time = time.perf_counter()
       start_process_time = time.process_time()
       yield
       wall_time = time.perf_counter() - start_time
       process_time = time.process_time() - start_process_time
       print(f"{title}: {wall_time=:.4} {process_time=:.4}")
   
   if __name__ == '__main__':
       table = generate_data(10_000_000)
       with timing('table.sort_by'):
           table.sort_by([('col', 'ascending')])
       with timing('hand-written-acero-plan (messy)'):
           parallel_sort(table, [('col', 'ascending')])
   
   ```
   
   Running this on the latest stable release (`pyarrow` 21.0.0), I get:
   ```
   $ python benchmark.py 
   table.sort_by: wall_time=2.174 process_time=2.174
   hand-written-acero-plan (messy): wall_time=1.1 process_time=6.261
   ```
   
   For the regular `table.sort_by` we always get process_time roughly equal to wall_time. This means that even if sorting somehow uses multiple cores, it fails to get any speedup from them.
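   As a sanity check of the diagnostic itself, here is a minimal sketch (outside pyarrow) of how `process_time` exceeding `wall_time` signals multi-core use. It assumes NumPy is linked against a multithreaded BLAS (OpenBLAS/MKL); a single-threaded build would show cpu ~= wall, just like `table.sort_by` above:
   ```python
   import time

   import numpy as np

   # A large matrix product; with a multithreaded BLAS, several cores
   # work at once inside this single call.
   a = np.random.rand(2000, 2000)

   w0, c0 = time.perf_counter(), time.process_time()
   a @ a
   wall = time.perf_counter() - w0
   cpu = time.process_time() - c0

   # With multiple cores engaged, cpu > wall; single-core work gives cpu ~= wall.
   print(f"wall={wall:.3f}s cpu={cpu:.3f}s ratio={cpu / wall:.1f}")
   ```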
   
   To show that pyarrow is capable of doing "better", I hand-crafted a plan in Acero. It is essentially a parallel mergesort variant (though a weaker one, since I don't think merging with `OrderByNode` is efficient in this case: the final node re-sorts the already-sorted runs instead of merging them). This is not a proposal, just an example.
   
   You can see that with this code:
   * the reported wall time is shorter than for `table.sort_by` on the same data, and
   * the process_time is much higher than the wall time, i.e. multiple CPU cores were engaged in parallel.
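   For completeness: the step the final `OrderByNode` does inefficiently (fully re-sorting the unioned runs) only needs a k-way merge, which is O(n log k). A minimal sketch on plain NumPy arrays, outside Acero; `merge_sorted_runs` is a hypothetical helper of mine, not a pyarrow API:
   ```python
   import heapq

   import numpy as np

   def merge_sorted_runs(runs):
       """k-way merge of already-sorted 1-D arrays via a heap:
       O(n log k) comparisons instead of a full O(n log n) re-sort."""
       total = sum(len(r) for r in runs)
       return np.fromiter(heapq.merge(*runs), dtype=runs[0].dtype, count=total)

   # Four independently sorted runs, as the union of the order_by producers
   # would yield them.
   runs = [np.sort(np.random.rand(1_000)) for _ in range(4)]
   merged = merge_sorted_runs(runs)
   assert merged.shape == (4_000,)
   assert np.all(merged[:-1] <= merged[1:])  # globally sorted
   ```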