[ https://issues.apache.org/jira/browse/ARROW-9458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17157338#comment-17157338 ]
Maarten Breddels commented on ARROW-9458:
-----------------------------------------

Running this (now with all columns):
{code:python}
import pyarrow as pa
import pyarrow.dataset as ds
import concurrent.futures
import glob

pool = concurrent.futures.ThreadPoolExecutor()
dataset = pa.dataset.dataset(glob.glob('/data/taxi_parquet/data_*.parquet'))

def process(scan_task):
    scan_count = 0
    for record_batch in scan_task.execute():
        scan_count += len(record_batch)
    return scan_count

sum(pool.map(process, dataset.scan(batch_size=1_000_000, use_threads=False)))
{code}
The output of py-spy is:
!image-2020-07-14-14-31-29-943.png!
And it takes 5 minutes. Indeed, if I run similarly with to_table():
{code:python}
import pyarrow as pa
import pyarrow.dataset as ds
import concurrent.futures
import glob

pool = concurrent.futures.ThreadPoolExecutor()
dataset = pa.dataset.dataset(glob.glob('/data/taxi_parquet/data_*.parquet'))

def process(f):
    return len(f.to_table(use_threads=False))

sum(pool.map(process, dataset.get_fragments()))
{code}
I see much better CPU usage (although it seems to have difficulty getting started):
!image-2020-07-14-14-38-16-767.png!
And it takes 30 seconds (sometimes 16; the timings seem very irregular). Changing the last part to use_threads=True, i.e.:
{code:python}
def process(f):
    return len(f.to_table(use_threads=True))

sum(pool.map(process, dataset.get_fragments()))
{code}
speeds it up to 9 seconds. Note that I have 1000 files/fragments.

I hope this info gives some clues. My best guess is that the scanner/execute path has a lock/mutex at the C++ layer that prevents effective use of multithreading.
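The "lock/mutex at the C++ layer" guess can be illustrated with a standalone sketch (no Arrow involved; the `threading.Lock` here is just a stand-in for the hypothesized internal mutex). When every task contends on one shared lock, a thread pool degrades to sequential execution, which matches the single-core symptom:

{code:python}
import concurrent.futures
import threading
import time

shared_lock = threading.Lock()

def serialized_task(_):
    # Stand-in for the hypothesized behaviour: every scan task
    # takes the same mutex, so the pool makes no parallel progress.
    with shared_lock:
        time.sleep(0.05)

def independent_task(_):
    # No shared state: all sleeps overlap across threads.
    time.sleep(0.05)

pool = concurrent.futures.ThreadPoolExecutor(max_workers=8)

t0 = time.perf_counter()
list(pool.map(serialized_task, range(8)))
serialized = time.perf_counter() - t0

t0 = time.perf_counter()
list(pool.map(independent_task, range(8)))
parallel = time.perf_counter() - t0

print(f"serialized: {serialized:.2f}s, independent: {parallel:.2f}s")
{code}

The serialized variant takes roughly 8x longer here, which is the same shape as the 5-minute scan-task path versus the ~30-second per-fragment path above.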
> [Python] Dataset singlethreaded only
> ------------------------------------
>
> Key: ARROW-9458
> URL: https://issues.apache.org/jira/browse/ARROW-9458
> Project: Apache Arrow
> Issue Type: Bug
> Components: Python
> Reporter: Maarten Breddels
> Priority: Major
> Attachments: image-2020-07-14-14-31-29-943.png, image-2020-07-14-14-38-16-767.png
>
> I'm not sure whether this is a misunderstanding, a compilation issue (flags?), or an issue in the C++ layer.
> I have 1000 Parquet files with a total of 1 billion rows (1 million rows per file, ~20 columns). I wanted to see if I could go through all rows of 1 or 2 columns efficiently (the vaex use case).
>
> {code:python}
> import pyarrow.parquet
> import pyarrow as pa
> import pyarrow.dataset as ds
> import glob
>
> dataset = pa.dataset.dataset(glob.glob('/data/taxi_parquet/data_*.parquet'))
> scanned = 0
> for scan_task in dataset.scan(batch_size=1_000_000, columns=['passenger_count'], use_threads=True):
>     for record_batch in scan_task.execute():
>         scanned += record_batch.num_rows
> scanned
> {code}
> This only seems to use 1 CPU.
> Using a thread pool from Python:
> {code:python}
> # %%timeit
> import concurrent.futures
>
> pool = concurrent.futures.ThreadPoolExecutor()
> dataset = pa.dataset.dataset(glob.glob('/data/taxi_parquet/data_*.parquet'))
>
> def process(scan_task):
>     scan_count = 0
>     for record_batch in scan_task.execute():
>         scan_count += len(record_batch)
>     return scan_count
>
> sum(pool.map(process, dataset.scan(batch_size=1_000_000, columns=['passenger_count'], use_threads=False)))
> {code}
> gives me similar performance: again, only 100% CPU usage (= 1 core).
> py-spy (a sampling profiler for Python) shows the GIL is not being held, so this might be something at the C++ layer.
> Am I 'holding it wrong', or could this be a bug? Note that IO speed is not a problem on this system (the data all comes from the OS cache; no disk reads observed).

-- This message was sent by Atlassian Jira (v8.3.4#803005)
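A quick way to confirm the "only 1 CPU" observation from within Python is to compare wall-clock time against process CPU time across a thread pool: when threads are serialized, the two are roughly equal (ratio near 1), whereas real parallelism pushes the ratio toward the number of cores. This sketch uses a pure-Python workload as a stand-in (pure-Python loops hold the GIL, so they are serialized by construction, mimicking the symptom reported here):

{code:python}
import concurrent.futures
import os
import time

def cpu_bound(n):
    # Pure-Python loop: holds the GIL, so the pool cannot
    # actually run these in parallel.
    total = 0
    for i in range(n):
        total += i
    return total

pool = concurrent.futures.ThreadPoolExecutor(max_workers=os.cpu_count())

t0 = time.perf_counter()
c0 = time.process_time()
list(pool.map(cpu_bound, [2_000_000] * 8))
wall = time.perf_counter() - t0
cpu = time.process_time() - c0

# cpu / wall near 1.0 means the threads were serialized;
# near os.cpu_count() would mean genuine parallelism.
print(f"wall={wall:.2f}s cpu={cpu:.2f}s ratio={cpu / wall:.1f}")
{code}

Applied to the scan loop above, a ratio near 1.0 would corroborate that the scanner path is effectively single-threaded despite use_threads=True.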