[ https://issues.apache.org/jira/browse/ARROW-11781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17296277#comment-17296277 ]
David Li commented on ARROW-11781:
----------------------------------

Using Weston's reproduction and looking at flamegraphs, the overheads are as follows:
# Time spent scanning the filesystem,
# Time spent parsing paths into partition expressions,
# Time spent evaluating the filter against the partition expressions,
# Time spent calling C++ destructors for objects that aren't created when you directly specify the file.

By default, Arrow infers the partition schema, and it infers one that dictionary-encodes the values. In this case, that makes #2 and #3 significantly more expensive. When you manually specify the partitioning, you don't use dictionary encoding, and #2 and #3 become much cheaper. Also, depending on which types you manually specify, casts may or may not be inserted into the filter - this has a smaller effect, but it also makes #3 more expensive. These overheads won't completely go away, but they can be reduced, and as you've already found, #2 and #3 are in your control.

Here's the flamegraph with a fully specified partition schema that requires neither casting nor dictionaries: [^spy3.svg] As you can see, most of the remaining overhead is #1 and #3. Here's the flamegraph for when you directly specify the file path: [^spy2.svg]

Note that I benchmarked using Weston's repro, which uses all-integer types for the partitions - string types may throw additional wrenches into the works.
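For concreteness, here is a minimal sketch of that approach using the pyarrow.dataset API directly. The path is a placeholder and the column types are only an assumption - the point is that the declared partition types are plain (non-dictionary) and match the filter literals, so no casts are inserted:

{code:python}
import pyarrow as pa
import pyarrow.dataset as ds

# Spell out the partition schema with plain (non-dictionary) types.
# Whether 'a'/'b' should be string or int32 depends on how the
# directories were actually written; these are placeholders.
partitioning = ds.HivePartitioning(
    pa.schema([
        pa.field('a', pa.string()),
        pa.field('b', pa.int32()),
    ])
)

# '/path/to/dataset' is a placeholder for the partitioned dataset root.
dataset = ds.dataset('/path/to/dataset', format='parquet', partitioning=partitioning)

# Filter literals already have the declared partition types
# (string and int32), so evaluating the filter against the
# partition expressions should not require casts.
table = dataset.to_table(filter=(ds.field('a') == '0') & (ds.field('b') == 0))
{code}

The same {{partitioning}} object can also be passed through {{pd.read_parquet(..., engine='pyarrow', partitioning=partitioning)}}, as in the snippet quoted below.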
> [Python] Reading small amount of files from a partitioned dataset is unexpectedly slow
> ---------------------------------------------------------------------------------------
>
>                 Key: ARROW-11781
>                 URL: https://issues.apache.org/jira/browse/ARROW-11781
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Python
>            Reporter: Jeroen
>            Priority: Minor
>        Attachments: spy.svg, spy2.svg, spy3.svg
>
> I posted this on StackOverflow and was told I should probably create an issue here.
> I managed to create a relatively minimal example:
> {code:java}
> df = spark.createDataFrame(
>     [
>         (str(a), b, c, random.randint(0, 1000))
>         for a in range(100)
>         for b in range(10)
>         for c in range(10000)
>     ],
>     ['a', 'b', 'c', 'd']
> )
>
> print("Writing the spark dataframe to the file system in partitioned folders.")
> df.repartition('a').write.partitionBy('a', 'b').parquet(str(data_dir), compression='snappy', mode='overwrite')
>
> def time_it(func, repetition=10):
>     start = time.time()
>     for _ in range(repetition):
>         func()
>     return (time.time() - start) / repetition
>
> print("Loading the entire dataset")
> print(time_it(lambda: pd.read_parquet(data_dir, engine='pyarrow')))
>
> print("Loading a single file using filters")
> print(time_it(lambda: pd.read_parquet(data_dir, engine='pyarrow', filters=[[('a', '=', '0'), ('b', '=', '0')]])))
>
> print("Loading a single file using filters and a specified partitioning")
> partitioning = pa.dataset.HivePartitioning(
>     pa.schema([
>         pa.field('a', pa.string()),
>         pa.field('b', pa.string())
>     ])
> )
> print(time_it(lambda: pd.read_parquet(data_dir, engine='pyarrow', filters=[[('a', '=', '0'), ('b', '=', '0')]], partitioning=partitioning)))
>
> print("Loading a single file by specifying the path")
> print(time_it(lambda: pd.read_parquet(data_dir / 'a=0' / 'b=0', engine='pyarrow')))
> {code}
> Which gives me the following output:
> {code:java}
> Writing the spark dataframe to the file system in partitioned folders.
> Loading the entire dataset
> 0.23926825523376466
> Loading a single file using filters
> 0.04788286685943603
> Loading a single file using filters and a specified partitioning
> 0.0323061466217041
> Loading a single file by specifying the path
> 0.0017130613327026368
> {code}
> Loading the small number of files is about 20 times faster if you address the paths directly, compared to using the pyarrow filters.
>
> The question as I posted it on StackOverflow:
>
> I am having some problems with the speed of loading `.parquet` files. However, I don't know what I am doing wrong.
>
> *Problem*
>
> I am trying to read a single `.parquet` file from my local filesystem which is the partitioned output of a Spark job, such that there are `.parquet` files in hierarchical directories named `a=x` and `b=y`.
>
> To achieve this, I am using `pandas.read_parquet` (which uses `pyarrow.parquet.read_table`), for which I include the `filters` kwarg. The run time when using `filters` is way longer than I would expect.
> {code:java}
> # The following runs for about 55 seconds
> pd.read_parquet(<path_to_entire_dataset>, filters=[[('a', '=', 'x'), ('b', '=', 'y')]])
>
> # The following runs for about 0.04 seconds
> pd.read_parquet(<path_to_entire_dataset>/a=x/b=y/)
>
> # The following runs for about 70 seconds
> pd.read_parquet(<path_to_entire_dataset>)
> {code}
> Reading a single parquet file by specifying filters is only slightly faster than loading the entire dataset, whereas I would expect a run time approximately linear in the number of files.
>
> *What mistake do I make here?*
>
> I realize that simply putting the filters in the path would work, however this will quickly become complex, as what I want to filter on will / can change. Besides, I think `read_table` should be able to load this data efficiently.
>
> PS: The entire dataset contains many millions of rows; the data I want to load is only a few thousand rows.
>
> *Edit 1:*
> As suggested by 0x26res, I manually defined the partitioning. This led to a significant speed up, but still not as much as I would have expected. In this situation the run time was about 5 seconds.
> {code:java}
> partitioning = HivePartitioning(
>     pa.schema([
>         pa.field('a', pa.string()),
>         pa.field('b', pa.int32()),
>     ])
> )
>
> pd.read_parquet(
>     <path_to_entire_dataset>,
>     engine='pyarrow',
>     filters=[
>         [
>             ('a', '=', x),
>             ('b', '=', y),
>         ]
>     ],
>     partitioning=partitioning
> )
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)