[jira] [Commented] (SPARK-32609) Incorrect exchange reuse with DataSourceV2

2020-08-14 Thread Apache Spark (Jira)


[ https://issues.apache.org/jira/browse/SPARK-32609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17178032#comment-17178032 ]

Apache Spark commented on SPARK-32609:
--

User 'mingjialiu' has created a pull request for this issue:
https://github.com/apache/spark/pull/29435

> Incorrect exchange reuse with DataSourceV2
> --
>
> Key: SPARK-32609
> URL: https://issues.apache.org/jira/browse/SPARK-32609
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.5
> Reporter: Mingjia Liu
> Priority: Major
> Labels: correctness
> Original Estimate: 48h
> Remaining Estimate: 48h
>
>  
> {code:java}
> spark.conf.set("spark.sql.exchange.reuse", "true")
> df = spark.read.format("com.google.cloud.spark.bigquery.v2.BigQueryDataSourceV2").option("table", "tpcds_1G.date_dim").load()
> df.createOrReplaceTempView("date_dim")
> 
> df = spark.sql(""" 
> WITH t1 AS (
> SELECT 
> d_year, d_month_seq
> FROM (
> SELECT t1.d_year , t2.d_month_seq  
> FROM 
> date_dim t1
> cross join
> date_dim t2
> where t1.d_day_name = "Monday" and t1.d_fy_year > 2000
> and t2.d_day_name = "Monday" and t2.d_fy_year > 2000
> )
> GROUP BY d_year, d_month_seq)
>
>  SELECT
> prev_yr.d_year AS prev_year, curr_yr.d_year AS year, curr_yr.d_month_seq
>  FROM t1 curr_yr cross join t1 prev_yr
>  WHERE curr_yr.d_year=2002 AND prev_yr.d_year=2002-1
>  ORDER BY d_month_seq
>  LIMIT 100
>  
>  """)
> df.explain()
> df.show(){code}
>  
> The repro query:
> A. defines a temp view t1;
> B. cross joins t1 filtered on year 2002 (curr_yr) with t1 filtered on year 2001 (prev_yr).
> With exchange reuse enabled, the plan incorrectly decides to reuse the shuffle output produced for the year-2002 branch for the year-2001 branch as well.
> {code:java}
> == Physical Plan ==
> TakeOrderedAndProject(limit=100, orderBy=[d_month_seq#24371L ASC NULLS FIRST], output=[prev_year#24366L,year#24367L,d_month_seq#24371L])
> +- *(9) Project [d_year#24402L AS prev_year#24366L, d_year#23551L AS year#24367L, d_month_seq#24371L]
>    +- CartesianProduct
>       :- *(4) HashAggregate(keys=[d_year#23551L, d_month_seq#24371L], functions=[])
>       :  +- Exchange hashpartitioning(d_year#23551L, d_month_seq#24371L, 200)
>       :     +- *(3) HashAggregate(keys=[d_year#23551L, d_month_seq#24371L], functions=[])
>       :        +- BroadcastNestedLoopJoin BuildRight, Cross
>       :           :- *(1) Project [d_year#23551L]
>       :           :  +- *(1) ScanV2 BigQueryDataSourceV2[d_year#23551L] (Filters: [isnotnull(d_day_name#23559), (d_day_name#23559 = Monday), isnotnull(d_fy_year#23556L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
>       :           +- BroadcastExchange IdentityBroadcastMode
>       :              +- *(2) Project [d_month_seq#24371L]
>       :                 +- *(2) ScanV2 BigQueryDataSourceV2[d_month_seq#24371L] (Filters: [isnotnull(d_day_name#24382), (d_day_name#24382 = Monday), isnotnull(d_fy_year#24379L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
>       +- *(8) HashAggregate(keys=[d_year#24402L, d_month_seq#24455L], functions=[])
>          +- ReusedExchange [d_year#24402L, d_month_seq#24455L], Exchange hashpartitioning(d_year#23551L, d_month_seq#24371L, 200)
> {code}
>  
>  
> And the result is obviously incorrect, because prev_year should be 2001:
> {code:java}
> +---------+----+-----------+
> |prev_year|year|d_month_seq|
> +---------+----+-----------+
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> |     2002|2002|       1212|
> +---------+----+-----------+
> only showing top 20 rows
> {code}
>  
>  



[jira] [Commented] (SPARK-32609) Incorrect exchange reuse with DataSourceV2

2020-08-14 Thread Apache Spark (Jira)


[ https://issues.apache.org/jira/browse/SPARK-32609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177528#comment-17177528 ]

Apache Spark commented on SPARK-32609:
--

User 'mingjialiu' has created a pull request for this issue:
https://github.com/apache/spark/pull/29430


[jira] [Commented] (SPARK-32609) Incorrect exchange reuse with DataSourceV2

2020-08-13 Thread Rohit Mishra (Jira)


[ https://issues.apache.org/jira/browse/SPARK-32609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177246#comment-17177246 ]

Rohit Mishra commented on SPARK-32609:
--

[~mingjial], please refrain from setting Priority to "Critical"; that level is 
reserved for committers. Changing it to "Major".


[jira] [Commented] (SPARK-32609) Incorrect exchange reuse with DataSourceV2

2020-08-13 Thread Mingjia Liu (Jira)


[ https://issues.apache.org/jira/browse/SPARK-32609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177231#comment-17177231 ]

Mingjia Liu commented on SPARK-32609:
-

I am currently working on a fix and a unit test.


[jira] [Commented] (SPARK-32609) Incorrect exchange reuse with DataSourceV2

2020-08-13 Thread Mingjia Liu (Jira)


[ https://issues.apache.org/jira/browse/SPARK-32609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17177199#comment-17177199 ]

Mingjia Liu commented on SPARK-32609:
-

Mitigation:

Turn off spark.sql.exchange.reuse, e.g.:
spark.conf.set("spark.sql.exchange.reuse", "false")

 

Root cause: 

The bug is at 
[https://github.com/apache/spark/blob/e5bef51826dc2ff4020879e35ae7eb9019aa7fcd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala#L48]
 — the equals method of DataSourceV2ScanExec does not compare pushed filters.
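
For illustration, here is a minimal, self-contained Scala sketch (toy classes, not Spark's actual ones) of the failure mode: a reuse rule that keys exchanges by plan equality will collapse two exchanges whose scans differ only in pushed filters, if the scan's equals/hashCode ignore those filters.

{code:scala}
import scala.collection.mutable

// Toy stand-ins for the real plan nodes -- illustration only, not Spark code.
case class Scan(output: Seq[String], pushedFilters: Seq[String]) {
  // Mirrors the reported bug: equality ignores pushedFilters.
  override def equals(other: Any): Boolean = other match {
    case s: Scan => output == s.output
    case _       => false
  }
  override def hashCode(): Int = output.hashCode()
}

case class Exchange(child: Scan)

object ReuseDemo extends App {
  val scan2002 = Scan(Seq("d_year", "d_month_seq"), Seq("d_year = 2002"))
  val scan2001 = Scan(Seq("d_year", "d_month_seq"), Seq("d_year = 2001"))

  // Simplified reuse rule: cache exchanges keyed by their child plan.
  val cache = mutable.Map.empty[Scan, Exchange]
  def planExchange(child: Scan): Exchange =
    cache.getOrElseUpdate(child, Exchange(child))

  val curr = planExchange(scan2002)
  val prev = planExchange(scan2001)

  // Prints "reused: true": the year-2001 branch silently reuses the year-2002 exchange.
  println(s"reused: ${curr eq prev}")
}
{code}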

 

Fix:

Add a pushed-filters comparison to the equals function. I have verified that applying 
the fix produces the correct plan and result.
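
Continuing the toy model above (again only a sketch of the fix direction, not the actual Spark change — see the linked pull requests for that), the fix amounts to making pushed filters part of the scan node's equality, so differently filtered scans no longer collide in the reuse cache.

{code:scala}
// Toy sketch: pushed filters participate in equality, so the year-2001 and
// year-2002 scans no longer compare equal and the reuse rule keeps both exchanges.
case class FixedScan(output: Seq[String], pushedFilters: Seq[String]) {
  override def equals(other: Any): Boolean = other match {
    case s: FixedScan => output == s.output && pushedFilters == s.pushedFilters
    case _            => false
  }
  override def hashCode(): Int = (output, pushedFilters).hashCode()
}

object FixDemo extends App {
  val a = FixedScan(Seq("d_year", "d_month_seq"), Seq("d_year = 2002"))
  val b = FixedScan(Seq("d_year", "d_month_seq"), Seq("d_year = 2001"))
  println(s"equal: ${a == b}") // prints "equal: false"
}
{code}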

 
