cyrankiewicz commented on issue #47832:
URL: https://github.com/apache/arrow/issues/47832#issuecomment-3410638781
Here is an example:
```python
from typing import Tuple

import contextlib
import time

import numpy as np
import pyarrow as pa
import pyarrow.acero as ac


def generate_data(N: int):
    return pa.Table.from_pydict({'col': np.random.rand(N)})


def parallel_sort(table: pa.Table, sort_keys: list[Tuple[str, str]]) -> pa.Table:
    num_producers = pa.cpu_count()
    num_rows = table.num_rows
    slice_sz = (num_rows + num_producers - 1) // num_producers
    if num_producers > 1:
        # Sort each slice on its own branch of the exec plan, union the
        # sorted runs, then order the union once more for the final result.
        return ac.Declaration.from_sequence(
            [
                ac.Declaration(
                    "union",
                    ac.ExecNodeOptions(),
                    inputs=[
                        ac.Declaration.from_sequence(
                            [
                                ac.Declaration(
                                    "table_source",
                                    ac.TableSourceNodeOptions(
                                        table.slice(k * slice_sz, slice_sz)
                                    ),
                                ),
                                ac.Declaration(
                                    "order_by",
                                    ac.OrderByNodeOptions(sort_keys=sort_keys),
                                ),
                            ]
                        )
                        for k in range(num_producers)
                    ],
                ),
                ac.Declaration(
                    "order_by",
                    ac.OrderByNodeOptions(sort_keys=sort_keys),
                ),
            ]
        ).to_table()
    else:
        return table.sort_by(sort_keys=sort_keys)


@contextlib.contextmanager
def timing(title: str):
    start_time = time.perf_counter()
    start_process_time = time.process_time()
    yield
    wall_time = time.perf_counter() - start_time
    process_time = time.process_time() - start_process_time
    print(f"{title}: {wall_time=:.4} {process_time=:.4}")


if __name__ == '__main__':
    table = generate_data(10_000_000)
    with timing('table.sort_by'):
        table.sort_by([('col', 'ascending')])
    with timing('hand-written-acero-plan (messy)'):
        parallel_sort(table, [('col', 'ascending')])
```
Running this on the latest stable `pyarrow` 21.0.0, I get:
```
$ python benchmark.py
table.sort_by: wall_time=2.174 process_time=2.174
hand-written-acero-plan (messy): wall_time=1.1 process_time=6.261
```
For the regular `table.sort_by` we always get `process_time` ≈ `wall_time`; this means that even if sorting somehow uses multiple cores, it fails to get any speedup from them.
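(A side note on the metric itself: `time.process_time` accumulates CPU time across all threads of the process, while `time.perf_counter` measures wall-clock time, so single-threaded CPU-bound work yields roughly equal values and parallel CPU work makes `process_time` exceed wall time. A minimal illustration using sleeping, the opposite extreme:)

```python
import time

# time.process_time counts CPU time across all threads of the process,
# while time.perf_counter counts wall-clock time.  Sleeping burns wall
# time but almost no CPU, so the two clocks diverge in the opposite
# direction from parallel CPU work (where process_time > wall time).
start_wall = time.perf_counter()
start_cpu = time.process_time()
time.sleep(0.2)
wall = time.perf_counter() - start_wall
cpu = time.process_time() - start_cpu
print(f"{wall=:.3f} {cpu=:.3f}")  # wall is ~0.2s, cpu stays near 0
```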
To show that pyarrow is capable of doing "better", I hand-crafted a plan in Acero. It is essentially a parallel merge sort variant (though a weaker one, since I don't think merging via `OrderByNode` is efficient in this case); this is not a proposal, just an example.
You can see that for this code:
* the reported wall time is shorter than for `table.sort_by` on the same data
* the `process_time` is much higher than the wall time - multiple CPUs were engaged in parallel
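For reference, the shape of that plan (sort independent chunks, then merge the sorted runs) is the standard sort-merge pattern. A minimal pure-Python/NumPy sketch of the same structure (sequential, only to show the shape, not the speed; `chunked_sort` is my own name for illustration, not an Arrow API):

```python
import heapq
import numpy as np

def chunked_sort(values: np.ndarray, num_chunks: int) -> np.ndarray:
    """Sort each chunk independently, then k-way merge the sorted runs.

    Mirrors the Acero plan above: a table_source + order_by branch per
    slice, followed by a final merge stage (here heapq.merge instead of
    a second order_by node).
    """
    n = len(values)
    sz = (n + num_chunks - 1) // num_chunks
    runs = [np.sort(values[k * sz:(k + 1) * sz]) for k in range(num_chunks)]
    return np.fromiter(heapq.merge(*runs), dtype=values.dtype, count=n)

data = np.random.rand(1_000)
assert np.array_equal(chunked_sort(data, 4), np.sort(data))
```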