[
https://issues.apache.org/jira/browse/SPARK-45866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18084147#comment-18084147
]
Dongjoon Hyun commented on SPARK-45866:
---------------------------------------
According to the Apache Spark contribution guideline, I removed `Target
Version` from this JIRA issue, [~ashahid7].
- https://spark.apache.org/contributing.html
{quote}
Do not set the following fields:
- Fix Version. This is assigned by committers only when resolved.
- Target Version. This is assigned by committers to indicate a PR has been
accepted for possible fix by the target version.
{quote}
> Reuse of exchange in AQE does not happen when run time filters are pushed
> down to the underlying Scan ( like iceberg )
> ----------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-45866
> URL: https://issues.apache.org/jira/browse/SPARK-45866
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.5.1, 4.0.0
> Reporter: Asif
> Priority: Major
> Labels: pull-request-available
>
> In certain types of queries, e.g. TPCDS Query 14b, exchange reuse does not
> happen in AQE, resulting in performance degradation.
> The Spark TPCDS tests are unable to catch the problem because the
> InMemoryScan used for testing does not implement equals & hashCode
> correctly, in the sense that they do not take into account the pushed-down
> runtime filters.
> In concrete Scan implementations, e.g. Iceberg's SparkBatchQueryScan, the
> equality check, apart from other things, also involves the runtime filters
> pushed (which is correct).
> In Spark the issue is this:
> For a given stage being materialized, just before materialization starts,
> the runtime filters are confined to the BatchScanExec level.
> Only when the actual RDD corresponding to the BatchScanExec is being
> evaluated do the runtime filters get pushed to the underlying Scan.
> Now if a new stage is created and it checks the stageCache using its
> canonicalized plan to see if a stage can be reused, it fails to find the
> reusable stage even if the stage exists, because the canonicalized Spark
> plan present in the stage cache now has the runtime filters pushed to the
> Scan, so the incoming canonicalized Spark plan does not match the key, as
> their underlying scans differ. That is, the incoming Spark plan's scan does
> not have runtime filters, while the canonicalized Spark plan present as key
> in the stage cache has the scan with runtime filters pushed.
> The fix I have worked out is to provide two methods in the
> SupportsRuntimeV2Filtering interface:
>   default boolean equalToIgnoreRuntimeFilters(Scan other) {
>     return this.equals(other);
>   }
>   default int hashCodeIgnoreRuntimeFilters() {
>     return this.hashCode();
>   }
> In BatchScanExec, if the scan implements SupportsRuntimeV2Filtering, then
> instead of batch.equals it should call scan.equalToIgnoreRuntimeFilters,
> and the underlying Scan implementations should provide equality which
> excludes runtime filters.
> Similarly, the hashCode of BatchScanExec should use
> scan.hashCodeIgnoreRuntimeFilters instead of batch.hashCode.
> I will be creating a PR with a bug test for review.
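
The cache miss described in the quoted issue, and the effect of the proposed filter-agnostic equality, can be illustrated with a small self-contained Java sketch. This is a toy model only, not Spark or Iceberg code: `ToyScan`, `PlanKey`, `reuseFound`, the table name, and the filter string are all hypothetical stand-ins, and a plain `HashMap` plays the role of the stage cache. Only the two method names `equalToIgnoreRuntimeFilters` and `hashCodeIgnoreRuntimeFilters` are taken from the issue text.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Toy model (assumption): not Spark code, only mirrors the mechanism in the issue.
class StageReuseSketch {

    // Hypothetical scan whose equality includes pushed runtime filters,
    // as the issue describes for concrete Scan implementations.
    static final class ToyScan {
        final String table;
        final List<String> runtimeFilters = new ArrayList<>();

        ToyScan(String table) { this.table = table; }

        // Mutates the scan in place, like a runtime filter push at RDD evaluation.
        void pushRuntimeFilter(String filter) { runtimeFilters.add(filter); }

        @Override public boolean equals(Object o) {
            if (!(o instanceof ToyScan)) return false;
            ToyScan other = (ToyScan) o;
            return table.equals(other.table) && runtimeFilters.equals(other.runtimeFilters);
        }

        @Override public int hashCode() { return Objects.hash(table, runtimeFilters); }

        // Method names from the issue; here they simply skip the runtime filters.
        boolean equalToIgnoreRuntimeFilters(ToyScan other) { return table.equals(other.table); }

        int hashCodeIgnoreRuntimeFilters() { return table.hashCode(); }
    }

    // Hypothetical cache key standing in for the canonicalized plan.
    static final class PlanKey {
        final ToyScan scan;
        final boolean ignoreRuntimeFilters;

        PlanKey(ToyScan scan, boolean ignoreRuntimeFilters) {
            this.scan = scan;
            this.ignoreRuntimeFilters = ignoreRuntimeFilters;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof PlanKey)) return false;
            ToyScan other = ((PlanKey) o).scan;
            return ignoreRuntimeFilters ? scan.equalToIgnoreRuntimeFilters(other)
                                        : scan.equals(other);
        }

        @Override public int hashCode() {
            return ignoreRuntimeFilters ? scan.hashCodeIgnoreRuntimeFilters() : scan.hashCode();
        }
    }

    // Returns whether a second, filter-free plan finds the first stage in the cache.
    static boolean reuseFound(boolean ignoreRuntimeFilters) {
        Map<PlanKey, String> stageCache = new HashMap<>();
        ToyScan first = new ToyScan("store_sales");
        stageCache.put(new PlanKey(first, ignoreRuntimeFilters), "stage-0");
        // The filter reaches the scan only after the key was stored.
        first.pushRuntimeFilter("item_sk IN (1, 2)");
        ToyScan incoming = new ToyScan("store_sales");
        return stageCache.containsKey(new PlanKey(incoming, ignoreRuntimeFilters));
    }

    public static void main(String[] args) {
        System.out.println("filter-aware equality, reuse found: " + reuseFound(false));
        System.out.println("filter-agnostic equality, reuse found: " + reuseFound(true));
    }
}
```

In this model the lookup misses when the key's equality includes the runtime filters, because the cached scan was mutated after insertion, and it hits when both equality and hash code ignore them. Whether the real BatchScanExec and stage cache behave the same way depends on details not covered in the issue text above.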