[ https://issues.apache.org/jira/browse/SPARK-32041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Peter Toth reopened SPARK-32041:
--------------------------------

Exchange reuse won't work in cases when DPP, subqueries are involved
---------------------------------------------------------------------

                 Key: SPARK-32041
                 URL: https://issues.apache.org/jira/browse/SPARK-32041
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.4.6, 3.0.0
            Reporter: Prakhar Jain
            Priority: Major

When an Exchange node is repeated at multiple places in the physical plan, and that exchange has a DPP subquery filter, ReuseExchange does not work for that Exchange and different stages are launched to compute the same thing.

Example:
{noformat}
// generate data
val factData = (1 to 100).map(i => (i%5, i%20, i))
factData.toDF("store_id", "product_id", "units_sold")
  .write
  .partitionBy("store_id")
  .format("parquet")
  .saveAsTable("fact_stats")

val dimData = Seq[(Int, String, String)](
  (1, "AU", "US"),
  (2, "CA", "US"),
  (3, "KA", "IN"),
  (4, "DL", "IN"),
  (5, "GA", "PA"))
dimData.toDF("store_id", "state_province", "country")
  .write
  .format("parquet")
  .saveAsTable("dim_stats")

sql("ANALYZE TABLE fact_stats COMPUTE STATISTICS FOR COLUMNS store_id")
sql("ANALYZE TABLE dim_stats COMPUTE STATISTICS FOR COLUMNS store_id")

// Set Configs
spark.sql("set spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly=true")
spark.sql("set spark.sql.autoBroadcastJoinThreshold=1000")

val query = """
  With view1 as (
    SELECT product_id, f.store_id
    FROM fact_stats f JOIN dim_stats
    ON f.store_id = dim_stats.store_id WHERE dim_stats.country = 'IN')
  SELECT * FROM view1 v1 join view1 v2 WHERE v1.product_id = v2.product_id
"""
val df = spark.sql(query)
println(df.queryExecution.executedPlan)
{noformat}
{noformat}
Plan:
*(7) SortMergeJoin [product_id#1968], [product_id#2060], Inner
:- *(3) Sort [product_id#1968 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(product_id#1968, 5), true, [id=#1140]
:     +- *(2) Project [product_id#1968, store_id#1970]
:        +- *(2) BroadcastHashJoin [store_id#1970], [store_id#1971], Inner, BuildRight
:           :- *(2) Project [product_id#1968, store_id#1970]
:           :  +- *(2) Filter isnotnull(product_id#1968)
:           :     +- *(2) ColumnarToRow
:           :        +- FileScan parquet default.fact_stats[product_id#1968,store_id#1970] Batched: true, DataFilters: [isnotnull(product_id#1968)], Format: Parquet, Location: InMemoryFileIndex[file:/home/prakhar/src/os/1_spark/sql/core/spark-warehouse/org.apache.spark.sql..., PartitionFilters: [isnotnull(store_id#1970), dynamicpruningexpression(store_id#1970 IN dynamicpruning#2067)], PushedFilters: [IsNotNull(product_id)], ReadSchema: struct<product_id:int>
:           :              +- SubqueryBroadcast dynamicpruning#2067, 0, [store_id#1971], [id=#1131]
:           :                 +- ReusedExchange [store_id#1971], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#1021]
:           +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#1021]
:              +- *(1) Project [store_id#1971]
:                 +- *(1) Filter ((isnotnull(country#1973) AND (country#1973 = IN)) AND isnotnull(store_id#1971))
:                    +- *(1) ColumnarToRow
:                       +- FileScan parquet default.dim_stats[store_id#1971,country#1973] Batched: true, DataFilters: [isnotnull(country#1973), (country#1973 = IN), isnotnull(store_id#1971)], Format: Parquet, Location: InMemoryFileIndex[file:/home/prakhar/src/os/1_spark/sql/core/spark-warehouse/org.apache.spark.sql..., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,IN), IsNotNull(store_id)], ReadSchema: struct<store_id:int,country:string>
+- *(6) Sort [product_id#2060 ASC NULLS FIRST], false, 0
   +- ReusedExchange [product_id#2060, store_id#2062], Exchange hashpartitioning(product_id#1968, 5), true, [id=#1026]
{noformat}

Issue: Note the last line of the plan. It is a ReusedExchange pointing to id=1026, but there is no Exchange node in the plan with id 1026. The ReusedExchange node points to the wrong child node (1026 instead of 1140), so exchange reuse does not actually happen in this query.
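One way to see the broken reuse programmatically (a rough sketch against the df from the example above; it assumes Spark 3.0, where SparkPlan exposes an id field backing the [id=#...] shown in the plan string, and collect only walks the main plan tree, not subqueries):
{noformat}
import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeExec}

// Sketch, not part of the reproduction: collect the shuffle exchanges that
// will actually run and the exchanges that the ReusedExchange nodes point at.
// If reuse worked, every reused child id would match a surviving exchange id.
val plan = df.queryExecution.executedPlan
val exchanges      = plan.collect { case e: ShuffleExchangeExec => e }
val reusedChildren = plan.collect { case r: ReusedExchangeExec => r.child }

exchanges.foreach(e => println(s"Exchange id = ${e.id}"))
reusedChildren.foreach(c => println(s"ReusedExchange points at id = ${c.id}"))
// In the plan above the only surviving shuffle Exchange has id 1140 while the
// ReusedExchange points at id 1026, so nothing is actually shared.
{noformat}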
Another query where the issue is caused by ReuseSubquery:
{noformat}
spark.sql("set spark.sql.autoBroadcastJoinThreshold=-1")

val query1 = """
  With view1 as (
    SELECT product_id, units_sold
    FROM fact_stats
    WHERE store_id = (SELECT max(store_id) FROM dim_stats)
    and units_sold = 2
  ), view2 as (
    SELECT product_id, units_sold
    FROM fact_stats
    WHERE store_id = (SELECT max(store_id) FROM dim_stats)
    and units_sold = 1
  )

  SELECT *
  FROM view1 v1 join view2 v2 join view2 v3
  WHERE v1.product_id = v2.product_id and v2.product_id = v3.product_id
"""
// Here we are joining v2 with itself, so it should use ReuseExchange. But the final plan computes v2 twice.
val df = spark.sql(query1)
println(df.queryExecution.executedPlan)
{noformat}
Here we are joining v2 with itself, so it should use ReuseExchange, but the final plan computes v2 twice.
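A similar check for this case (again only a sketch, using the df from the second reproduction above; the fact_stats scan count is a rough proxy for "v2 is computed twice" and ignores the scalar subquery over dim_stats, which is not in the main plan tree):
{noformat}
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeExec}

// Sketch: count shuffle exchanges vs. ReusedExchange markers, and how many
// times fact_stats is scanned in the main plan tree. With working reuse we
// would expect 2 fact_stats scans (v1 and the shared v2/v3 branch); 3 scans
// means the self-joined view2 is computed twice.
val plan = df.queryExecution.executedPlan
val shuffles = plan.collect { case e: ShuffleExchangeExec => e }.size
val reuses   = plan.collect { case r: ReusedExchangeExec => r }.size
val factScans = plan.collect { case s: FileSourceScanExec => s }
  .count(_.tableIdentifier.exists(_.table == "fact_stats"))

println(s"shuffle exchanges = $shuffles, reused exchanges = $reuses, fact_stats scans = $factScans")
{noformat}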