[ https://issues.apache.org/jira/browse/SPARK-32708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17185523#comment-17185523 ]
Mingjia Liu edited comment on SPARK-32708 at 8/27/20, 12:01 AM:
----------------------------------------------------------------

The difference arises when the ReuseExchange rule is applied. The direct reason is that the equals function of DataSourceV2ScanExec returns 'false' when comparing scans [6] and [5] with scans [2] and [1] in the DataSourceV2 plan.

The underlying cause is that DataSourceV2ScanExec.pushedFilters is defined as an array of Expressions, whose equals function takes the expression ID into account.

For example, *isnotnull(d_day_name#22364)* is not considered equal to *isnotnull(d_day_name#22420)*.

> Query optimization fails to reuse exchange with DataSourceV2
> ------------------------------------------------------------
>
> Key: SPARK-32708
> URL: https://issues.apache.org/jira/browse/SPARK-32708
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.0, 2.4.1, 2.4.2, 2.4.3, 2.4.4, 2.4.5, 2.4.6, 2.4.7
> Reporter: Mingjia Liu
> Priority: Major
>
> Repro query:
> {code:java}
> spark.conf.set("spark.sql.exchange.reuse","true")
> df = spark.read.format('parquet').load('gs://dataproc-kokoro-tests-us-central1/tpcds/1G/parquet/date_dim')
> #df = spark.read.format("com.google.cloud.spark.bigquery.v2.BigQueryDataSourceV2").option("table",'tpcds_1G.date_dim').load()
>
> # Register the table under the name used in the query below.
> df.createOrReplaceTempView("date_dim")
>
> df = spark.sql("""
> WITH t1 AS (
>     SELECT
>         d_year, d_month_seq
>     FROM (
>         SELECT t1.d_year , t2.d_month_seq
>         FROM
>         date_dim t1
>         cross join
>         date_dim t2
>         where t1.d_day_name = "Monday" and t1.d_fy_year > 2000
>         and t2.d_day_name = "Monday" and t2.d_fy_year > 2000
>     )
>     GROUP BY d_year, d_month_seq)
>
> SELECT
>     prev_yr.d_year AS prev_year, curr_yr.d_year AS year, curr_yr.d_month_seq
> FROM t1 curr_yr cross join t1 prev_yr
> WHERE curr_yr.d_year=2002 AND prev_yr.d_year=2002
> ORDER BY d_month_seq
> LIMIT 100
>
> """)
> df.explain()
> #df.show()
> {code}
>
> *The above query produces different plans with Parquet and DataSourceV2. Both plans are correct, but the DataSourceV2 plan is less optimized:*
> *Sub-plan [5-7] is exactly the same as sub-plan [1-3] (an aggregate over the broadcast join of two tables that are filtered and projected the same way).*
> *Therefore, in the Parquet plan below, the exchange that follows [1-3] is reused in place of [5-7].*
> *However, the DataSourceV2 plan fails to do so.*
>
> Parquet:
> {code:java}
> == Physical Plan ==
> TakeOrderedAndProject(limit=100, orderBy=[d_month_seq#21456L ASC NULLS FIRST], output=[prev_year#21451L,year#21452L,d_month_seq#21456L])
> +- *(9) Project [d_year#21487L AS prev_year#21451L, d_year#20481L AS year#21452L, d_month_seq#21456L]
>    +- CartesianProduct
>       :- *(4) HashAggregate(keys=[d_year#20481L, d_month_seq#21456L], functions=[])
>       :  +- Exchange hashpartitioning(d_year#20481L, d_month_seq#21456L, 200)
>       :     +- *(3) HashAggregate(keys=[d_year#20481L, d_month_seq#21456L], functions=[])
>       :        +- BroadcastNestedLoopJoin BuildRight, Cross
>       :           :- *(1) Project [d_year#20481L]
>       :           :  +- *(1) Filter (((((isnotnull(d_year#20481L) && isnotnull(d_day_name#20489)) && isnotnull(d_fy_year#20486L)) && (d_day_name#20489 = Monday)) && (d_fy_year#20486L > 2000)) && (d_year#20481L = 2002))
>       :           :     +- *(1) FileScan parquet [d_year#20481L,d_fy_year#20486L,d_day_name#20489] Batched: true, Format: Parquet, Location: InMemoryFileIndex[gs://dataproc-kokoro-tests-us-central1/tpcds/1G/parquet/date_dim], PartitionFilters: [], PushedFilters: [IsNotNull(d_year), IsNotNull(d_day_name), IsNotNull(d_fy_year), EqualTo(d_day_name,Monday), Grea..., ReadSchema: struct<d_year:bigint,d_fy_year:bigint,d_day_name:string>
>       :           +- BroadcastExchange IdentityBroadcastMode
>       :              +- *(2) Project [d_month_seq#21456L]
>       :                 +- *(2) Filter (((isnotnull(d_day_name#21467) && isnotnull(d_fy_year#21464L)) && (d_day_name#21467 = Monday)) && (d_fy_year#21464L > 2000))
>       :                    +- *(2) FileScan parquet [d_month_seq#21456L,d_fy_year#21464L,d_day_name#21467] Batched: true, Format: Parquet, Location: InMemoryFileIndex[gs://dataproc-kokoro-tests-us-central1/tpcds/1G/parquet/date_dim], PartitionFilters: [], PushedFilters: [IsNotNull(d_day_name), IsNotNull(d_fy_year), EqualTo(d_day_name,Monday), GreaterThan(d_fy_year,2..., ReadSchema: struct<d_month_seq:bigint,d_fy_year:bigint,d_day_name:string>
>       +- *(8) HashAggregate(keys=[d_year#21487L, d_month_seq#21540L], functions=[])
>          +- ReusedExchange [d_year#21487L, d_month_seq#21540L], Exchange hashpartitioning(d_year#20481L, d_month_seq#21456L, 200)
> {code}
>
> DataSourceV2:
> {code:java}
> == Physical Plan ==
> TakeOrderedAndProject(limit=100, orderBy=d_month_seq#22325L ASC NULLS FIRST, output=prev_year#22320L,year#22321L,d_month_seq#22325L)
> +- *(9) Project d_year#22356L AS prev_year#22320L, d_year#21696L AS year#22321L, d_month_seq#22325L
>    +- CartesianProduct
>       :- *(4) HashAggregate(keys=d_year#21696L, d_month_seq#22325L, functions=[])
>       :  +- Exchange hashpartitioning(d_year#21696L, d_month_seq#22325L, 200)
>       :     +- *(3) HashAggregate(keys=d_year#21696L, d_month_seq#22325L, functions=[])
>       :        +- BroadcastNestedLoopJoin BuildRight, Cross
>       :           :- *(1) Project d_year#21696L
>       :           :  +- *(1) ScanV2 BigQueryDataSourceV2d_year#21696L (Filters: [isnotnull(d_day_name#21704), (d_day_name#21704 = Monday), isnotnull(d_fy_year#21701L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
>       :           +- BroadcastExchange IdentityBroadcastMode
>       :              +- *(2) Project d_month_seq#22325L
>       :                 +- *(2) ScanV2 BigQueryDataSourceV2d_month_seq#22325L (Filters: [isnotnull(d_day_name#22336), (d_day_name#22336 = Monday), isnotnull(d_fy_year#22333L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
>       +- *(8) HashAggregate(keys=d_year#22356L, d_month_seq#22409L, functions=[])
>          +- Exchange hashpartitioning(d_year#22356L, d_month_seq#22409L, 200)
>             +- *(7) HashAggregate(keys=d_year#22356L, d_month_seq#22409L, functions=[])
>                +- BroadcastNestedLoopJoin BuildRight, Cross
>                   :- *(5) Project d_year#22356L
>                   :  +- *(5) ScanV2 BigQueryDataSourceV2d_year#22356L (Filters: [isnotnull(d_day_name#22364), (d_day_name#22364 = Monday), isnotnull(d_fy_year#22361L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
>                   +- BroadcastExchange IdentityBroadcastMode
>                      +- *(6) Project d_month_seq#22409L
>                         +- *(6) ScanV2 BigQueryDataSourceV2d_month_seq#22409L (Filters: [isnotnull(d_day_name#22420), (d_day_name#22420 = Monday), isnotnull(d_fy_year#22417L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
> {code}
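
The root cause described in the comment (pushed filters compared with their expression IDs included) can be illustrated with a toy model in plain Python. This is only a sketch: `Attribute`, `IsNotNull`, `EqualTo`, and `canonicalize` are hypothetical stand-ins invented for illustration, not Spark classes or APIs, and renumbering IDs by order of first appearance is just one possible canonicalization, not necessarily what a fix in Spark would do. The expression IDs are taken from scans [1] and [5] of the DataSourceV2 plan above.

```python
from dataclasses import dataclass, replace
from typing import Dict, Sequence, Tuple, Union


@dataclass(frozen=True)
class Attribute:
    """Hypothetical stand-in for a column reference such as d_day_name#21704."""
    name: str
    expr_id: int


@dataclass(frozen=True)
class IsNotNull:
    child: Attribute


@dataclass(frozen=True)
class EqualTo:
    child: Attribute
    value: str


Filter = Union[IsNotNull, EqualTo]


def canonicalize(filters: Sequence[Filter]) -> Tuple[Filter, ...]:
    """Renumber expression IDs by order of first appearance (assumed scheme)."""
    ids: Dict[int, int] = {}
    result = []
    for f in filters:
        # A previously unseen ID gets the next small integer; a seen ID is reused.
        canonical_id = ids.setdefault(f.child.expr_id, len(ids))
        result.append(replace(f, child=replace(f.child, expr_id=canonical_id)))
    return tuple(result)


# Leading pushed filters of scan [1] and scan [5] in the DataSourceV2 plan.
scan_1_filters = (
    IsNotNull(Attribute("d_day_name", 21704)),
    EqualTo(Attribute("d_day_name", 21704), "Monday"),
)
scan_5_filters = (
    IsNotNull(Attribute("d_day_name", 22364)),
    EqualTo(Attribute("d_day_name", 22364), "Monday"),
)

# Structural equality includes expr_id, so the two scans look different.
print(scan_1_filters == scan_5_filters)  # False

# After canonicalization the same filters compare equal.
print(canonicalize(scan_1_filters) == canonicalize(scan_5_filters))  # True
```

In this model the first comparison fails for the same reason the comment gives for DataSourceV2ScanExec: the IDs differ even though the predicates are semantically identical. Comparing a canonical form removes that difference while still distinguishing filters on different columns or values.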