[ https://issues.apache.org/jira/browse/ARROW-11781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17374798#comment-17374798 ]

David Li edited comment on ARROW-11781 at 7/5/21, 12:40 PM:
------------------------------------------------------------

I think we can close this for now, as it's tracked in benchmarks and saw some 
improvements in 4.0.0. There are also some more potential improvements linked. 
If there are still issues we can reopen this or open a new issue.


was (Author: lidavidm):
I think we can close this for now, as it's tracked in benchmarks and saw some 
improvements in 4.0.0. If there are still issues we can reopen this or open a 
new issue.

> [Python] Reading small amount of files from a partitioned dataset is 
> unexpectedly slow
> --------------------------------------------------------------------------------------
>
>                 Key: ARROW-11781
>                 URL: https://issues.apache.org/jira/browse/ARROW-11781
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Python
>            Reporter: Jeroen
>            Priority: Minor
>             Fix For: 5.0.0
>
>         Attachments: spy.svg, spy2.svg, spy3.svg
>
>
> I posted this on StackOverflow and was told I should probably create an issue 
> here.
> I managed to create a relatively minimal example:
> {code:python}
> import random
> import time
> 
> import pandas as pd
> import pyarrow as pa
> import pyarrow.dataset
> 
> # `spark` (a SparkSession) and `data_dir` (a pathlib.Path pointing at the
> # dataset directory) are assumed to be defined already.
> 
> df = spark.createDataFrame(
>     [
>         (str(a), b, c, random.randint(0, 1000))
>         for a in range(100)
>         for b in range(10)
>         for c in range(10000)
>     ],
>     ['a', 'b', 'c', 'd']
> )
> 
> print("Writing the spark dataframe to the file system in partitioned folders.")
> df.repartition('a').write.partitionBy('a', 'b').parquet(
>     str(data_dir), compression='snappy', mode='overwrite')
> 
> 
> def time_it(func, repetition=10):
>     start = time.time()
>     for _ in range(repetition):
>         func()
>     return (time.time() - start) / repetition
> 
> 
> print("Loading the entire dataset")
> print(time_it(lambda: pd.read_parquet(data_dir, engine='pyarrow')))
> 
> print("Loading a single file using filters")
> print(time_it(lambda: pd.read_parquet(
>     data_dir, engine='pyarrow',
>     filters=[[('a', '=', '0'), ('b', '=', '0')]])))
> 
> print("Loading a single file using filters and a specified partitioning")
> partitioning = pa.dataset.HivePartitioning(
>     pa.schema([
>         pa.field('a', pa.string()),
>         pa.field('b', pa.string())
>     ])
> )
> print(time_it(lambda: pd.read_parquet(
>     data_dir, engine='pyarrow',
>     filters=[[('a', '=', '0'), ('b', '=', '0')]],
>     partitioning=partitioning)))
> 
> print("Loading a single file by specifying the path")
> print(time_it(lambda: pd.read_parquet(data_dir / 'a=0' / 'b=0', engine='pyarrow')))
> {code}
> This gives me the following output:
> {code}
> Writing the spark dataframe to the file system in partitioned folders.
> Loading the entire dataset
> 0.23926825523376466
> Loading a single file using filters
> 0.04788286685943603
> Loading a single file using filters and a specified partitioning
> 0.0323061466217041
> Loading a single file by specifying the path
> 0.0017130613327026368
> {code}
>  
> Loading this small subset of files is roughly 20 to 30 times faster when the 
> partition paths are addressed directly than when the pyarrow filters are used 
> (about 0.0017 s vs 0.03–0.05 s per read).
>  
> The question as I posted it on StackOverflow:
> I am having problems with the speed of loading `.parquet` files, but I don't 
> know what I am doing wrong.
> *Problem*
> I am trying to read a single `.parquet` file from my local filesystem; it is 
> part of the partitioned output of a Spark job, so the `.parquet` files sit in 
> hierarchical directories named `a=x` and `b=y`.
> To achieve this, I am using `pandas.read_parquet` (which uses 
> `pyarrow.parquet.read_table` under the hood) and pass the `filters` kwarg. The 
> run time when using `filters` is much longer than I would expect.
> {code:python}
> # The following runs for about 55 seconds
> pd.read_parquet(<path_to_entire_dataset>, filters=[[('a', '=', 'x'), ('b', '=', 'y')]])
> # The following runs for about 0.04 seconds
> pd.read_parquet(<path_to_entire_dataset>/a=x/b=y/)
> # The following runs for about 70 seconds
> pd.read_parquet(<path_to_entire_dataset>)
> {code}
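> For reference, this is roughly the equivalent of the filtered pandas call 
> above when made directly through pyarrow (a minimal sketch; `dataset_root` is 
> a hypothetical stand-in for the path placeholder above):
> {code:python}
> import pyarrow.parquet as pq
> 
> # Same filtered read as the pandas call above, going through pyarrow directly.
> table = pq.read_table(
>     dataset_root,
>     filters=[[('a', '=', 'x'), ('b', '=', 'y')]],
> )
> df = table.to_pandas()
> {code}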
> Reading a single parquet file by specifying filters is only slightly faster 
> than loading the entire dataset, whereas I would expect the run time to be 
> roughly linear in the number of files read.
> *What mistake am I making here?*
> I realize that simply putting the filters in the path would work (a sketch of 
> that workaround follows below); however, this quickly becomes complex, as what 
> I want to filter on can and will change. Besides, I think `read_table` should 
> be able to load this data efficiently.
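> For completeness, a minimal sketch of that path-building workaround (the 
> `partition_path` helper and the equality-only restriction are illustrative, 
> not part of my actual setup):
> {code:python}
> from pathlib import Path
> 
> def partition_path(root, filters):
>     # Map equality filters onto the hive-style directory layout,
>     # e.g. [('a', '=', '0'), ('b', '=', '0')] -> root/a=0/b=0.
>     # Only '=' filters can be expressed as a single directory.
>     sub = Path(root)
>     for column, op, value in filters:
>         assert op == '='
>         sub = sub / f"{column}={value}"
>     return sub
> 
> # pd.read_parquet(partition_path(data_dir, [('a', '=', '0'), ('b', '=', '0')]),
> #                 engine='pyarrow')
> {code}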
> PS: The entire dataset contains many millions of rows, while the data I want 
> to load is only a few thousand rows.
> *Edit 1:*
> As suggested by 0x26res, I manually defined the partitioning. This led to a 
> significant speed-up, but still not as much as I would have expected; in this 
> situation the run time was about 5 seconds.
> {code:python}
> partitioning = HivePartitioning(
>     pa.schema([
>         pa.field('a', pa.string()),
>         pa.field('b', pa.int32()),
>     ])
> )
> pd.read_parquet(
>     <path_to_entire_dataset>,
>     engine='pyarrow',
>     filters=[
>         [
>             ('a', '=', x),
>             ('b', '=', y),
>         ]
>     ],
>     partitioning=partitioning
> )
> {code}
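> For comparison, the same filtered read can also be expressed directly against 
> the dataset API (a minimal sketch assuming pyarrow >= 3.0; `data_dir`, 
> `partitioning`, `x` and `y` are the objects and placeholders from above):
> {code:python}
> import pyarrow.dataset as ds
> 
> # Build the dataset once, then filter with an expression; the partition
> # columns 'a' and 'b' should be pruned at the directory level.
> dataset = ds.dataset(data_dir, format="parquet", partitioning=partitioning)
> table = dataset.to_table(filter=(ds.field('a') == x) & (ds.field('b') == y))
> df = table.to_pandas()
> {code}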



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
