wangyum opened a new pull request #33664:
URL: https://github.com/apache/spark/pull/33664


   ### What changes were proposed in this pull request?
   
   Remove `OptimizeSubqueries` from the `PartitionPruning` batch so that DPP can support more cases. For example:
   ```sql
   SELECT date_id, product_id FROM fact_sk f
   JOIN (SELECT store_id + 3 AS new_store_id FROM dim_store WHERE country = 'US') s
   ON f.store_id = s.new_store_id
   ```
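
   For context, `PartitionPruning` and `OptimizeSubqueries` run together in a single optimizer batch in `SparkOptimizer`. The following is only an excerpt-style sketch of the kind of change involved (simplified, not the exact diff in this PR):
   ```scala
   // Illustrative sketch only; surrounding batches and exact code are simplified.
   // The point is that OptimizeSubqueries no longer runs as part of the
   // PartitionPruning batch.

   // Before: both rules run together in one batch.
   Batch("PartitionPruning", Once,
     PartitionPruning,
     OptimizeSubqueries)

   // After: the batch applies PartitionPruning only, so the pruning subquery's
   // plan stays identical to the join's build side and its broadcast exchange
   // can be reused.
   Batch("PartitionPruning", Once,
     PartitionPruning)
   ```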
   
   Before this PR (note the `dynamicpruningexpression(true)` in the fact table's `PartitionFilters`, i.e. the pruning filter was dropped):
   ```
   == Physical Plan ==
   *(2) Project [date_id#3998, product_id#3999]
   +- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false
      :- *(2) ColumnarToRow
      :  +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(true)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int>
      +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#274]
         +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997]
            +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3)))
               +- *(1) ColumnarToRow
                  +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string>
   ```
   
   After this PR (the fact-side scan now carries `dynamicpruningexpression(store_id#4001 IN dynamicpruning#4007)` and the build-side broadcast is reused via `ReusedExchange`):
   ```
   == Physical Plan ==
   *(2) Project [date_id#3998, product_id#3999]
   +- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false
      :- *(2) ColumnarToRow
      :  +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(store_id#4001 IN dynamicpruning#4007)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int>
      :        +- SubqueryBroadcast dynamicpruning#4007, 0, [new_store_id#3997], [id=#263]
      :           +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#262]
      :              +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997]
      :                 +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3)))
      :                    +- *(1) ColumnarToRow
      :                       +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string>
      +- ReusedExchange [new_store_id#3997], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#262]
   ```
   This happens because `OptimizeSubqueries` infers additional filters into the pruning subquery (note the extra `isnotnull(store_id#4002)` below), so its plan no longer matches the join's build side and the broadcast exchange cannot be reused. The following is the plan when `spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly` is disabled (a reproduction sketch follows the plan):
   ```
   == Physical Plan ==
   *(2) Project [date_id#3998, product_id#3999]
   +- *(2) BroadcastHashJoin [store_id#4001], [new_store_id#3997], Inner, BuildRight, false
      :- *(2) ColumnarToRow
      :  +- FileScan parquet default.fact_sk[date_id#3998,product_id#3999,store_id#4001] Batched: true, DataFilters: [], Format: Parquet, PartitionFilters: [isnotnull(store_id#4001), dynamicpruningexpression(store_id#4001 IN subquery#4009)], PushedFilters: [], ReadSchema: struct<date_id:int,product_id:int>
      :        +- Subquery subquery#4009, [id=#284]
      :           +- *(2) HashAggregate(keys=[new_store_id#3997#4008], functions=[])
      :              +- Exchange hashpartitioning(new_store_id#3997#4008, 5), ENSURE_REQUIREMENTS, [id=#280]
      :                 +- *(1) HashAggregate(keys=[new_store_id#3997 AS new_store_id#3997#4008], functions=[])
      :                    +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997]
      :                       +- *(1) Filter (((isnotnull(store_id#4002) AND isnotnull(country#4004)) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3)))
      :                          +- *(1) ColumnarToRow
      :                             +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(store_id#4002), isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002..., Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(store_id), IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string>
      +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#305]
         +- *(1) Project [(store_id#4002 + 3) AS new_store_id#3997]
            +- *(1) Filter ((isnotnull(country#4004) AND (country#4004 = US)) AND isnotnull((store_id#4002 + 3)))
               +- *(1) ColumnarToRow
                  +- FileScan parquet default.dim_store[store_id#4002,country#4004] Batched: true, DataFilters: [isnotnull(country#4004), (country#4004 = US), isnotnull((store_id#4002 + 3))], Format: Parquet, PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,US)], ReadSchema: struct<store_id:int,country:string>
   ```
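
   As a rough reproduction sketch (the DDL, data, and session setup below are assumptions made for illustration, not taken from this PR's test code), assuming a `spark` session as in `spark-shell`:
   ```scala
   // Hypothetical tables mirroring the example; fact_sk is partitioned by
   // store_id so that DPP can prune its partitions.
   spark.sql("CREATE TABLE dim_store (store_id INT, country STRING) USING parquet")
   spark.sql(
     "CREATE TABLE fact_sk (date_id INT, product_id INT, store_id INT) " +
       "USING parquet PARTITIONED BY (store_id)")

   // With the default reuseBroadcastOnly=true, DPP is only added when the pruning
   // subquery can reuse the join's broadcast; disabling it allows the standalone
   // aggregate subquery shown in the plan above.
   spark.conf.set(
     "spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly", "false")

   spark.sql(
     """SELECT date_id, product_id FROM fact_sk f
       |JOIN (SELECT store_id + 3 AS new_store_id
       |      FROM dim_store WHERE country = 'US') s
       |ON f.store_id = s.new_store_id""".stripMargin).explain()
   ```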
   
   ### Why are the changes needed?
   
   Improve DPP: dynamic partition pruning can now be applied in more cases, such as the example above where the pruning key is an expression computed in a subquery.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Unit test.
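
   As a sketch of the kind of assertion such a test can make (illustrative only, not the actual test added here; AQE nuances are ignored and the example tables above are assumed to exist):
   ```scala
   // Illustrative check: after this change the fact-side scan should carry a DPP
   // filter backed by a SubqueryBroadcast node.
   import org.apache.spark.sql.execution.SubqueryBroadcastExec

   val df = spark.sql(
     """SELECT date_id, product_id FROM fact_sk f
       |JOIN (SELECT store_id + 3 AS new_store_id
       |      FROM dim_store WHERE country = 'US') s
       |ON f.store_id = s.new_store_id""".stripMargin)

   // collectWithSubqueries also walks plans nested inside expressions, which is
   // where the dynamic pruning subquery lives.
   val dppBroadcasts = df.queryExecution.executedPlan.collectWithSubqueries {
     case s: SubqueryBroadcastExec => s
   }
   assert(dppBroadcasts.nonEmpty)
   ```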
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


