[ https://issues.apache.org/jira/browse/ARROW-11781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Neal Richardson updated ARROW-11781:
------------------------------------
    Fix Version/s:     (was: 4.0.0)
                   5.0.0

> [Python] Reading small amount of files from a partitioned dataset is
> unexpectedly slow
> --------------------------------------------------------------------------------------
>
>                 Key: ARROW-11781
>                 URL: https://issues.apache.org/jira/browse/ARROW-11781
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Python
>            Reporter: Jeroen
>            Priority: Minor
>             Fix For: 5.0.0
>
>         Attachments: spy.svg, spy2.svg, spy3.svg
>
>
> I posted this on StackOverflow and was told I should probably create an issue
> here. I managed to create a relatively minimal example:
> {code:python}
> import random
> import time
>
> import pandas as pd
> import pyarrow as pa
> import pyarrow.dataset
>
> # Assumes an active SparkSession `spark` and a pathlib.Path `data_dir`.
> df = spark.createDataFrame(
>     [
>         (str(a), b, c, random.randint(0, 1000))
>         for a in range(100)
>         for b in range(10)
>         for c in range(10000)
>     ],
>     ['a', 'b', 'c', 'd']
> )
>
> print("Writing the spark dataframe to the file system in partitioned folders.")
> df.repartition('a').write.partitionBy('a', 'b').parquet(
>     str(data_dir), compression='snappy', mode='overwrite')
>
> def time_it(func, repetition=10):
>     start = time.time()
>     for _ in range(repetition):
>         func()
>     return (time.time() - start) / repetition
>
> print("Loading the entire dataset")
> print(time_it(lambda: pd.read_parquet(data_dir, engine='pyarrow')))
>
> print("Loading a single file using filters")
> print(time_it(lambda: pd.read_parquet(data_dir, engine='pyarrow',
>                                       filters=[[('a', '=', '0'), ('b', '=', '0')]])))
>
> print("Loading a single file using filters and a specified partitioning")
> partitioning = pa.dataset.HivePartitioning(
>     pa.schema([
>         pa.field('a', pa.string()),
>         pa.field('b', pa.string())
>     ])
> )
> print(time_it(lambda: pd.read_parquet(data_dir, engine='pyarrow',
>                                       filters=[[('a', '=', '0'), ('b', '=', '0')]],
>                                       partitioning=partitioning)))
>
> print("Loading a single file by specifying the path")
> print(time_it(lambda: pd.read_parquet(data_dir / 'a=0' / 'b=0', engine='pyarrow')))
> {code}
> Which gives me the
> following output:
> {code}
> Writing the spark dataframe to the file system in partitioned folders.
> Loading the entire dataset
> 0.23926825523376466
> Loading a single file using filters
> 0.04788286685943603
> Loading a single file using filters and a specified partitioning
> 0.0323061466217041
> Loading a single file by specifying the path
> 0.0017130613327026368
> {code}
>
> Loading a small number of files is about 20 times faster when you address the
> paths directly than when you use the pyarrow filters.
>
> The question as I posted it on StackOverflow:
>
> I am having some problems with the speed of loading `.parquet` files.
> However, I don't know what I am doing wrong.
>
> *Problem*
>
> I am trying to read a single `.parquet` file from my local filesystem; it is
> part of the partitioned output of a spark job, so the `.parquet` files sit in
> hierarchical directories named `a=x` and `b=y`.
>
> To achieve this, I am using `pandas.read_parquet` (which uses
> `pyarrow.parquet.read_table`), passing the `filters` kwarg. The run time
> when using `filters` is far longer than I would expect:
> {code:python}
> # The following runs for about 55 seconds
> pd.read_parquet(<path_to_entire_dataset>, filters=[[('a', '=', 'x'), ('b', '=', 'y')]])
> # The following runs for about 0.04 seconds
> pd.read_parquet(<path_to_entire_dataset>/a=x/b=y/)
> # The following runs for about 70 seconds
> pd.read_parquet(<path_to_entire_dataset>)
> {code}
> Reading a single parquet file by specifying filters is only slightly faster
> than loading the entire dataset, whereas I would expect a run time roughly
> linear in the number of files read.
>
> *What mistake am I making here?*
>
> I realize that simply putting the filter values in the path would work, but
> that quickly becomes complex, as what I want to filter on will/can change.
> Besides, I think `read_table` should be able to load this data efficiently.
> PS: The entire dataset contains many millions of rows; the data I want to
> load is only a few thousand rows.
>
> *Edit 1:*
>
> As suggested by 0x26res, I manually defined the partitioning, which led to a
> significant speed-up, but still not as much as I would have expected. In this
> situation the run time was about 5 seconds:
> {code:python}
> partitioning = HivePartitioning(
>     pa.schema([
>         pa.field('a', pa.string()),
>         pa.field('b', pa.int32()),
>     ])
> )
>
> pd.read_parquet(
>     <path_to_entire_dataset>,
>     engine='pyarrow',
>     filters=[
>         [
>             ('a', '=', x),
>             ('b', '=', y),
>         ]
>     ],
>     partitioning=partitioning
> )
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)