This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.2 by this push:
     new 6a91b1d063a8 [SPARK-56921][SQL] Fix CTE ID normalization for nested 
CTEs
6a91b1d063a8 is described below

commit 6a91b1d063a8e868830307bb0f0c4e8c076831c4
Author: Puneet Dixit <[email protected]>
AuthorDate: Tue May 26 11:06:01 2026 +0800

    [SPARK-56921][SQL] Fix CTE ID normalization for nested CTEs
    
    ## What changes were proposed in this pull request?
    
    This updates `NormalizeCTEIds` so normalization for the current `WithCTE` 
scope does not cross into nested `WithCTE` nodes. Nested `WithCTE`s are 
normalized separately by the normal top-down traversal, including when they are 
reached through subquery expressions.
    
    A regression test covers the reported temp-view + nested CTE + union 
pattern.
    
    ## Why are the changes needed?
    
    The previous traversal could double-remap refs because `cteIdToNewId` is 
shared across the rule run. When sibling `WithCTE`s sit under a `Union`, the 
left side can map the inner CTE id to a new id, the right side can rewrite the 
same nested ref through that map, and the outer traversal can then rewrite that 
new id again if it is also a key in the map. The second rewrite can point the 
ref at the wrong outer CTE definition, which later lets `InlineCTE` substitute 
the wrong body.
    
    ## Does this PR introduce _any_ user-facing change?
    
    No.
    
    ## How was this patch tested?
    
    - `git diff --check origin/master HEAD -- 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/NormalizeCTEIds.scala
 sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala`
    - `JAVA_HOME=/opt/homebrew/opt/openjdk17 build/sbt 'sql/testOnly 
*CTEInlineSuiteAEOff *CTEInlineSuiteAEOn -- -z SPARK-56921'`
    
    ## Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: OpenAI GPT-5
    
    Closes #55985 from puneetdixit200/fix-spark-56921-normalize-nested-cte.
    
    Authored-by: Puneet Dixit 
<[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 037e9e6faa43b6fc62af271e5ed1cd12ee4676f2)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../sql/catalyst/normalizer/NormalizeCTEIds.scala  | 36 ++++++++----
 .../org/apache/spark/sql/CTEInlineSuite.scala      | 67 ++++++++++++++++++++++
 2 files changed, 92 insertions(+), 11 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/NormalizeCTEIds.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/NormalizeCTEIds.scala
index 6c0bca0e1104..660f11d368ab 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/NormalizeCTEIds.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/NormalizeCTEIds.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.mutable
 
+import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.{CacheTableAsSelect, 
CTERelationRef, LogicalPlan, UnionLoop, UnionLoopRef, WithCTE}
 import org.apache.spark.sql.catalyst.rules.Rule
 
