2010YOUY01 commented on issue #6983:
URL: 
https://github.com/apache/arrow-datafusion/issues/6983#issuecomment-1661281753

   @gobraves Thank you for trying! I also took a look at this issue (and found 
it pretty difficult to solve 😨). I hope the following information is helpful.
   Here is an overview of how the parallel Parquet scan works:
   ```rust
   let _df = _ctx.read_parquet(FILENAME, _read_options).await.unwrap();
   
   let _cached = _df.cache().await;
   ```
   After `_ctx.read_parquet(...)`, a `LogicalPlan` with a `TableScan` node is 
created and stored inside the DataFrame.
   Inside `_df.cache()`, the `LogicalPlan` is first converted into a physical 
plan with a `ParquetExec` node, and the physical optimizer then tries to 
rewrite the `ParquetExec` node's `file_groups` so that the scan runs in parallel.
   
   My reproducer:
   ```
   DataFusion CLI v28.0.0
   ❯ create external table test stored as parquet location 
'/Users/yongting/Desktop/code/my_datafusion/arrow-datafusion/benchmarks/data/tpch_sf1/lineitem/part-0.parquet';
   0 rows in set. Query took 0.064 seconds.
   ❯ create table t as select * from test;
   0 rows in set. Query took 16.364 seconds.
   ❯ create table t as (select * from test where l_orderkey > 0);
   0 rows in set. Query took 3.646 seconds.
   ```
   
   ```
   ❯ explain select * from test;
   
   +---------------+------+
   | plan_type     | plan |
   +---------------+------+
   | logical_plan  | TableScan: test projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment] |
   | physical_plan | ParquetExec: file_groups={1 group: [[Users/yongting/Desktop/code/my_datafusion/arrow-datafusion/benchmarks/data/tpch_sf1/lineitem/part-0.parquet]]}, projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment] |
   |               |      |
   +---------------+------+
   2 rows in set. Query took 0.009 seconds.
   ❯ explain select * from test where l_orderkey > 0;
   
   +---------------+------+
   | plan_type     | plan |
   +---------------+------+
   | logical_plan  | Filter: test.l_orderkey > Int64(0) |
   |               |   TableScan: test projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment], partial_filters=[test.l_orderkey > Int64(0)] |
   | physical_plan | CoalesceBatchesExec: target_batch_size=8192 |
   |               |   FilterExec: l_orderkey@0 > 0 |
   |               |     ParquetExec: file_groups={12 groups: [[Users/yongting/Desktop/code/my_datafusion/arrow-datafusion/benchmarks/data/tpch_sf1/lineitem/part-0.parquet:0..13271296], [Users/yongting/Desktop/code/my_datafusion/arrow-datafusion/benchmarks/data/tpch_sf1/lineitem/part-0.parquet:13271296..26542592], [Users/yongting/Desktop/code/my_datafusion/arrow-datafusion/benchmarks/data/tpch_sf1/lineitem/part-0.parquet:26542592..39813888], [Users/yongting/Desktop/code/my_datafusion/arrow-datafusion/benchmarks/data/tpch_sf1/lineitem/part-0.parquet:39813888..53085184], [Users/yongting/Desktop/code/my_datafusion/arrow-datafusion/benchmarks/data/tpch_sf1/lineitem/part-0.parquet:53085184..66356480], ...]}, projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment], predicate=l_orderkey@0 > 0, pruning_predicate=l_orderkey_max@0 > 0 |
   |               |      |
   +---------------+------+
   ```
   The second query is parallelized (12 file groups instead of 1). 
`explain verbose select ...` can be used to see which physical optimizer rule 
repartitions the `ParquetExec`:
   
https://github.com/apache/arrow-datafusion/blob/a9561a0f06c25f370dc39df08d057db85c4e0c7a/datafusion/core/src/physical_optimizer/repartition.rs#L166
   I think there might be a bug inside this function: if 
`parquet_exec.get_repartitioned()` inside it gets called, the `ParquetExec` 
should be properly parallelized.
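
To make the `file_groups` byte ranges in the parallelized plan easier to read, here is a minimal, std-only sketch of how a single file could be divided into contiguous ranges. This is not DataFusion's code: `split_byte_ranges` is a hypothetical helper, and the ceiling-division chunking is an assumption that happens to be consistent with the evenly sized ranges (`0..13271296`, `13271296..26542592`, ...) shown above.

```rust
use std::ops::Range;

/// Hypothetical helper (not part of DataFusion): split a file of `file_size`
/// bytes into at most `target_partitions` contiguous, non-overlapping ranges.
fn split_byte_ranges(file_size: u64, target_partitions: u64) -> Vec<Range<u64>> {
    if file_size == 0 || target_partitions == 0 {
        return Vec::new();
    }
    // Ceiling division, so the ranges always cover the whole file.
    let chunk = (file_size + target_partitions - 1) / target_partitions;
    let mut ranges = Vec::new();
    let mut start = 0;
    while start < file_size {
        // The last range may be shorter than `chunk`.
        let end = (start + chunk).min(file_size);
        ranges.push(start..end);
        start = end;
    }
    ranges
}
```

Calling `split_byte_ranges(159_255_552, 12)` with that illustrative size (chosen only so that the chunk length equals 13271296; the real file size is not stated in this comment) yields 12 ranges starting with `0..13271296`. Each range would correspond to one file group that a separate partition can scan.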
   
   
   This reproducer should have the same root cause as the original one. For the 
original reproducer, adding a filter to `_df` also gets it parallelized:
   ```rust
   // `col` and `lit` come from `datafusion::prelude`.
   let _df = _ctx
       .read_parquet(FILENAME, _read_options)
       .await
       .unwrap()
       .filter(col("l_orderkey").gt(lit(0)))
       .unwrap();
   // With the filter in place, the scan can be parallelized.
   ```
   

