[jira] [Updated] (SPARK-32708) Query optimization fails to reuse exchange with DataSourceV2

2020-09-21 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-32708:
--
Affects Version/s: (was: 2.4.8)
   2.4.7

> Query optimization fails to reuse exchange with DataSourceV2
> 
>
> Key: SPARK-32708
> URL: https://issues.apache.org/jira/browse/SPARK-32708
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.7
>Reporter: Mingjia Liu
>Assignee: Mingjia Liu
>Priority: Major
> Fix For: 2.4.8
>
>
> Repro query:
> {code:python}
> spark.conf.set("spark.sql.exchange.reuse", "true")
> df = spark.read.format('parquet').load('gs://dataproc-kokoro-tests-us-central1/tpcds/1G/parquet/date_dim')
> # df = spark.read.format("com.google.cloud.spark.bigquery.v2.BigQueryDataSourceV2").option("table", 'tpcds_1G.date_dim').load()
>
> df.createOrReplaceTempView('date_dim')
>
> df = spark.sql("""
> WITH t1 AS (
>   SELECT d_year, d_month_seq
>   FROM (
>     SELECT t1.d_year, t2.d_month_seq
>     FROM date_dim t1
>     CROSS JOIN date_dim t2
>     WHERE t1.d_day_name = "Monday" AND t1.d_fy_year > 2000
>       AND t2.d_day_name = "Monday" AND t2.d_fy_year > 2000
>   )
>   GROUP BY d_year, d_month_seq
> )
> SELECT
>   prev_yr.d_year AS prev_year, curr_yr.d_year AS year, curr_yr.d_month_seq
> FROM t1 curr_yr CROSS JOIN t1 prev_yr
> WHERE curr_yr.d_year = 2002 AND prev_yr.d_year = 2002
> ORDER BY d_month_seq
> LIMIT 100
> """)
> df.explain()
> # df.show()
> {code}
>  
> *The above query produces different physical plans with Parquet and with DataSourceV2. Both
> plans are correct; however, the DataSourceV2 plan is less optimized:*
> *Sub-plan [5-7] is exactly the same as sub-plan [1-3] (an aggregate over the broadcast-joined
> dataset of two tables that are filtered and projected the same way).*
> *Therefore, in the Parquet plan below, the exchange produced by [1-3] is
> reused to replace [5-6].*
> *The DataSourceV2 plan, however, fails to do so.*
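> 
> A quick way to confirm whether the reuse kicked in is to search the executed plan for a
> ReusedExchange node. A minimal sketch, assuming PySpark 2.4's py4j-backed _jdf accessor
> (an internal rather than a public API):
> {code:python}
> # Render the final physical plan as text and look for the reuse marker.
> plan_text = df._jdf.queryExecution().executedPlan().toString()
> print("ReusedExchange" in plan_text)  # True when the duplicate exchange was deduplicated
> {code}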
>  
> Parquet:
> {code:java}
> == Physical Plan ==
> TakeOrderedAndProject(limit=100, orderBy=[d_month_seq#21456L ASC NULLS FIRST], output=[prev_year#21451L,year#21452L,d_month_seq#21456L])
> +- *(9) Project [d_year#21487L AS prev_year#21451L, d_year#20481L AS year#21452L, d_month_seq#21456L]
>    +- CartesianProduct
>       :- *(4) HashAggregate(keys=[d_year#20481L, d_month_seq#21456L], functions=[])
>       :  +- Exchange hashpartitioning(d_year#20481L, d_month_seq#21456L, 200)
>       :     +- *(3) HashAggregate(keys=[d_year#20481L, d_month_seq#21456L], functions=[])
>       :        +- BroadcastNestedLoopJoin BuildRight, Cross
>       :           :- *(1) Project [d_year#20481L]
>       :           :  +- *(1) Filter (((((isnotnull(d_year#20481L) && isnotnull(d_day_name#20489)) && isnotnull(d_fy_year#20486L)) && (d_day_name#20489 = Monday)) && (d_fy_year#20486L > 2000)) && (d_year#20481L = 2002))
>       :           :     +- *(1) FileScan parquet [d_year#20481L,d_fy_year#20486L,d_day_name#20489] Batched: true, Format: Parquet, Location: InMemoryFileIndex[gs://dataproc-kokoro-tests-us-central1/tpcds/1G/parquet/date_dim], PartitionFilters: [], PushedFilters: [IsNotNull(d_year), IsNotNull(d_day_name), IsNotNull(d_fy_year), EqualTo(d_day_name,Monday), Grea..., ReadSchema: struct<d_year:bigint,d_fy_year:bigint,d_day_name:string>
>       :           +- BroadcastExchange IdentityBroadcastMode
>       :              +- *(2) Project [d_month_seq#21456L]
>       :                 +- *(2) Filter (((isnotnull(d_day_name#21467) && isnotnull(d_fy_year#21464L)) && (d_day_name#21467 = Monday)) && (d_fy_year#21464L > 2000))
>       :                    +- *(2) FileScan parquet [d_month_seq#21456L,d_fy_year#21464L,d_day_name#21467] Batched: true, Format: Parquet, Location: InMemoryFileIndex[gs://dataproc-kokoro-tests-us-central1/tpcds/1G/parquet/date_dim], PartitionFilters: [], PushedFilters: [IsNotNull(d_day_name), IsNotNull(d_fy_year), EqualTo(d_day_name,Monday), GreaterThan(d_fy_year,2..., ReadSchema: struct<d_month_seq:bigint,d_fy_year:bigint,d_day_name:string>
>       +- *(8) HashAggregate(keys=[d_year#21487L, d_month_seq#21540L], functions=[])
>          +- ReusedExchange [d_year#21487L, d_month_seq#21540L], Exchange hashpartitioning(d_year#20481L, d_month_seq#21456L, 200)
> {code}
>  
> DataSourceV2:
> {code:java}
> == Physical Plan ==
> TakeOrderedAndProject(limit=100, orderBy=[d_month_seq#22325L ASC NULLS FIRST], output=[prev_year#22320L,year#22321L,d_month_seq#22325L])
> +- *(9) Project [d_year#22356L AS prev_year#22320L, d_year#21696L AS year#22321L, d_month_seq#22325L]
>    +- CartesianProduct
>       :- *(4) HashAggregate(keys=[d_year#21696L, d_month_seq#22325L], functions=[])
>       :  +- Exchange hashpartitioning(d_year#21696L, d_month_seq#22325L, 200)
>       :     +- *(3) HashAggregate(keys=[d_year#21696L, d_month_seq#22325L], functions=[])
>       :        +- BroadcastNestedLoopJoin BuildRight, Cross
>       :           :- *(1) Project [d_year#21696L]
>       :           :  +- *(1) ScanV2 BigQueryDataSourceV2[d_year#21696L] (Filters: [isnotnull(d_day_name#21704), (d_day_name#21704 = Monday), isnotnull(d_fy_year#21701L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
>       :           +- BroadcastExchange IdentityBroadcastMode
>       :              +- *(2) Project [d_month_seq#22325L]
>       :                 +- *(2) ScanV2 BigQueryDataSourceV2[d_month_seq#22325L] (Filters: [isnotnull(d_day_name#22336), (d_day_name#22336 = Monday), isnotnull(d_fy_year#22333L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
>       +- *(8) HashAggregate(keys=[d_year#22356L, d_month_seq#22409L], functions=[])
>          +- Exchange hashpartitioning(d_year#22356L, d_month_seq#22409L, 200)
>             +- *(7) HashAggregate(keys=[d_year#22356L, d_month_seq#22409L], functions=[])
>                +- BroadcastNestedLoopJoin BuildRight, Cross
>                   ...
> {code}

[jira] [Updated] (SPARK-32708) Query optimization fails to reuse exchange with DataSourceV2

2020-09-21 Thread Gengliang Wang (Jira)



Gengliang Wang updated SPARK-32708:
---
Affects Version/s: (was: 2.4.7)
   2.4.8


[jira] [Updated] (SPARK-32708) Query optimization fails to reuse exchange with DataSourceV2

2020-09-21 Thread Gengliang Wang (Jira)



Gengliang Wang updated SPARK-32708:
---
Fix Version/s: 2.4.8


[jira] [Updated] (SPARK-32708) Query optimization fails to reuse exchange with DataSourceV2

2020-09-14 Thread Gengliang Wang (Jira)



Gengliang Wang updated SPARK-32708:
---
Affects Version/s: (was: 2.4.6)


[jira] [Updated] (SPARK-32708) Query optimization fails to reuse exchange with DataSourceV2

2020-08-27 Thread Takeshi Yamamuro (Jira)



Takeshi Yamamuro updated SPARK-32708:
-
Affects Version/s: 2.4.7


[jira] [Updated] (SPARK-32708) Query optimization fails to reuse exchange with DataSourceV2

2020-08-27 Thread Takeshi Yamamuro (Jira)



Takeshi Yamamuro updated SPARK-32708:
-
Affects Version/s: (was: 2.4.7)
   (was: 2.4.5)
   (was: 2.4.4)
   (was: 2.4.3)
   (was: 2.4.2)
   (was: 2.4.1)
   (was: 2.4.0)


[jira] [Updated] (SPARK-32708) Query optimization fails to reuse exchange with DataSourceV2

2020-08-26 Thread Mingjia Liu (Jira)



Mingjia Liu updated SPARK-32708:

Description: 

[jira] [Updated] (SPARK-32708) Query optimization fails to reuse exchange with DataSourceV2

2020-08-26 Thread Mingjia Liu (Jira)



Mingjia Liu updated SPARK-32708:

Description: 

[jira] [Updated] (SPARK-32708) Query optimization fails to reuse exchange with DataSourceV2

2020-08-26 Thread Mingjia Liu (Jira)



Mingjia Liu updated SPARK-32708:

Description: 

[jira] [Updated] (SPARK-32708) Query optimization fails to reuse exchange with DataSourceV2

2020-08-26 Thread Mingjia Liu (Jira)



Mingjia Liu updated SPARK-32708:

Summary: Query optimization fails to reuse exchange with DataSourceV2  
(was: Query optimization fail to reuse exchange with DataSourceV2)
