This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 2f8e7cbe98df [SPARK-48173][SQL][3.5] CheckAnalysis should see the entire query plan
2f8e7cbe98df is described below

commit 2f8e7cbe98df97ee0ae51a20796192c95e750721
Author: Wenchen Fan <cloud0...@gmail.com>
AuthorDate: Tue May 7 15:25:15 2024 -0700

    [SPARK-48173][SQL][3.5] CheckAnalysis should see the entire query plan
    
    Backport of https://github.com/apache/spark/pull/46439 to 3.5.
    
    ### What changes were proposed in this pull request?
    
    This is a follow-up of https://github.com/apache/spark/pull/38029. Some custom check rules need to see the entire query plan tree for context, but https://github.com/apache/spark/pull/38029 breaks this by checking the query plans of dangling CTE relations recursively, in isolation from the main query plan.
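
    As an illustration (not part of this patch): such custom check rules are typically registered through `SparkSessionExtensions.injectCheckRule` and only see whatever plan CheckAnalysis hands them. A minimal sketch, with a made-up class name and a placeholder check:

        import org.apache.spark.sql.SparkSessionExtensions
        import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, WithCTE}

        // Hypothetical custom check rule: it only sees the plan it is given.
        // With this change that plan is the inlined main query (wrapped in
        // WithCTE when dangling CTE relations exist), not a detached CTE child.
        class WholePlanCheckExtension extends (SparkSessionExtensions => Unit) {
          override def apply(extensions: SparkSessionExtensions): Unit = {
            extensions.injectCheckRule { _ => (plan: LogicalPlan) => {
              // Purely illustrative: a real rule would enforce whole-plan invariants here.
              val danglingDefs = plan.collectFirst { case w: WithCTE => w.cteDefs }.getOrElse(Seq.empty)
              println(s"check rule sees root ${plan.nodeName} with ${danglingDefs.size} dangling CTE def(s)")
            }}
          }
        }

    Such an extension would be enabled through the `spark.sql.extensions` configuration.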
    
    This PR fixes the issue by putting the dangling CTE relations back into the main query plan and then checking the main query plan as a whole.
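
    For context (not part of this patch), a "dangling" CTE relation is, roughly, one that the main query never references, so plain inlining would drop it; its analysis errors must still surface. A minimal sketch, assuming a local SparkSession (the object and column names are made up):

        import org.apache.spark.sql.{AnalysisException, SparkSession}

        object DanglingCteExample {
          def main(args: Array[String]): Unit = {
            val spark = SparkSession.builder().master("local[1]").appName("dangling-cte").getOrCreate()
            try {
              // `unused` is never referenced, so inlining alone would discard it.
              // CheckAnalysis still validates its body, so the unresolved column
              // below is reported as usual.
              spark.sql("WITH unused AS (SELECT no_such_col FROM VALUES (1) AS t(a)) SELECT 1").collect()
            } catch {
              case e: AnalysisException => println(s"analysis failed as expected: ${e.getMessage}")
            } finally {
              spark.stop()
            }
          }
        }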
    
    ### Why are the changes needed?
    
    To revert the breaking change to custom check rules.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No for most users. This restores the behavior of Spark 3.3 and earlier for custom check rules.
    
    ### How was this patch tested?
    
    Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #46442 from cloud-fan/check2.
    
    Lead-authored-by: Wenchen Fan <cloud0...@gmail.com>
    Co-authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../sql/catalyst/analysis/CheckAnalysis.scala      | 38 +++++++++++++++++++---
 1 file changed, 33 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 7f10bdbc80ca..485015f2efab 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -141,17 +141,45 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
       errorClass, missingCol, orderedCandidates, a.origin)
   }
 
+  private def checkUnreferencedCTERelations(
+      cteMap: mutable.Map[Long, (CTERelationDef, Int, mutable.Map[Long, Int])],
+      visited: mutable.Map[Long, Boolean],
+      danglingCTERelations: mutable.ArrayBuffer[CTERelationDef],
+      cteId: Long): Unit = {
+    if (visited(cteId)) {
+      return
+    }
+    val (cteDef, _, refMap) = cteMap(cteId)
+    refMap.foreach { case (id, _) =>
+      checkUnreferencedCTERelations(cteMap, visited, danglingCTERelations, id)
+    }
+    danglingCTERelations.append(cteDef)
+    visited(cteId) = true
+  }
+
   def checkAnalysis(plan: LogicalPlan): Unit = {
     val inlineCTE = InlineCTE(alwaysInline = true)
     val cteMap = mutable.HashMap.empty[Long, (CTERelationDef, Int, mutable.Map[Long, Int])]
     inlineCTE.buildCTEMap(plan, cteMap)
-    cteMap.values.foreach { case (relation, refCount, _) =>
-      // If a CTE relation is never used, it will disappear after inline. Here we explicitly check
-      // analysis for it, to make sure the entire query plan is valid.
-      if (refCount == 0) checkAnalysis0(relation.child)
+    val danglingCTERelations = mutable.ArrayBuffer.empty[CTERelationDef]
+    val visited: mutable.Map[Long, Boolean] = mutable.Map.empty.withDefaultValue(false)
+    // If a CTE relation is never used, it will disappear after inline. Here we explicitly collect
+    // these dangling CTE relations, and put them back in the main query, to make sure the entire
+    // query plan is valid.
+    cteMap.foreach { case (cteId, (_, refCount, _)) =>
+      // If a CTE relation ref count is 0, the other CTE relations that reference it should also be
+      // collected. This code will also guarantee the leaf relations that do not reference
+      // any others are collected first.
+      if (refCount == 0) {
+        checkUnreferencedCTERelations(cteMap, visited, danglingCTERelations, cteId)
+      }
     }
     // Inline all CTEs in the plan to help check query plan structures in subqueries.
-    checkAnalysis0(inlineCTE(plan))
+    var inlinedPlan: LogicalPlan = inlineCTE(plan)
+    if (danglingCTERelations.nonEmpty) {
+      inlinedPlan = WithCTE(inlinedPlan, danglingCTERelations.toSeq)
+    }
+    checkAnalysis0(inlinedPlan)
     plan.setAnalyzed()
   }
 


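For readers of the hunk above: the comment about leaf relations being collected first follows from the post-order recursion in checkUnreferencedCTERelations. A standalone sketch of that traversal over a made-up reference map (the ids and structure are illustrative, not taken from the patch):

    import scala.collection.mutable

    object LeafFirstCollection {
      // refs(id) lists the CTE ids that definition `id` references (a made-up DAG).
      val refs: Map[Long, Seq[Long]] = Map(1L -> Seq(2L, 3L), 2L -> Seq(3L), 3L -> Seq.empty)

      def collect(id: Long, visited: mutable.Set[Long], out: mutable.ArrayBuffer[Long]): Unit = {
        if (!visited(id)) {
          // Recurse into everything this definition references before appending it,
          // so definitions with no references come out first.
          refs(id).foreach(child => collect(child, visited, out))
          out += id
          visited += id
        }
      }

      def main(args: Array[String]): Unit = {
        val out = mutable.ArrayBuffer.empty[Long]
        collect(1L, mutable.Set.empty[Long], out)
        println(out.mkString(", "))  // prints: 3, 2, 1
      }
    }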