[jira] [Assigned] (SPARK-32609) Incorrect exchange reuse with DataSourceV2
[ https://issues.apache.org/jira/browse/SPARK-32609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xiao Li reassigned SPARK-32609:
-------------------------------
    Assignee: Mingjia Liu

> Incorrect exchange reuse with DataSourceV2
> ------------------------------------------
>
>                 Key: SPARK-32609
>                 URL: https://issues.apache.org/jira/browse/SPARK-32609
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.5
>            Reporter: Mingjia Liu
>            Assignee: Mingjia Liu
>            Priority: Major
>              Labels: correctness
>             Fix For: 2.4.7
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> {code:python}
> spark.conf.set("spark.sql.exchange.reuse", "true")
> df = spark.read.format("com.google.cloud.spark.bigquery.v2.BigQueryDataSourceV2") \
>     .option("table", "tpcds_1G.date_dim").load()
> df.createOrReplaceTempView("date_dim")
>
> df = spark.sql("""
> WITH t1 AS (
>     SELECT d_year, d_month_seq
>     FROM (
>         SELECT t1.d_year, t2.d_month_seq
>         FROM date_dim t1
>         CROSS JOIN date_dim t2
>         WHERE t1.d_day_name = "Monday" AND t1.d_fy_year > 2000
>           AND t2.d_day_name = "Monday" AND t2.d_fy_year > 2000
>     )
>     GROUP BY d_year, d_month_seq)
>
> SELECT prev_yr.d_year AS prev_year, curr_yr.d_year AS year, curr_yr.d_month_seq
> FROM t1 curr_yr CROSS JOIN t1 prev_yr
> WHERE curr_yr.d_year = 2002 AND prev_yr.d_year = 2002 - 1
> ORDER BY d_month_seq
> LIMIT 100
> """)
> df.explain()
> df.show(){code}
>
> The repro query:
> A. defines a temp view t1 (the distinct d_year, d_month_seq pairs), and
> B. cross joins t1 filtered to year 2002 with t1 filtered to year 2001.
>
> With exchange reuse enabled, the plan incorrectly decides to reuse the persisted shuffle writes of the t1 branch filtered on year 2002 for the year-2001 branch.
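> A hedged workaround while on an affected version: since the repro enables exchange reuse explicitly, the incorrect ReusedExchange can be avoided by disabling the feature for the session, at the cost of recomputing the shuffle for each branch. A minimal sketch, assuming an existing SparkSession named `spark` as in the repro above:
>
> {code:python}
> # Disabling exchange reuse prevents the planner from substituting one
> # branch's shuffle output for the other, differently-filtered branch.
> spark.conf.set("spark.sql.exchange.reuse", "false")
> {code}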
> {code:java}
> == Physical Plan ==
> TakeOrderedAndProject(limit=100, orderBy=[d_month_seq#24371L ASC NULLS FIRST], output=[prev_year#24366L,year#24367L,d_month_seq#24371L])
> +- *(9) Project [d_year#24402L AS prev_year#24366L, d_year#23551L AS year#24367L, d_month_seq#24371L]
>    +- CartesianProduct
>       :- *(4) HashAggregate(keys=[d_year#23551L, d_month_seq#24371L], functions=[])
>       :  +- Exchange hashpartitioning(d_year#23551L, d_month_seq#24371L, 200)
>       :     +- *(3) HashAggregate(keys=[d_year#23551L, d_month_seq#24371L], functions=[])
>       :        +- BroadcastNestedLoopJoin BuildRight, Cross
>       :           :- *(1) Project [d_year#23551L]
>       :           :  +- *(1) ScanV2 BigQueryDataSourceV2[d_year#23551L] (Filters: [isnotnull(d_day_name#23559), (d_day_name#23559 = Monday), isnotnull(d_fy_year#23556L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
>       :           +- BroadcastExchange IdentityBroadcastMode
>       :              +- *(2) Project [d_month_seq#24371L]
>       :                 +- *(2) ScanV2 BigQueryDataSourceV2[d_month_seq#24371L] (Filters: [isnotnull(d_day_name#24382), (d_day_name#24382 = Monday), isnotnull(d_fy_year#24379L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
>       +- *(8) HashAggregate(keys=[d_year#24402L, d_month_seq#24455L], functions=[])
>          +- ReusedExchange [d_year#24402L, d_month_seq#24455L], Exchange hashpartitioning(d_year#23551L, d_month_seq#24371L, 200){code}
>
> The result is incorrect, because prev_year should be 2001:
>
> {code:java}
> +---------+----+-----------+
> |prev_year|year|d_month_seq|
> +---------+----+-----------+
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> +---------+----+-----------+
> only showing top 20 rows
> {code}

--
This message was sent by Atlassian Jira (v8.3.4#803005)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
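The mechanism behind the wrong ReusedExchange can be modeled in plain Python (a sketch of the likely cause, not taken from this thread: the names `ScanNode` and `canonicalized` are invented for illustration). Exchange reuse matches exchanges by comparing canonicalized child plans; if a DataSourceV2 scan's canonical form omits its pushed-down filters, two differently-filtered scans of the same table compare equal, and the planner substitutes one branch's shuffle for both.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ScanNode:
    """Toy stand-in for a DSV2 scan with pushed-down filters."""
    table: str
    filters: tuple  # filters pushed into the source

    def canonicalized(self, include_filters: bool):
        # Buggy behavior: pushed filters are dropped from the canonical
        # form, so any two scans of the same table look identical.
        return (self.table, self.filters if include_filters else None)


scan_2002 = ScanNode("date_dim", ("d_year = 2002",))
scan_2001 = ScanNode("date_dim", ("d_year = 2001",))

# Buggy canonicalization: the two scans compare equal, so the
# exchange-reuse rule wrongly reuses the year-2002 shuffle for 2001.
assert scan_2002.canonicalized(False) == scan_2001.canonicalized(False)

# Fixed canonicalization: including pushed filters keeps them distinct,
# and no incorrect reuse happens.
assert scan_2002.canonicalized(True) != scan_2001.canonicalized(True)
```

This is only a model of the reuse-matching logic; the actual fix lives in Spark's plan canonicalization, not in user code.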
[jira] [Assigned] (SPARK-32609) Incorrect exchange reuse with DataSourceV2
[ https://issues.apache.org/jira/browse/SPARK-32609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-32609:
------------------------------------
    Assignee: Apache Spark
[jira] [Assigned] (SPARK-32609) Incorrect exchange reuse with DataSourceV2
[ https://issues.apache.org/jira/browse/SPARK-32609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-32609:
------------------------------------
    Assignee: (was: Apache Spark)