@@ -52,16 +53,29 @@ object NormalizeCTEIds extends Rule[LogicalPlan] {
 
   private def canonicalizeCTE(
       plan: LogicalPlan,
-      defIdToNewId: mutable.Map[Long, Long]): LogicalPlan = {
-    plan.transformDownWithSubqueries {
-      // For nested WithCTE, if defIndex didn't contain the cteId,
-      // means it's not current WithCTE's ref.
-      case ref: CTERelationRef if defIdToNewId.contains(ref.cteId) =>
-        ref.copy(cteId = defIdToNewId(ref.cteId))
-      case unionLoop: UnionLoop if defIdToNewId.contains(unionLoop.id) =>
-        unionLoop.copy(id = defIdToNewId(unionLoop.id))
-      case unionLoopRef: UnionLoopRef if 
defIdToNewId.contains(unionLoopRef.loopId) =>
-        unionLoopRef.copy(loopId = defIdToNewId(unionLoopRef.loopId))
-    }
+      defIdToNewId: mutable.Map[Long, Long]): LogicalPlan = plan match {
+    // Stop at nested WithCTEs because applyInternal canonicalizes each 
WithCTE scope
+    // independently. Descending here would re-apply the shared cteIdToNewId 
map to
+    // inner-scope refs and, under sibling WithCTEs, move them to the wrong CTE
+    // definition (SPARK-56921).
+    case _: WithCTE => plan
+    case other =>
+      val normalizedPlan = other match {
+        case ref: CTERelationRef if defIdToNewId.contains(ref.cteId) =>
+          ref.copy(cteId = defIdToNewId(ref.cteId))
+        case unionLoop: UnionLoop if defIdToNewId.contains(unionLoop.id) =>
+          unionLoop.copy(id = defIdToNewId(unionLoop.id))
+        case unionLoopRef: UnionLoopRef if 
defIdToNewId.contains(unionLoopRef.loopId) =>
+          unionLoopRef.copy(loopId = defIdToNewId(unionLoopRef.loopId))
+        case _ =>
+          other
+      }
+
+      normalizedPlan
+        .withNewChildren(normalizedPlan.children.map(canonicalizeCTE(_, 
defIdToNewId)))
+        .transformExpressionsDown {
+          case subqueryExpression: SubqueryExpression =>
+            
subqueryExpression.withNewPlan(canonicalizeCTE(subqueryExpression.plan, 
defIdToNewId))
+        }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
index 7562d5669cc2..3833b7f2509d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
@@ -261,6 +261,73 @@ abstract class CTEInlineSuiteBase
     }
   }
 
+  test("SPARK-56921: plan normalization handles nested CTEs under union") {
+    withTempView("input", "common") {
+      Seq((1, 1, 10), (1, 2, 20), (2, 1, 30))
+        .toDF("a", "b", "value")
+        .createOrReplaceTempView("input")
+
+      sql(
+        s"""with cte_common as (
+           |  select a, b, sum(value) as value
+           |  from input
+           |  group by a, b
+           |)
+           |select * from cte_common
+         """.stripMargin).createOrReplaceTempView("common")
+
+      val left = sql(
+        s"""with cte_a as (
+           |  select a, sum(value) as value
+           |  from common
+           |  group by a
+           |)
+           |select a as id, value from cte_a
+         """.stripMargin)
+
+      val right = sql(
+        s"""with cte_b as (
+           |  select b, sum(value) as value
+           |  from common
+           |  group by b
+           |)
+           |select b as id, value from cte_b
+         """.stripMargin)
+
+      checkAnswer(
+        left.union(right),
+        Row(1, 30) :: Row(2, 30) :: Row(1, 40) :: Row(2, 20) :: Nil)
+    }
+  }
+
+  test("SPARK-56921: plan normalization preserves recursive CTE loop refs") {
+    val df = sql(
+      s"""with recursive t(n) as (
+         |  select 1
+         |  union all
+         |  select n + 1 from t where n < 3
+         |)
+         |select * from t
+       """.stripMargin)
+
+    val normalized = df.queryExecution.normalized
+    val unionLoops = normalized.collect { case unionLoop: UnionLoop => 
unionLoop }
+
+    assert(unionLoops.nonEmpty, "Recursive CTE should normalize with a 
UnionLoop.")
+    unionLoops.foreach { unionLoop =>
+      val unionLoopRefs = unionLoop.recursion.collect {
+        case unionLoopRef: UnionLoopRef => unionLoopRef
+      }
+
+      assert(unionLoopRefs.nonEmpty, "Recursive CTE should normalize with a 
UnionLoopRef.")
+      assert(
+        unionLoopRefs.forall(_.loopId == unionLoop.id),
+        "UnionLoopRef loop IDs should match the normalized UnionLoop ID.")
+    }
+
+    checkAnswer(df, Row(1) :: Row(2) :: Row(3) :: Nil)
+  }
+
   test("SPARK-36447: invalid nested CTEs") {
     withTempView("t") {
       Seq((0, 1), (1, 2)).toDF("c1", "c2").createOrReplaceTempView("t")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to