[ https://issues.apache.org/jira/browse/SPARK-32609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xiao Li updated SPARK-32609:
----------------------------
    Affects Version/s: 2.4.0
                           (was: 2.4.5)

> Incorrect exchange reuse with DataSourceV2
> ------------------------------------------
>
>                 Key: SPARK-32609
>                 URL: https://issues.apache.org/jira/browse/SPARK-32609
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Mingjia Liu
>            Assignee: Mingjia Liu
>            Priority: Major
>              Labels: correctness
>             Fix For: 2.4.7
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
>
> {code:java}
> spark.conf.set("spark.sql.exchange.reuse", "true")
> # Load the table through the DataSourceV2 connector and register it under the
> # name the query below refers to.
> df = spark.read.format("com.google.cloud.spark.bigquery.v2.BigQueryDataSourceV2").option("table", "tpcds_1G.date_dim").load()
> df.createOrReplaceTempView("date_dim")
>
> df = spark.sql("""
> WITH t1 AS (
>     SELECT
>         d_year, d_month_seq
>     FROM (
>         SELECT t1.d_year, t2.d_month_seq
>         FROM
>             date_dim t1
>             cross join
>             date_dim t2
>         where t1.d_day_name = "Monday" and t1.d_fy_year > 2000
>             and t2.d_day_name = "Monday" and t2.d_fy_year > 2000
>     )
>     GROUP BY d_year, d_month_seq)
>
> SELECT
>     prev_yr.d_year AS prev_year, curr_yr.d_year AS year, curr_yr.d_month_seq
> FROM t1 curr_yr cross join t1 prev_yr
> WHERE curr_yr.d_year=2002 AND prev_yr.d_year=2002-1
> ORDER BY d_month_seq
> LIMIT 100
>
> """)
> df.explain()
> df.show(){code}
>
> The repro query:
> A. defines a temporary table t1
> B. cross joins t1 (year 2002, as curr_yr) with t1 (year 2001, as prev_yr)
> With exchange reuse enabled, the plan incorrectly decides to reuse the
> persisted shuffle writes of A filtered on year 2002 for year 2001.
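One way to picture the failure described above is a reuse cache whose key omits the filters pushed into the scan. The sketch below is a toy model in plain Python, not Spark code: `Scan`, `buggy_key`, `fixed_key`, and `plan_exchanges` are hypothetical names, and the assumption that the `d_year` predicate is pushed into the V2 scan is a guess, since the `Filters` list in the reported plan is truncated.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Hashable, List, Tuple


# Toy model only: none of these names are Spark APIs.
@dataclass(frozen=True)
class Scan:
    table: str
    columns: Tuple[str, ...]
    pushed_filters: Tuple[str, ...]  # assumed to be pushed into the source


def buggy_key(scan: Scan) -> Hashable:
    # Ignores pushed filters, so differently filtered scans look identical.
    return (scan.table, scan.columns)


def fixed_key(scan: Scan) -> Hashable:
    # Includes pushed filters, so differently filtered scans stay distinct.
    return (scan.table, scan.columns, scan.pushed_filters)


def plan_exchanges(scans: List[Scan], key_fn: Callable[[Scan], Hashable]) -> List[Scan]:
    """For each scan, return the scan whose shuffle output would be read."""
    cache: Dict[Hashable, Scan] = {}
    # setdefault stores the first scan seen for a key and returns it for
    # every later scan with the same key, which models exchange reuse.
    return [cache.setdefault(key_fn(scan), scan) for scan in scans]


curr = Scan("date_dim", ("d_year", "d_month_seq"), ("d_year = 2002",))
prev = Scan("date_dim", ("d_year", "d_month_seq"), ("d_year = 2001",))

buggy = plan_exchanges([curr, prev], buggy_key)
fixed = plan_exchanges([curr, prev], fixed_key)

print(buggy[1].pushed_filters)  # ('d_year = 2002',): the 2001 side reads 2002 data
print(fixed[1].pushed_filters)  # ('d_year = 2001',)
```

With the filter-blind key, the second branch silently reads the first branch's output, which matches the symptom of `prev_year` coming back as 2002. Setting `spark.sql.exchange.reuse` to `false` in the repro should sidestep the reuse, probably at the cost of recomputing the shuffle.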
> {code:java}
> == Physical Plan ==
> TakeOrderedAndProject(limit=100, orderBy=[d_month_seq#24371L ASC NULLS FIRST], output=[prev_year#24366L,year#24367L,d_month_seq#24371L])
> +- *(9) Project [d_year#24402L AS prev_year#24366L, d_year#23551L AS year#24367L, d_month_seq#24371L]
>    +- CartesianProduct
>       :- *(4) HashAggregate(keys=[d_year#23551L, d_month_seq#24371L], functions=[])
>       :  +- Exchange hashpartitioning(d_year#23551L, d_month_seq#24371L, 200)
>       :     +- *(3) HashAggregate(keys=[d_year#23551L, d_month_seq#24371L], functions=[])
>       :        +- BroadcastNestedLoopJoin BuildRight, Cross
>       :           :- *(1) Project [d_year#23551L]
>       :           :  +- *(1) ScanV2 BigQueryDataSourceV2[d_year#23551L] (Filters: [isnotnull(d_day_name#23559), (d_day_name#23559 = Monday), isnotnull(d_fy_year#23556L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
>       :           +- BroadcastExchange IdentityBroadcastMode
>       :              +- *(2) Project [d_month_seq#24371L]
>       :                 +- *(2) ScanV2 BigQueryDataSourceV2[d_month_seq#24371L] (Filters: [isnotnull(d_day_name#24382), (d_day_name#24382 = Monday), isnotnull(d_fy_year#24379L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
>       +- *(8) HashAggregate(keys=[d_year#24402L, d_month_seq#24455L], functions=[])
>          +- ReusedExchange [d_year#24402L, d_month_seq#24455L], Exchange hashpartitioning(d_year#23551L, d_month_seq#24371L, 200){code}
>
>
> And the result is obviously incorrect because prev_year should be 2001
> {code:java}
> +---------+----+-----------+
> |prev_year|year|d_month_seq|
> +---------+----+-----------+
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> +---------+----+-----------+
> only showing top 20 rows
> {code}