[ 
https://issues.apache.org/jira/browse/SPARK-32708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17185523#comment-17185523
 ] 

Mingjia Liu edited comment on SPARK-32708 at 8/27/20, 12:01 AM:
----------------------------------------------------------------

This difference is introduced when the ReuseExchange rule is applied.

The direct reason is that the equals method of DataSourceV2ScanExec returns false
when comparing sub-plan [6][5] with [2][1] in the DataSourceV2 plan.

The actual cause is that DataSourceV2ScanExec.pushedFilters is defined as an
array of Expressions, and expression equality takes the expression ID (exprId)
into account.

So, for example, *isnotnull(d_day_name#22364)* is not considered equal to
*isnotnull(d_day_name#22420)*, even though the two filters are semantically identical.
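
Below is a minimal, self-contained sketch of the mechanism. The case classes are toy stand-ins, not Spark's actual Catalyst types (the real AttributeReference.equals also compares dataType, nullability and metadata), but the equality behaviour is the same:

{code:scala}
// Toy model: case-class equality includes the exprId, so two attributes that
// differ only in their id are never equal.
case class AttributeReference(name: String, exprId: Long)
case class IsNotNull(child: AttributeReference)

object ExprIdDemo extends App {
  val a = IsNotNull(AttributeReference("d_day_name", 22364L))
  val b = IsNotNull(AttributeReference("d_day_name", 22420L))
  println(a == b)  // false, although both filters are semantically identical

  // The usual fix is to compare canonicalized expressions, i.e. with the
  // exprIds normalized away before comparing.
  def canonicalize(e: IsNotNull): IsNotNull =
    e.copy(child = e.child.copy(exprId = 0L))

  println(canonicalize(a) == canonicalize(b))  // true
}
{code}

This is why plan-level comparisons in Spark normalize exprIds (canonicalization / sameResult) before checking equality; pushedFilters in DataSourceV2ScanExec is compared without that normalization.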
  



> Query optimization fails to reuse exchange with DataSourceV2
> ------------------------------------------------------------
>
>                 Key: SPARK-32708
>                 URL: https://issues.apache.org/jira/browse/SPARK-32708
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0, 2.4.1, 2.4.2, 2.4.3, 2.4.4, 2.4.5, 2.4.6, 2.4.7
>            Reporter: Mingjia Liu
>            Priority: Major
>
> Repro query:
> {code:python}
> spark.conf.set("spark.sql.exchange.reuse", "true")
> df = spark.read.format('parquet').load('gs://dataproc-kokoro-tests-us-central1/tpcds/1G/parquet/date_dim')
> # df = spark.read.format("com.google.cloud.spark.bigquery.v2.BigQueryDataSourceV2").option("table", 'tpcds_1G.date_dim').load()
>
> df.createOrReplaceTempView('date_dim')
>
> df = spark.sql("""
> WITH t1 AS (
>   SELECT d_year, d_month_seq
>   FROM (
>     SELECT t1.d_year, t2.d_month_seq
>     FROM date_dim t1
>     CROSS JOIN date_dim t2
>     WHERE t1.d_day_name = "Monday" AND t1.d_fy_year > 2000
>       AND t2.d_day_name = "Monday" AND t2.d_fy_year > 2000
>   )
>   GROUP BY d_year, d_month_seq
> )
> SELECT prev_yr.d_year AS prev_year, curr_yr.d_year AS year, curr_yr.d_month_seq
> FROM t1 curr_yr CROSS JOIN t1 prev_yr
> WHERE curr_yr.d_year = 2002 AND prev_yr.d_year = 2002
> ORDER BY d_month_seq
> LIMIT 100
> """)
> df.explain()
> # df.show()
> {code}
>  
> *The above query yields different plans with Parquet and with DataSourceV2. Both plans are correct, but the DataSourceV2 plan is less optimized:*
> *sub-plan [5-7] is exactly the same as sub-plan [1-3] (an aggregate over the broadcast-joined dataset of two tables that are filtered and projected the same way).*
> *Therefore, in the Parquet plan below, the exchange produced by [1-3] is reused to replace [5-6]. The DataSourceV2 plan, however, fails to do so.*
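>
> For intuition, here is a heavily simplified sketch of what an exchange-reuse pass does (hypothetical types, not the actual ReuseExchange rule): remember the first exchange seen for each canonical, exprId-normalized form of its child plan, and substitute a reuse node for each later duplicate. If nodes compare unequal because exprIds leak into the comparison (as with pushedFilters above), the lookup never hits and nothing is reused:
> {code:scala}
> // Hypothetical, simplified model of exchange reuse; not Spark's real rule.
> sealed trait PlanNode
> case class Exchange(canonicalChild: String) extends PlanNode
> case class ReusedExchange(original: Exchange) extends PlanNode
>
> object ReuseDemo extends App {
>   def reuse(exchanges: Seq[Exchange]): Seq[PlanNode] = {
>     val seen = scala.collection.mutable.Map.empty[String, Exchange]
>     exchanges.map { e =>
>       seen.get(e.canonicalChild) match {
>         case Some(first) => ReusedExchange(first) // identical work planned once
>         case None        => seen += e.canonicalChild -> e; e
>       }
>     }
>   }
>
>   // With exprIds normalized (here written as #0), sub-plans [1-3] and [5-7]
>   // produce the same key, so the second exchange collapses into a reuse node.
>   val plans = Seq(
>     Exchange("HashAggregate(d_year#0, d_month_seq#0, BNLJ(...))"),
>     Exchange("HashAggregate(d_year#0, d_month_seq#0, BNLJ(...))"))
>   println(reuse(plans))  // List(Exchange(...), ReusedExchange(Exchange(...)))
> }
> {code}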
>  
> Parquet:
> {code:java}
> == Physical Plan ==
> TakeOrderedAndProject(limit=100, orderBy=[d_month_seq#21456L ASC NULLS FIRST], output=[prev_year#21451L,year#21452L,d_month_seq#21456L])
> +- *(9) Project [d_year#21487L AS prev_year#21451L, d_year#20481L AS year#21452L, d_month_seq#21456L]
>    +- CartesianProduct
>       :- *(4) HashAggregate(keys=[d_year#20481L, d_month_seq#21456L], functions=[])
>       :  +- Exchange hashpartitioning(d_year#20481L, d_month_seq#21456L, 200)
>       :     +- *(3) HashAggregate(keys=[d_year#20481L, d_month_seq#21456L], functions=[])
>       :        +- BroadcastNestedLoopJoin BuildRight, Cross
>       :           :- *(1) Project [d_year#20481L]
>       :           :  +- *(1) Filter (((((isnotnull(d_year#20481L) && isnotnull(d_day_name#20489)) && isnotnull(d_fy_year#20486L)) && (d_day_name#20489 = Monday)) && (d_fy_year#20486L > 2000)) && (d_year#20481L = 2002))
>       :           :     +- *(1) FileScan parquet [d_year#20481L,d_fy_year#20486L,d_day_name#20489] Batched: true, Format: Parquet, Location: InMemoryFileIndex[gs://dataproc-kokoro-tests-us-central1/tpcds/1G/parquet/date_dim], PartitionFilters: [], PushedFilters: [IsNotNull(d_year), IsNotNull(d_day_name), IsNotNull(d_fy_year), EqualTo(d_day_name,Monday), Grea..., ReadSchema: struct<d_year:bigint,d_fy_year:bigint,d_day_name:string>
>       :           +- BroadcastExchange IdentityBroadcastMode
>       :              +- *(2) Project [d_month_seq#21456L]
>       :                 +- *(2) Filter (((isnotnull(d_day_name#21467) && isnotnull(d_fy_year#21464L)) && (d_day_name#21467 = Monday)) && (d_fy_year#21464L > 2000))
>       :                    +- *(2) FileScan parquet [d_month_seq#21456L,d_fy_year#21464L,d_day_name#21467] Batched: true, Format: Parquet, Location: InMemoryFileIndex[gs://dataproc-kokoro-tests-us-central1/tpcds/1G/parquet/date_dim], PartitionFilters: [], PushedFilters: [IsNotNull(d_day_name), IsNotNull(d_fy_year), EqualTo(d_day_name,Monday), GreaterThan(d_fy_year,2..., ReadSchema: struct<d_month_seq:bigint,d_fy_year:bigint,d_day_name:string>
>       +- *(8) HashAggregate(keys=[d_year#21487L, d_month_seq#21540L], functions=[])
>          +- ReusedExchange [d_year#21487L, d_month_seq#21540L], Exchange hashpartitioning(d_year#20481L, d_month_seq#21456L, 200)
> {code}
>  
> DataSourceV2:
> {code:java}
> == Physical Plan ==
> TakeOrderedAndProject(limit=100, orderBy=[d_month_seq#22325L ASC NULLS FIRST], output=[prev_year#22320L,year#22321L,d_month_seq#22325L])
> +- *(9) Project [d_year#22356L AS prev_year#22320L, d_year#21696L AS year#22321L, d_month_seq#22325L]
>    +- CartesianProduct
>       :- *(4) HashAggregate(keys=[d_year#21696L, d_month_seq#22325L], functions=[])
>       :  +- Exchange hashpartitioning(d_year#21696L, d_month_seq#22325L, 200)
>       :     +- *(3) HashAggregate(keys=[d_year#21696L, d_month_seq#22325L], functions=[])
>       :        +- BroadcastNestedLoopJoin BuildRight, Cross
>       :           :- *(1) Project [d_year#21696L]
>       :           :  +- *(1) ScanV2 BigQueryDataSourceV2[d_year#21696L] (Filters: [isnotnull(d_day_name#21704), (d_day_name#21704 = Monday), isnotnull(d_fy_year#21701L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
>       :           +- BroadcastExchange IdentityBroadcastMode
>       :              +- *(2) Project [d_month_seq#22325L]
>       :                 +- *(2) ScanV2 BigQueryDataSourceV2[d_month_seq#22325L] (Filters: [isnotnull(d_day_name#22336), (d_day_name#22336 = Monday), isnotnull(d_fy_year#22333L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
>       +- *(8) HashAggregate(keys=[d_year#22356L, d_month_seq#22409L], functions=[])
>          +- Exchange hashpartitioning(d_year#22356L, d_month_seq#22409L, 200)
>             +- *(7) HashAggregate(keys=[d_year#22356L, d_month_seq#22409L], functions=[])
>                +- BroadcastNestedLoopJoin BuildRight, Cross
>                   :- *(5) Project [d_year#22356L]
>                   :  +- *(5) ScanV2 BigQueryDataSourceV2[d_year#22356L] (Filters: [isnotnull(d_day_name#22364), (d_day_name#22364 = Monday), isnotnull(d_fy_year#22361L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
>                   +- BroadcastExchange IdentityBroadcastMode
>                      +- *(6) Project [d_month_seq#22409L]
>                         +- *(6) ScanV2 BigQueryDataSourceV2[d_month_seq#22409L] (Filters: [isnotnull(d_day_name#22420), (d_day_name#22420 = Monday), isnotnull(d_fy_year#22417L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
> {code}
>  


