[ https://issues.apache.org/jira/browse/ARROW-14965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17454174#comment-17454174 ]
Weston Pace commented on ARROW-14965:
-------------------------------------

That makes a lot of sense. I didn't realize you were using S3. There is another throttle before you even get to the maxConnections throttle, which is the I/O thread pool size. Although, since this is Parquet, it might be the CPU thread pool size. Can you try modifying the CPU and I/O thread pool sizes to see whether they have an effect on performance? We should also bump up that maxConnections parameter.

The Python calls are:
* [pyarrow.set_cpu_count|https://arrow.apache.org/docs/python/generated/pyarrow.set_cpu_count.html]
* pyarrow.set_io_thread_count (which appears to be missing from the docs; I'll open a ticket for that)
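Roughly, a minimal sketch of what I mean (the pool sizes below are placeholders to experiment with, not recommendations):

{code:python}
import pyarrow as pa

# Inspect the current pool sizes first
print(pa.cpu_count())        # CPU (compute) thread pool
print(pa.io_thread_count())  # I/O thread pool

# Resize both pools before kicking off the reads; these numbers are
# placeholders, try a few values and compare the scan times.
pa.set_cpu_count(16)
pa.set_io_thread_count(32)
{code}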
> [Python][C++] Contention when reading Parquet files with multi-threading
> -------------------------------------------------------------------------
>
>                 Key: ARROW-14965
>                 URL: https://issues.apache.org/jira/browse/ARROW-14965
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: C++, Python
>    Affects Versions: 6.0.0
>            Reporter: Nick Gates
>            Priority: Minor
>
> I'm attempting to read a table from multiple Parquet files where I already know which row_groups I want to read from each file. I also want to apply a filter expression while reading. To do this my code looks roughly like this:
>
> {code:python}
> import pyarrow as pa
> import pyarrow.dataset as ds
> from concurrent.futures import ThreadPoolExecutor
>
> def read_file(filepath):
>     format = ds.ParquetFileFormat(...)
>     fragment = format.make_fragment(filepath, row_groups=[0, 1, 2, ...])
>     scanner = ds.Scanner.from_fragment(
>         fragment,
>         use_threads=True,
>         use_async=False,
>         filter=...
>     )
>     return scanner.to_reader().read_all()
>
> with ThreadPoolExecutor() as pool:
>     pa.concat_tables(pool.map(read_file, file_paths))
> {code}
>
> Running with a ProcessPoolExecutor, each of my 13 read_file calls takes at most 2 seconds. However, with a ThreadPoolExecutor some of the read_file calls take 20+ seconds.
>
> I've tried running this with various combinations of use_threads and use_async to see what's happening. The call stacks below were captured with py-spy, and the contention was identified with viztracer.
>
> *use_threads: False, use_async: False*
> * It looks like pyarrow._dataset.Scanner.to_reader doesn't release the GIL: [https://github.com/apache/arrow/blob/be9a22b9b76d9cd83d85d52ffc2844056d90f367/python/pyarrow/_dataset.pyx#L3278-L3283]
> * pyarrow._dataset.from_fragment seems to be contended. Py-spy suggests this is around getting the physical_schema from the fragment?
>
> {code}
> from_fragment (pyarrow/_dataset.cpython-37m-x86_64-linux-gnu.so)
> __pyx_getprop_7pyarrow_8_dataset_8Fragment_physical_schema (pyarrow/_dataset.cpython-37m-x86_64-linux-gnu.so)
> __pthread_cond_timedwait (libpthread-2.17.so)
> {code}
>
> *use_threads: False, use_async: True*
> * There's no longer any contention for pyarrow._dataset.from_fragment
> * But there's lots of contention for pyarrow.lib.RecordBatchReader.read_all
>
> {code}
> arrow::RecordBatchReader::ReadAll (pyarrow/libarrow.so.600)
> arrow::dataset::(anonymous namespace)::ScannerRecordBatchReader::ReadNext (pyarrow/libarrow_dataset.so.600)
> arrow::Iterator<arrow::dataset::TaggedRecordBatch>::Next<arrow::GeneratorIterator<arrow::dataset::TaggedRecordBatch> > (pyarrow/libarrow_dataset.so.600)
> arrow::FutureImpl::Wait (pyarrow/libarrow.so.600)
> std::condition_variable::wait (libstdc++.so.6.0.19)
> {code}
>
> *use_threads: True, use_async: False*
> * Appears to be some contention on Scanner.to_reader
> * But most contention remains for RecordBatchReader.read_all
>
> {code}
> arrow::RecordBatchReader::ReadAll (pyarrow/libarrow.so.600)
> arrow::dataset::(anonymous namespace)::ScannerRecordBatchReader::ReadNext (pyarrow/libarrow_dataset.so.600)
> arrow::Iterator<arrow::dataset::TaggedRecordBatch>::Next<arrow::FunctionIterator<arrow::dataset::(anonymous namespace)::SyncScanner::ScanBatches(arrow::Iterator<std::shared_ptr<arrow::dataset::ScanTask> >)::{lambda()#1}, arrow::dataset::TaggedRecordBatch> > (pyarrow/libarrow_dataset.so.600)
> std::condition_variable::wait (libstdc++.so.6.0.19)
> __pthread_cond_wait (libpthread-2.17.so)
> {code}
>
> *use_threads: True, use_async: True*
> * Contention again mostly for RecordBatchReader.read_all, but it seems to complete in ~12 seconds rather than 20
>
> {code}
> arrow::RecordBatchReader::ReadAll (pyarrow/libarrow.so.600)
> arrow::dataset::(anonymous namespace)::ScannerRecordBatchReader::ReadNext (pyarrow/libarrow_dataset.so.600)
> arrow::Iterator<arrow::dataset::TaggedRecordBatch>::Next<arrow::GeneratorIterator<arrow::dataset::TaggedRecordBatch> > (pyarrow/libarrow_dataset.so.600)
> arrow::FutureImpl::Wait (pyarrow/libarrow.so.600)
> std::condition_variable::wait (libstdc++.so.6.0.19)
> __pthread_cond_wait (libpthread-2.17.so)
> {code}
>
> Is this expected behaviour? Or should it be possible to achieve the same performance from multi-threading as from multi-processing?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)