[ 
https://issues.apache.org/jira/browse/SPARK-32041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter Toth reopened SPARK-32041:
--------------------------------

> Exchange reuse won't work in cases when DPP, subqueries are involved
> --------------------------------------------------------------------
>
>                 Key: SPARK-32041
>                 URL: https://issues.apache.org/jira/browse/SPARK-32041
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.6, 3.0.0
>            Reporter: Prakhar Jain
>            Priority: Major
>
> When an Exchange node is repeated at multiple places in the PhysicalPlan, and 
> if that exchange has some some DPP Subquery filter, then ReuseExchange 
> doesn't work for such Exchange and different stages are launched to compute 
> same thing.
> Example:
> {noformat}
> // generate data
> val factData = (1 to 100).map(i => (i%5, i%20, i))
> factData.toDF("store_id", "product_id", "units_sold")
>   .write
>   .partitionBy("store_id")
>   .format("parquet")
>   .saveAsTable("fact_stats")
> val dimData = Seq[(Int, String, String)](
>   (1, "AU", "US"),
>   (2, "CA", "US"),
>   (3, "KA", "IN"),
>   (4, "DL", "IN"),
>   (5, "GA", "PA"))
> dimData.toDF("store_id", "state_province", "country")
>   .write
>   .format("parquet")
>   .saveAsTable("dim_stats")
> sql("ANALYZE TABLE fact_stats COMPUTE STATISTICS FOR COLUMNS store_id")
> sql("ANALYZE TABLE dim_stats COMPUTE STATISTICS FOR COLUMNS store_id")
> // Set Configs
> spark.sql("set 
> spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly=true")
> spark.sql("set spark.sql.autoBroadcastJoinThreshold=1000")
> val query = """
>     With view1 as (
>       SELECT product_id, f.store_id
>       FROM fact_stats f JOIN dim_stats
>       ON f.store_id = dim_stats.store_id WHERE dim_stats.country = 'IN')
>     SELECT * FROM view1 v1 join view1 v2 WHERE v1.product_id = v2.product_id
> """
> val df = spark.sql(query)
> println(df.queryExecution.executedPlan)
> {noformat}
> {noformat}
> Plan:
>  *(7) SortMergeJoin [product_id#1968|#1968], [product_id#2060|#2060], Inner
>  :- *(3) Sort [product_id#1968 ASC NULLS FIRST|#1968 ASC NULLS FIRST], false, > 0
>  : +- Exchange hashpartitioning(product_id#1968, 5), true, [id=#1140|#1140]
>  : +- *(2) Project [product_id#1968, store_id#1970|#1968, store_id#1970]
>  : +- *(2) BroadcastHashJoin [store_id#1970|#1970], [store_id#1971|#1971], 
> Inner, BuildRight
>  : :- *(2) Project [product_id#1968, store_id#1970|#1968, store_id#1970]
>  : : +- *(2) Filter isnotnull(product_id#1968)
>  : : +- *(2) ColumnarToRow
>  : : +- FileScan parquet 
> default.fact_stats[product_id#1968,store_id#1970|#1968,store_id#1970] 
> Batched: true, DataFilters: [isnotnull(product_id#1968)|#1968)], Format: 
> Parquet, Location: 
> InMemoryFileIndex[file:/home/prakhar/src/os/1_spark/sql/core/spark-warehouse/org.apache.spark.sql...,
>  PartitionFilters: [isnotnull(store_id#1970), 
> dynamicpruningexpression(store_id#1970 IN dynamicpruning#2067)|#1970), 
> dynamicpruningexpression(store_id#1970 IN dynamicpruning#2067)], 
> PushedFilters: [IsNotNull(product_id)], ReadSchema: struct<product_id:int>
>  : : +- SubqueryBroadcast dynamicpruning#2067, 0, [store_id#1971|#1971], 
> [id=#1131|#1131]
>  : : +- ReusedExchange [store_id#1971|#1971], BroadcastExchange 
> HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), 
> [id=#1021|#1021]
>  : +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, 
> true] as bigint))), [id=#1021|#1021]
>  : +- *(1) Project [store_id#1971|#1971]
>  : +- *(1) Filter ((isnotnull(country#1973) AND (country#1973 = IN)) AND 
> isnotnull(store_id#1971))
>  : +- *(1) ColumnarToRow
>  : +- FileScan parquet 
> default.dim_stats[store_id#1971,country#1973|#1971,country#1973] Batched: 
> true, DataFilters: [isnotnull(country#1973), (country#1973 = IN), 
> isnotnull(store_id#1971)|#1973), (country#1973 = IN), 
> isnotnull(store_id#1971)], Format: Parquet, Location: 
> InMemoryFileIndex[file:/home/prakhar/src/os/1_spark/sql/core/spark-warehouse/org.apache.spark.sql...,
>  PartitionFilters: [], PushedFilters: [IsNotNull(country), 
> EqualTo(country,IN), IsNotNull(store_id)], ReadSchema: 
> struct<store_id:int,country:string>
>  +- *(6) Sort [product_id#2060 ASC NULLS FIRST|#2060 ASC NULLS FIRST], false, > 0
>  +- ReusedExchange [product_id#2060, store_id#2062|#2060, store_id#2062], 
> Exchange hashpartitioning(product_id#1968, 5), true, [id=#1026|#1026]
> {noformat}
> Issue:
>  Note the last line of plan. Its a ReusedExchange which is pointing to 
> id=1026. But There is no Exchange node in plan with ID 1026. ReusedExchange 
> node is pointing to incorrect Child node (1026 instead of 1140) and so in 
> actual, exchange reuse won't happen in this query.
> Another query where issue is because of ReuseSubquery:
> {noformat}
> spark.sql("set spark.sql.autoBroadcastJoinThreshold=-1")
> val query1 = """
>                   | With view1 as (
>                   |   SELECT product_id, units_sold
>                   |   FROM fact_stats
>                   |   WHERE store_id = (SELECT max(store_id) FROM dim_stats)
>                   |         and units_sold = 2
>                   | ), view2 as (
>                   |   SELECT product_id, units_sold
>                   |   FROM fact_stats
>                   |   WHERE store_id = (SELECT max(store_id) FROM dim_stats)
>                   |         and units_sold = 1
>                   | )
>                   |
>                   | SELECT *
>                   | FROM view1 v1 join view2 v2 join view2 v3
>                   | WHERE v1.product_id = v2.product_id and v2.product_id = 
> v3.product_id
> """
> // Here we are joining v2 with self. So it should use ReuseExchange. But 
> final plan computes v2 twice.
> val df = spark.sql(query1);
> println(df.queryExecution.executedPlan){noformat}
> Here we are joining v2 with self. So it should use ReuseExchange. But final 
> plan computes v2 twice.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to