[ 
https://issues.apache.org/jira/browse/ARROW-9458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17157337#comment-17157337
 ] 

Joris Van den Bossche commented on ARROW-9458:
----------------------------------------------

[~maartenbreddels] how big are the row groups in your parquet files?

I am experimenting a bit with a local version of the taxi data as well, and I 
see that each scan task maps to a single row group, not to a single Parquet 
file. In my case, this gave a _lot_ of tasks, each of which takes very little 
time (only around 2-3 ms). That is possibly too small to overcome the Python 
overhead of running them in parallel. (I would need to rewrite the Parquet 
files to test this, and to confirm that it is not an issue with the scanner.)

When using fragments manually (which map to single parquet files), I do see 
parallelization.
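
To illustrate the granularity point, here is a minimal sketch that uses only the standard library. It groups many small tasks into chunks and submits one job per chunk, so the per-job dispatch cost is paid once per chunk rather than once per 2-3 ms task. The names `chunked`, `run_chunk`, `run_batched` and `fake_tasks` are hypothetical, and the callables are stand-ins for scan tasks, not pyarrow objects. Whether this actually helps with the real scanner is untested.

```python
import concurrent.futures
from itertools import islice


def chunked(items, size):
    """Yield successive lists of at most `size` items from any iterable."""
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def run_chunk(tasks):
    """Run a group of small tasks sequentially inside one worker thread."""
    return sum(task() for task in tasks)


def run_batched(tasks, chunk_size, max_workers=None):
    """Submit one job per chunk rather than one job per tiny task."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        return sum(pool.map(run_chunk, chunked(tasks, chunk_size)))


# Hypothetical stand-ins for scan tasks: each callable returns a row count.
fake_tasks = [lambda n=n: n for n in (100, 200, 300, 400, 500)]
total = run_batched(fake_tasks, chunk_size=2)
```

With real data, each callable would wrap the execution of one scan task, and `chunk_size` would be chosen so that one chunk covers roughly one file.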

> [Python] Dataset singlethreaded only
> ------------------------------------
>
>                 Key: ARROW-9458
>                 URL: https://issues.apache.org/jira/browse/ARROW-9458
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Python
>            Reporter: Maarten Breddels
>            Priority: Major
>         Attachments: image-2020-07-14-14-31-29-943.png, 
> image-2020-07-14-14-38-16-767.png
>
>
> I'm not sure whether this is a misunderstanding, a compilation issue 
> (flags?), or an issue in the C++ layer.
> I have 1000 parquet files with a total of 1 billion rows (1 million rows per 
> file, ~20 columns). I wanted to see if I could go through all rows of 1 or 2 
> columns efficiently (vaex use case).
>  
> {code:java}
> import pyarrow.parquet
> import pyarrow as pa
> import pyarrow.dataset as ds
> import glob
>
> # Use a distinct name so the dataset does not shadow the `ds` module alias.
> dataset = ds.dataset(glob.glob('/data/taxi_parquet/data_*.parquet'))
> scanned = 0
> for scan_task in dataset.scan(batch_size=1_000_000,
>                               columns=['passenger_count'],
>                               use_threads=True):
>     for record_batch in scan_task.execute():
>         scanned += record_batch.num_rows
> scanned
> {code}
> This seems to use only 1 CPU.
> Using a thread pool from Python:
> {code:java}
> # %%timeit
> import concurrent.futures
> import glob
> import pyarrow.dataset as ds
>
> pool = concurrent.futures.ThreadPoolExecutor()
> dataset = ds.dataset(glob.glob('/data/taxi_parquet/data_*.parquet'))
>
> def process(scan_task):
>     # Count the rows produced by a single scan task.
>     scan_count = 0
>     for record_batch in scan_task.execute():
>         scan_count += len(record_batch)
>     return scan_count
>
> sum(pool.map(process, dataset.scan(batch_size=1_000_000,
>                                    columns=['passenger_count'],
>                                    use_threads=False)))
> {code}
> This gives similar performance: again only 100% CPU usage (= 1 core).
> py-spy (a profiler for Python) shows that the GIL is not being held, so this 
> might be something at the C++ layer.
> Am I 'holding it wrong' or could this be a bug? Note that IO speed is not a 
> problem on this system (everything actually comes from the OS cache; no disk 
> reads were observed).
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
