[ https://issues.apache.org/jira/browse/ARROW-9458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17157306#comment-17157306 ]

Joris Van den Bossche commented on ARROW-9458:
----------------------------------------------

That it doesn't do this in parallel automatically is expected. In principle 
reading a single row group could also be parallelized, but since in the 
datasets project different scan tasks can run in parallel, I suppose reading a 
single row group is limited to one thread to avoid oversubscription (not fully 
sure about this).
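
As an aside (my own sketch, not part of the original comment): Arrow keeps a 
global internal thread pool that automatic parallel execution draws from, and 
its size can be inspected and adjusted from Python with {{pyarrow.cpu_count}} 
and {{pyarrow.set_cpu_count}}. Shrinking it is one way to avoid 
oversubscription when layering your own Python-level thread pool on top:

{code:python}
import pyarrow as pa

# cpu_count() reports the size of Arrow's internal thread pool
# (used e.g. when scan tasks are executed with use_threads=True);
# set_cpu_count() resizes it.
n = pa.cpu_count()
print(n)  # typically the number of logical cores

# Halve the pool to leave room for a Python-level ThreadPoolExecutor.
pa.set_cpu_count(max(1, n // 2))
assert pa.cpu_count() == max(1, n // 2)

pa.set_cpu_count(n)  # restore the default
{code}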

To quote the {{Dataset.scan()}} docstring:

{quote} It produces a stream of ScanTasks which is meant to be a unit of work
to be dispatched. The tasks are not executed automatically, the user is
responsible to execute and dispatch the individual tasks, so custom
local task scheduling can be implemented.{quote}

The {{Scanner.scan()}} docstring (which is used under the hood) is a bit more explicit:

{quote}The caller is responsible to dispatch/schedule said tasks. Tasks should
 be safe to run in a concurrent fashion and outlive the iterator.{quote}

But the {{use_threads}} argument to {{Dataset.scan()}} is thus certainly 
confusing. I need to check to be sure, but AFAIK this keyword is only used by 
the {{to_table}} method (which executes the scan tasks automatically).
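
To illustrate (a self-contained sketch of my own, not code from the issue; 
the file names and sizes are made up): {{to_table}} is the path where 
{{use_threads}} should take effect, since it executes the scan tasks for you:

{code:python}
import os
import tempfile

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

# Write a tiny throwaway dataset so the example runs anywhere.
tmp = tempfile.mkdtemp()
for i in range(4):
    table = pa.table({'passenger_count': [1, 2, 3] * 100})
    pq.write_table(table, os.path.join(tmp, f'data_{i}.parquet'))

dataset = ds.dataset(tmp, format='parquet')

# use_threads here controls whether the scan tasks are executed in
# parallel on Arrow's internal thread pool.
result = dataset.to_table(columns=['passenger_count'], use_threads=True)
print(result.num_rows)  # 4 files x 300 rows = 1200
{code}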

But since the {{scan}} method is meant for doing this task execution yourself, 
your second example should be a target use case. So if that is not working, 
that's something to investigate further.

> [Python] Dataset singlethreaded only
> ------------------------------------
>
>                 Key: ARROW-9458
>                 URL: https://issues.apache.org/jira/browse/ARROW-9458
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Python
>            Reporter: Maarten Breddels
>            Priority: Major
>
> I'm not sure whether this is a misunderstanding, a compilation issue 
> (flags?), or an issue in the C++ layer.
> I have 1000 Parquet files with a total of 1 billion rows (1 million rows per 
> file, ~20 columns). I wanted to see if I could go through all rows of 1 or 2 
> columns efficiently (vaex use case).
>  
> {code:python}
> import glob
> import pyarrow as pa
> import pyarrow.dataset
>
> ds = pa.dataset.dataset(glob.glob('/data/taxi_parquet/data_*.parquet'))
> scanned = 0
> for scan_task in ds.scan(batch_size=1_000_000, columns=['passenger_count'], use_threads=True):
>     for record_batch in scan_task.execute():
>         scanned += record_batch.num_rows
> scanned
> {code}
> This only seems to use 1 cpu.
> Using a threadpool from Python:
> {code:python}
> # %%timeit
> import concurrent.futures
> import glob
> import pyarrow as pa
> import pyarrow.dataset
>
> pool = concurrent.futures.ThreadPoolExecutor()
> ds = pa.dataset.dataset(glob.glob('/data/taxi_parquet/data_*.parquet'))
>
> def process(scan_task):
>     scan_count = 0
>     for record_batch in scan_task.execute():
>         scan_count += len(record_batch)
>     return scan_count
>
> sum(pool.map(process, ds.scan(batch_size=1_000_000, columns=['passenger_count'], use_threads=False)))
> {code}
> Gives me similar performance; again, only 100% CPU usage (= 1 core).
> py-spy (a profiler for Python) shows no GIL contention, so this might be 
> something at the C++ layer.
> Am I 'holding it wrong' or could this be a bug? Note that IO speed is not a 
> problem on this system (the data actually all comes from the OS cache; no 
> disk reads observed).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)