Asif created SPARK-45866:
----------------------------

             Summary: Reuse of exchange in AQE does not happen when runtime
filters are pushed down to the underlying Scan (like Iceberg)
                 Key: SPARK-45866
                 URL: https://issues.apache.org/jira/browse/SPARK-45866
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.5.1
            Reporter: Asif


In certain types of queries, e.g. TPC-DS Query 14b, exchange reuse does not
happen in AQE, resulting in performance degradation.
The Spark TPC-DS tests are unable to catch the problem because the InMemoryScan
used for testing does not implement equals and hashCode correctly, in the
sense that they do not take into account the pushed-down runtime filters.

In concrete Scan implementations, e.g. Iceberg's SparkBatchQueryScan, the
equality check also involves, among other things, the pushed runtime filters
(which is correct).

In Spark, the issue is this:
For a given stage being materialized, just before materialization starts, the
runtime filters are confined to the BatchScanExec level.
Only when the actual RDD corresponding to the BatchScanExec is being
evaluated do the runtime filters get pushed to the underlying Scan.

Now, if a new stage is created and looks up the stageCache with its
canonicalized plan to see whether an existing stage can be reused, it fails to
find the reusable stage even though that stage exists. The canonicalized Spark
plan present as the key in the stage cache now has the runtime filters pushed
to its Scan, whereas the Scan of the incoming canonicalized Spark plan does not
have runtime filters yet, so the two plans do not match because their
underlying scans differ.
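The lookup failure described above can be reproduced in isolation. The
following is a minimal sketch, not Spark code: FilterAwareScan, the table
name, the filter string and the HashMap standing in for the stage cache are
all hypothetical, and it assumes the cached key shares the Scan object that
later receives the runtime filters.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class StageCacheMissSketch {

    // Hypothetical stand-in for a Scan whose equality includes runtime filters.
    static final class FilterAwareScan {
        private final String table;
        private final List<String> runtimeFilters = new ArrayList<>();

        FilterAwareScan(String table) {
            this.table = table;
        }

        // Models the filters being pushed once the scan's RDD is evaluated.
        void pushRuntimeFilter(String filter) {
            runtimeFilters.add(filter);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof FilterAwareScan)) return false;
            FilterAwareScan other = (FilterAwareScan) o;
            return table.equals(other.table)
                && runtimeFilters.equals(other.runtimeFilters);
        }

        @Override
        public int hashCode() {
            return Objects.hash(table, runtimeFilters);
        }
    }

    // Lookup made before any runtime filter reaches the cached scan.
    static boolean reusedBeforePushdown() {
        Map<FilterAwareScan, String> stageCache = new HashMap<>();
        stageCache.put(new FilterAwareScan("store_sales"), "stage-0");
        return stageCache.containsKey(new FilterAwareScan("store_sales"));
    }

    // Lookup made after the cached scan has received a runtime filter.
    static boolean reusedAfterPushdown() {
        Map<FilterAwareScan, String> stageCache = new HashMap<>();
        FilterAwareScan cached = new FilterAwareScan("store_sales");
        stageCache.put(cached, "stage-0");
        cached.pushRuntimeFilter("ss_item_sk IN (1, 2, 3)"); // hypothetical filter
        // The incoming plan's scan has no runtime filters yet.
        return stageCache.containsKey(new FilterAwareScan("store_sales"));
    }

    public static void main(String[] args) {
        System.out.println("reused before pushdown: " + reusedBeforePushdown());
        System.out.println("reused after pushdown:  " + reusedAfterPushdown());
    }
}
```

In this sketch the second lookup misses because the incoming scan is no
longer equal to the cached key once the filter has been added.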

The fix I have worked out is to provide two methods in the
SupportsRuntimeV2Filtering interface:

  default boolean equalToIgnoreRuntimeFilters(Scan other) {
    return this.equals(other);
  }

  default int hashCodeIgnoreRuntimeFilters() {
    return this.hashCode();
  }

In BatchScanExec, if the scan implements SupportsRuntimeV2Filtering, then
instead of batch.equals it should call scan.equalToIgnoreRuntimeFilters.

The underlying Scan implementations should then provide an equality check that
excludes runtime filters.

Similarly, the hashCode of BatchScanExec should use
scan.hashCodeIgnoreRuntimeFilters instead of batch.hashCode.
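As a rough illustration of how this could fit together, here is a
self-contained sketch. RuntimeFilterableScan only mirrors the two proposed
default methods and is not the real SupportsRuntimeV2Filtering interface;
TableScan and ScanNode (standing in for BatchScanExec) are hypothetical names
introduced for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class IgnoreRuntimeFiltersSketch {

    // Hypothetical mirror of the two proposed default methods.
    interface RuntimeFilterableScan {
        default boolean equalToIgnoreRuntimeFilters(RuntimeFilterableScan other) {
            return this.equals(other);
        }

        default int hashCodeIgnoreRuntimeFilters() {
            return this.hashCode();
        }
    }

    // Hypothetical scan: full equality includes runtime filters,
    // while the "ignore" variants compare only the table.
    static final class TableScan implements RuntimeFilterableScan {
        private final String table;
        private final List<String> runtimeFilters = new ArrayList<>();

        TableScan(String table) {
            this.table = table;
        }

        void pushRuntimeFilter(String filter) {
            runtimeFilters.add(filter);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TableScan)) return false;
            TableScan other = (TableScan) o;
            return table.equals(other.table)
                && runtimeFilters.equals(other.runtimeFilters);
        }

        @Override
        public int hashCode() {
            return Objects.hash(table, runtimeFilters);
        }

        @Override
        public boolean equalToIgnoreRuntimeFilters(RuntimeFilterableScan other) {
            return other instanceof TableScan
                && table.equals(((TableScan) other).table);
        }

        @Override
        public int hashCodeIgnoreRuntimeFilters() {
            return table.hashCode();
        }
    }

    // Stand-in for BatchScanExec: delegates to the filter-agnostic methods.
    static final class ScanNode {
        private final RuntimeFilterableScan scan;

        ScanNode(RuntimeFilterableScan scan) {
            this.scan = scan;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof ScanNode
                && scan.equalToIgnoreRuntimeFilters(((ScanNode) o).scan);
        }

        @Override
        public int hashCode() {
            return scan.hashCodeIgnoreRuntimeFilters();
        }
    }

    // The cached node's scan receives a filter after insertion; lookup still hits.
    static boolean reusedAfterPushdown() {
        Map<ScanNode, String> stageCache = new HashMap<>();
        TableScan cachedScan = new TableScan("store_sales");
        stageCache.put(new ScanNode(cachedScan), "stage-0");
        cachedScan.pushRuntimeFilter("ss_item_sk IN (1, 2, 3)"); // hypothetical filter
        return stageCache.containsKey(new ScanNode(new TableScan("store_sales")));
    }

    public static void main(String[] args) {
        System.out.println("reused after pushdown: " + reusedAfterPushdown());
    }
}
```

Because the node's hash and equality no longer depend on the pushed filters,
the cached key stays stable after pushdown in this sketch, while the scan's
own equals and hashCode keep accounting for the filters.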

I will create a PR with a test that reproduces the bug for review.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
