[ 
https://issues.apache.org/jira/browse/SPARK-45866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Asif updated SPARK-45866:
-------------------------
    Labels: pull-request-available  (was: )

> Reuse of exchange in AQE does not happen when run time filters are pushed 
> down to the underlying Scan ( like iceberg )
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-45866
>                 URL: https://issues.apache.org/jira/browse/SPARK-45866
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.5.1
>            Reporter: Asif
>            Priority: Major
>              Labels: pull-request-available
>
> In certain types of queries, e.g. TPCDS Query 14b, the reuse of exchange 
> does not happen in AQE, resulting in performance degradation.
> The Spark TPCDS tests are unable to catch the problem because the 
> InMemoryScan used for testing does not implement equals & hashCode 
> correctly, in the sense that it does not take into account the pushed-down 
> runtime filters.
> In concrete Scan implementations, e.g. Iceberg's SparkBatchQueryScan, the 
> equality check also involves, among other things, the runtime filters 
> pushed (which is correct).
> In Spark the issue is this:
> For a given stage being materialized, just before materialization starts, 
> the runtime filters are confined to the BatchScanExec level.
> Only when the actual RDD corresponding to the BatchScanExec is being 
> evaluated do the runtime filters get pushed to the underlying Scan.
> Now if a new stage is created and it checks the stageCache using its 
> canonicalized plan to see if a stage can be reused, it fails to find the 
> reusable stage even if the stage exists. The canonicalized Spark plan 
> present in the stage cache now has the runtime filters pushed to the 
> Scan, so the incoming canonicalized Spark plan does not match the key 
> because their underlying scans differ: the incoming Spark plan's scan does 
> not have runtime filters, while the canonicalized Spark plan present as the 
> key in the stage cache has the scan with runtime filters pushed.
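
The lookup failure described above can be reproduced outside Spark with a
plain HashMap. The following is only a minimal sketch under stated
assumptions: DemoScan and StageCacheDemo are hypothetical stand-ins, not
Spark or Iceberg classes, and the table name and filter string are made up.
It assumes the cached key's scan is mutated after insertion, which is what
the description says happens when the RDD is evaluated.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a Scan whose equality includes pushed runtime filters.
final class DemoScan {
    final String table;
    final List<String> runtimeFilters = new ArrayList<>();

    DemoScan(String table) {
        this.table = table;
    }

    // Mimics runtime filters being pushed into the scan during RDD evaluation.
    void pushRuntimeFilter(String filter) {
        runtimeFilters.add(filter);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof DemoScan)) {
            return false;
        }
        DemoScan other = (DemoScan) o;
        return table.equals(other.table) && runtimeFilters.equals(other.runtimeFilters);
    }

    @Override
    public int hashCode() {
        return Objects.hash(table, runtimeFilters);
    }
}

final class StageCacheDemo {
    // Returns whether a fresh plan without runtime filters finds the cached stage.
    static boolean lookupAfterPushdown() {
        Map<DemoScan, String> stageCache = new HashMap<>();

        // The stage is cached before its runtime filters reach the scan.
        DemoScan materializing = new DemoScan("store_sales");
        stageCache.put(materializing, "stage-0");

        // Materialization pushes the filter, mutating the object used as the key.
        materializing.pushRuntimeFilter("item_sk IN (1, 2)");

        // A new stage with the same logical scan, filters not yet pushed.
        DemoScan incoming = new DemoScan("store_sales");
        return stageCache.containsKey(incoming);
    }

    public static void main(String[] args) {
        System.out.println("reused = " + lookupAfterPushdown());
    }
}
```

In this sketch the lookup misses because the cached key no longer equals a
filter-free scan, even though both describe the same logical scan.
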
> The fix I have worked out is to provide two methods in the 
> SupportsRuntimeV2Filtering interface:
>   default boolean equalToIgnoreRuntimeFilters(Scan other) {
>     return this.equals(other);
>   }
>   default int hashCodeIgnoreRuntimeFilters() {
>     return this.hashCode();
>   }
> In BatchScanExec, if the scan implements SupportsRuntimeV2Filtering, then 
> instead of batch.equals it should call scan.equalToIgnoreRuntimeFilters, 
> and the underlying Scan implementations should provide an equality check 
> that excludes runtime filters.
> Similarly, the hashCode of BatchScanExec should use 
> scan.hashCodeIgnoreRuntimeFilters instead of batch.hashCode.
> I will be creating a PR with a bug test for review.
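
As a rough illustration of the proposed fix, the sketch below shows how a
scan might override the two methods and how a wrapping node could key on
them. It is not the actual patch: RuntimeFilterAware is a local stand-in for
the proposed additions to SupportsRuntimeV2Filtering (taking Object instead
of Scan to stay self-contained), and FilteringScan, ScanNode and
ReuseFixDemo are hypothetical names, with ScanNode playing the role of
BatchScanExec.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Local stand-in for the two proposed default methods; not the real Spark interface.
interface RuntimeFilterAware {
    default boolean equalToIgnoreRuntimeFilters(Object other) {
        return this.equals(other);
    }

    default int hashCodeIgnoreRuntimeFilters() {
        return this.hashCode();
    }
}

// Hypothetical scan: full equality includes runtime filters, the new methods exclude them.
final class FilteringScan implements RuntimeFilterAware {
    final String table;
    final List<String> runtimeFilters = new ArrayList<>();

    FilteringScan(String table) {
        this.table = table;
    }

    void pushRuntimeFilter(String filter) {
        runtimeFilters.add(filter);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof FilteringScan)) {
            return false;
        }
        FilteringScan other = (FilteringScan) o;
        return table.equals(other.table) && runtimeFilters.equals(other.runtimeFilters);
    }

    @Override
    public int hashCode() {
        return Objects.hash(table, runtimeFilters);
    }

    @Override
    public boolean equalToIgnoreRuntimeFilters(Object other) {
        return other instanceof FilteringScan && table.equals(((FilteringScan) other).table);
    }

    @Override
    public int hashCodeIgnoreRuntimeFilters() {
        return table.hashCode();
    }
}

// Plays the role of BatchScanExec: delegates to the filter-agnostic methods.
final class ScanNode {
    final FilteringScan scan;

    ScanNode(FilteringScan scan) {
        this.scan = scan;
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof ScanNode && scan.equalToIgnoreRuntimeFilters(((ScanNode) o).scan);
    }

    @Override
    public int hashCode() {
        return scan.hashCodeIgnoreRuntimeFilters();
    }
}

final class ReuseFixDemo {
    // Returns whether a filter-free plan finds the stage cached before pushdown.
    static boolean lookupAfterPushdown() {
        Map<ScanNode, String> stageCache = new HashMap<>();
        FilteringScan materializing = new FilteringScan("store_sales");
        stageCache.put(new ScanNode(materializing), "stage-0");

        // Filters reach the scan only during evaluation, after the stage was cached.
        materializing.pushRuntimeFilter("item_sk IN (1, 2)");

        ScanNode incoming = new ScanNode(new FilteringScan("store_sales"));
        return stageCache.containsKey(incoming);
    }
}
```

Because the node's equality and hash no longer depend on the pushed
filters, the lookup in this sketch succeeds even after the cached scan has
been mutated.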



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
