[ https://issues.apache.org/jira/browse/ARROW-9458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17157338#comment-17157338 ]

Maarten Breddels commented on ARROW-9458:
-----------------------------------------

Running this (now with all columns):
{code:java}
import pyarrow as pa
import pyarrow.dataset as ds
import concurrent.futures
import glob
pool = concurrent.futures.ThreadPoolExecutor()
ds = pa.dataset.dataset(glob.glob('/data/taxi_parquet/data_*.parquet'))
def process(scan_task):
    scan_count = 0
    for record_batch in scan_task.execute():
        scan_count += len(record_batch)
    return scan_count
sum(pool.map(process, ds.scan(batch_size=1_000_000, use_threads=False)))
{code}
The output of py-spy is:

!image-2020-07-14-14-31-29-943.png!

And it takes 5 minutes.

Indeed, if I run something similar with to_table():
{code:java}
import pyarrow as pa
import pyarrow.dataset as ds
import concurrent.futures
import glob
pool = concurrent.futures.ThreadPoolExecutor()
ds = pa.dataset.dataset(glob.glob('/data/taxi_parquet/data_*.parquet'))
def process(f):
    return len(f.to_table(use_threads=False))
sum(pool.map(process, ds.get_fragments()))
{code}
I see much better CPU usage (although it seems to have difficulty getting started):

!image-2020-07-14-14-38-16-767.png!

And it takes 30 seconds (sometimes 16; the timings seem very irregular).

Changing the last part to use_threads=True, i.e.:
{code:java}
def process(f):
    return len(f.to_table(use_threads=True))
sum(pool.map(process, ds.get_fragments()))
{code}
speeds it up to 9 seconds.
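For reference, timings like the above can be taken with a plain wall-clock wrapper. A minimal stdlib-only sketch, where the string workload is just a toy stand-in for the real fragment scan:

```python
import time
import concurrent.futures

def timed(run):
    """Run a zero-argument callable; return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = run()
    return result, time.perf_counter() - start

# Toy stand-in for the per-fragment map: sum the lengths of some work items.
pool = concurrent.futures.ThreadPoolExecutor()
total, elapsed = timed(lambda: sum(pool.map(len, ["x" * n for n in range(100)])))
print(total)  # 4950
```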

Note that I have 1000 files/fragments.

I hope this info gives some clues.

My best guess is that the scanner/execute path holds a lock or mutex at the C++ 
layer, preventing effective use of multithreading.
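That guess is easy to illustrate in isolation: if every worker does its work while holding one shared lock, a thread pool still runs, but nothing overlaps. A stdlib-only sketch (the lock here is a hypothetical stand-in for the suspected C++-side mutex, not Arrow's actual implementation):

```python
import concurrent.futures
import threading
import time

shared = threading.Lock()   # stand-in for the suspected C++-layer mutex
state = threading.Lock()    # protects the concurrency counters below
active = 0
max_active = 0

def task(_):
    global active, max_active
    with shared:            # all "scan" work happens under the shared lock
        with state:
            active += 1
            max_active = max(max_active, active)
        time.sleep(0.005)   # simulated per-batch work
        with state:
            active -= 1

with concurrent.futures.ThreadPoolExecutor(max_workers=8) as pool:
    list(pool.map(task, range(32)))

print(max_active)  # 1: eight workers, but only one ever makes progress at a time
```

Removing the `with shared:` block lets `max_active` climb toward the pool size, which matches the behaviour the per-fragment to_table() path shows.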

> [Python] Dataset singlethreaded only
> ------------------------------------
>
>                 Key: ARROW-9458
>                 URL: https://issues.apache.org/jira/browse/ARROW-9458
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Python
>            Reporter: Maarten Breddels
>            Priority: Major
>         Attachments: image-2020-07-14-14-31-29-943.png, 
> image-2020-07-14-14-38-16-767.png
>
>
> I'm not sure if this is a misunderstanding, a compilation issue (flags?), or 
> an issue in the C++ layer.
> I have 1000 Parquet files with a total of 1 billion rows (1 million rows per 
> file, ~20 columns). I wanted to see if I could go through all rows of 1 or 2 
> columns efficiently (the vaex use case).
>  
> {code:java}
> import pyarrow.parquet
> import pyarrow as pa
> import pyarrow.dataset as ds
> import glob
> ds = pa.dataset.dataset(glob.glob('/data/taxi_parquet/data_*.parquet'))
> scanned = 0
> for scan_task in ds.scan(batch_size=1_000_000, columns=['passenger_count'], use_threads=True):
>     for record_batch in scan_task.execute():
>         scanned += record_batch.num_rows
> scanned
> {code}
> This only seems to use 1 cpu.
> Using a threadpool from Python:
> {code:java}
> # %%timeit
> import concurrent.futures
> pool = concurrent.futures.ThreadPoolExecutor()
> ds = pa.dataset.dataset(glob.glob('/data/taxi_parquet/data_*.parquet'))
> def process(scan_task):
>     scan_count = 0
>     for record_batch in scan_task.execute():
>         scan_count += len(record_batch)
>     return scan_count
> sum(pool.map(process, ds.scan(batch_size=1_000_000, columns=['passenger_count'], use_threads=False)))
> {code}
> Gives me similar performance: again, only 100% CPU usage (= 1 core).
> py-spy (a sampling profiler for Python) shows no GIL, so this might be 
> something at the C++ layer.
> Am I 'holding it wrong', or could this be a bug? Note that I/O speed is not a 
> problem on this system (the data all comes from the OS cache; no disk reads 
> observed).
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
