[ https://issues.apache.org/jira/browse/SPARK-45866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Asif updated SPARK-45866:
-------------------------
    Labels: pull-request-available  (was: )

> Reuse of exchange in AQE does not happen when run time filters are pushed down to the underlying Scan ( like iceberg )
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-45866
>                 URL: https://issues.apache.org/jira/browse/SPARK-45866
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.5.1
>            Reporter: Asif
>            Priority: Major
>              Labels: pull-request-available
>
> In certain types of queries, e.g. TPCDS Query 14b, exchange reuse does not
> happen in AQE, resulting in performance degradation.
> The Spark TPCDS tests are unable to catch the problem because the
> InMemoryScan used for testing does not implement equals & hashCode
> correctly, in the sense that they do not take into account the pushed-down
> runtime filters.
> In concrete Scan implementations, e.g. Iceberg's SparkBatchQueryScan, the
> equality check also involves, apart from other things, the runtime filters
> pushed (which is correct).
> In Spark the issue is this:
> For a given stage being materialized, just before materialization starts,
> the runtime filters are confined to the BatchScanExec level.
> Only when the actual RDD corresponding to the BatchScanExec is being
> evaluated do the runtime filters get pushed to the underlying Scan.
> Now if a new stage is created and it checks the stageCache, using its
> canonicalized plan, to see whether a stage can be reused, it fails to find
> the reusable stage even if that stage exists. The canonicalized Spark plan
> present in the stage cache now has the runtime filters pushed to the Scan,
> so the incoming canonicalized Spark plan does not match the key because
> their underlying scans differ.
> That is, the incoming Spark plan's scan does not have runtime filters, while
> the canonicalized Spark plan present as the key in the stage cache has the
> scan with runtime filters pushed.
> The fix I have worked out is to provide two methods in the
> SupportsRuntimeV2Filtering interface:
>
>     default boolean equalToIgnoreRuntimeFilters(Scan other) {
>         return this.equals(other);
>     }
>
>     default int hashCodeIgnoreRuntimeFilters() {
>         return this.hashCode();
>     }
>
> In BatchScanExec, if the scan implements SupportsRuntimeV2Filtering, then
> instead of batch.equals it should call scan.equalToIgnoreRuntimeFilters,
> and the underlying Scan implementations should provide an equality that
> excludes runtime filters.
> Similarly, the hashCode of BatchScanExec should use
> scan.hashCodeIgnoreRuntimeFilters instead of batch.hashCode.
> Will be creating a PR with a bug test for review.
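The cache miss described in the ticket, and the effect of the proposed filter-agnostic equality, can be illustrated with a small standalone sketch. It does not use Spark or Iceberg classes: `RuntimeFilterAwareScan`, `DemoScan`, `PlanKey`, and `reuseFound` are hypothetical names invented for this illustration, a plain `HashMap` stands in for the AQE stage cache, and the filter string is made up. Only the two method names `equalToIgnoreRuntimeFilters` and `hashCodeIgnoreRuntimeFilters` come from the ticket.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

final class StageReuseSketch {

    /** Hypothetical stand-in for the two defaults proposed for SupportsRuntimeV2Filtering. */
    interface RuntimeFilterAwareScan {
        default boolean equalToIgnoreRuntimeFilters(RuntimeFilterAwareScan other) {
            return this.equals(other);
        }
        default int hashCodeIgnoreRuntimeFilters() {
            return this.hashCode();
        }
    }

    /** Toy scan whose regular equals/hashCode include the pushed runtime filters. */
    static final class DemoScan implements RuntimeFilterAwareScan {
        private final String table;
        private final List<String> pushedRuntimeFilters = new ArrayList<>();

        DemoScan(String table) { this.table = table; }

        void pushRuntimeFilter(String filter) { pushedRuntimeFilters.add(filter); }

        @Override public boolean equals(Object o) {
            return o instanceof DemoScan
                && table.equals(((DemoScan) o).table)
                && pushedRuntimeFilters.equals(((DemoScan) o).pushedRuntimeFilters);
        }
        @Override public int hashCode() { return Objects.hash(table, pushedRuntimeFilters); }

        // Filter-agnostic variants: compare everything except the runtime filters.
        @Override public boolean equalToIgnoreRuntimeFilters(RuntimeFilterAwareScan o) {
            return o instanceof DemoScan && table.equals(((DemoScan) o).table);
        }
        @Override public int hashCodeIgnoreRuntimeFilters() { return table.hashCode(); }
    }

    /** Stands in for a canonicalized plan used as the stage-cache key. */
    static final class PlanKey {
        private final DemoScan scan;
        private final boolean ignoreRuntimeFilters;

        PlanKey(DemoScan scan, boolean ignoreRuntimeFilters) {
            this.scan = scan;
            this.ignoreRuntimeFilters = ignoreRuntimeFilters;
        }
        @Override public boolean equals(Object o) {
            if (!(o instanceof PlanKey)) return false;
            DemoScan other = ((PlanKey) o).scan;
            return ignoreRuntimeFilters ? scan.equalToIgnoreRuntimeFilters(other) : scan.equals(other);
        }
        @Override public int hashCode() {
            return ignoreRuntimeFilters ? scan.hashCodeIgnoreRuntimeFilters() : scan.hashCode();
        }
    }

    /** Returns whether a second, identical plan finds the first stage in the cache. */
    static boolean reuseFound(boolean ignoreRuntimeFilters) {
        Map<PlanKey, String> stageCache = new HashMap<>();
        DemoScan materialized = new DemoScan("store_sales");
        stageCache.put(new PlanKey(materialized, ignoreRuntimeFilters), "stage-0");
        // The filter reaches the scan only later, when the RDD is evaluated,
        // which changes the scan that is already sitting inside the cache key.
        materialized.pushRuntimeFilter("ss_item_sk IN (1, 2, 3)");
        DemoScan incoming = new DemoScan("store_sales"); // no filters pushed yet
        return stageCache.containsKey(new PlanKey(incoming, ignoreRuntimeFilters));
    }

    public static void main(String[] args) {
        System.out.println("default equality finds reusable stage: " + reuseFound(false));
        System.out.println("filter-agnostic equality finds reusable stage: " + reuseFound(true));
    }
}
```

Under these assumptions the lookup misses with the default equality and hits with the filter-agnostic one. Note that the sketch only models the key comparison; how BatchScanExec canonicalization would actually call these methods is left to the PR mentioned in the ticket.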