This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 73abceb  [SPARK-35093][SQL] AQE now uses newQueryStage plan as key for looking up cached exchanges for re-use
73abceb is described below

commit 73abceb05d64eafeb39866c69a84d0b7f3c1f097
Author: Andy Grove <andygrov...@gmail.com>
AuthorDate: Wed May 19 07:45:26 2021 -0500

    [SPARK-35093][SQL] AQE now uses newQueryStage plan as key for looking up cached exchanges for re-use
    
    ### What changes were proposed in this pull request?
    AQE has an optimization where it attempts to reuse compatible exchanges but it does not take into account whether the exchanges are columnar or not, resulting in incorrect reuse under some circumstances.
    
    This PR simply changes the key used to look up cached stages. It now uses the canonicalized form of the new query stage's plan (potentially created by a plugin) rather than the canonicalized form of the original exchange.
    
    ### Why are the changes needed?
    When using the [RAPIDS Accelerator for Apache Spark](https://github.com/NVIDIA/spark-rapids) we sometimes see a new query stage correctly create a row-based exchange and then Spark replaces it with a cached columnar exchange, which is not compatible, and this causes queries to fail.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    The patch has been tested with the query that highlighted this issue. I looked at writing unit tests for this, but it would involve implementing a mock columnar exchange in the tests, so it would be quite a bit of work. If anyone has ideas on other ways to test this, I am happy to hear them.
    
    Closes #32195 from andygrove/SPARK-35093.
    
    Authored-by: Andy Grove <andygrov...@gmail.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
    (cherry picked from commit 52e3cf9ff50b4209e29cb06df09b1ef3a18bc83b)
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala    | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 89d3b53..596c8b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -418,7 +418,8 @@ case class AdaptiveSparkPlanExec(
               // Check the `stageCache` again for reuse. If a match is found, ditch the new stage
               // and reuse the existing stage found in the `stageCache`, otherwise update the
               // `stageCache` with the new stage.
-              val queryStage = context.stageCache.getOrElseUpdate(e.canonicalized, newStage)
+              val queryStage = context.stageCache.getOrElseUpdate(
+                newStage.plan.canonicalized, newStage)
               if (queryStage.ne(newStage)) {
                 newStage = reuseQueryStage(queryStage, e)
               }
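
The effect of the key change in the hunk above can be illustrated with a small standalone sketch. It does not use Spark: `Plan`, `Scan`, `RowExchange`, `ColumnarExchange`, `Stage`, and `lookupTwice` are hypothetical stand-ins invented for illustration, and `canonicalized` is reduced to identity. Under those assumptions, it shows why keying the cache by the original exchange can hand back a columnar stage where a row-based one was built, while keying by the new stage's plan keeps the two apart.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Spark's plan and query stage classes (not the real API).
sealed trait Plan { def canonicalized: Plan = this } // assumption: canonical form is the plan itself
case class Scan(table: String) extends Plan
case class RowExchange(child: Plan) extends Plan
// Assumption: a plugin may replace a row-based exchange with a columnar one.
case class ColumnarExchange(child: Plan) extends Plan

case class Stage(id: Int, plan: Plan)

object StageCacheSketch {
  // Builds one columnar and one row-based stage for the same original exchange,
  // then performs two cache lookups using `key` to derive the cache key.
  // Returns whatever stage the second (row-based) lookup ends up with.
  def lookupTwice(key: (RowExchange, Stage) => Plan): Stage = {
    val e = RowExchange(Scan("t"))
    val cache = mutable.Map.empty[Plan, Stage]
    val columnarStage = Stage(0, ColumnarExchange(e.child))
    val rowStage = Stage(1, e)
    cache.getOrElseUpdate(key(e, columnarStage), columnarStage)
    cache.getOrElseUpdate(key(e, rowStage), rowStage)
  }

  // Old behavior: key by the original exchange, so both lookups share one key
  // and the row-based stage is replaced by the cached columnar stage (id 0).
  val oldKeyResult: Stage = lookupTwice((e, _) => e.canonicalized)

  // New behavior: key by the new stage's plan, so the row-based stage (id 1)
  // gets its own cache entry and is not replaced.
  val newKeyResult: Stage = lookupTwice((_, s) => s.plan.canonicalized)
}
```

In this sketch the only difference between the two calls is the function that derives the cache key, mirroring the one-line change from `e.canonicalized` to `newStage.plan.canonicalized`.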

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
