This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 844222231a6 [SPARK-43030][SQL][FOLLOWUP] CTE ref should keep the 
output attributes duplicated when renew
844222231a6 is described below

commit 844222231a6e4631afca22b3a4763eabeeef1398
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Tue May 30 13:51:38 2023 +0800

    [SPARK-43030][SQL][FOLLOWUP] CTE ref should keep the output attributes 
duplicated when renew
    
    ### What changes were proposed in this pull request?
    
    This is a followup of https://github.com/apache/spark/pull/40662 to fix a 
regression.
    
    `CTERelationRef` inherits the output attributes from a query, which may 
contain duplicated attributes, for queries like `SELECT a, a FROM t`. It's 
important that the duplicated attributes keep the same id in the new 
instance, as column resolution allows more than one matching attribute if their 
ids are the same.
    
    For example, `Project('a, CTERelationRef(a#1, a#1))` can be resolved 
properly as the matching attributes `a` have the same id, but `Project('a, 
CTERelationRef(a#2, a#3))` can't be resolved.
    
    ### Why are the changes needed?
    
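    Bug fix. Without it, `newInstance()` gives duplicated CTE output attributes different ids, so columns referencing them can't be resolved.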
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, the bug is not released yet.
    
    ### How was this patch tested?
    
    A new test case in `AnalysisSuite`.
    
    Closes #41363 from cloud-fan/fix.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/catalyst/plans/logical/basicLogicalOperators.scala   | 12 +++++++++++-
 .../apache/spark/sql/catalyst/analysis/AnalysisSuite.scala   | 11 +++++++++++
 2 files changed, 22 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index ceed7b0cc54..4bde26a7d6e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -844,7 +844,17 @@ case class CTERelationRef(
 
   override lazy val resolved: Boolean = _resolved
 
-  override def newInstance(): LogicalPlan = copy(output = 
output.map(_.newInstance()))
+  override def newInstance(): LogicalPlan = {
+    // CTERelationRef inherits the output attributes from a query, which may 
contain duplicated
+    // attributes, for queries like `SELECT a, a FROM t`. It's important to 
keep the duplicated
+    // attributes to have the same id in the new instance, as column 
resolution allows more than one
+    // matching attribute if their ids are the same.
+    // For example, `Project('a, CTERelationRef(a#1, a#1))` can be resolved 
properly as the matching
+    // attributes `a` have the same id, but `Project('a, CTERelationRef(a#2, 
a#3))` can't be
+    // resolved.
+    val oldAttrToNewAttr = 
AttributeMap(output.zip(output.map(_.newInstance())))
+    copy(output = output.map(attr => oldAttrToNewAttr(attr)))
+  }
 
   def withNewStats(statsOpt: Option[Statistics]): CTERelationRef = 
copy(statsOpt = statsOpt)
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 1029f7f8fab..e1050e91e59 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -1500,6 +1500,17 @@ class AnalysisSuite extends AnalysisTest with Matchers {
       assert(refs.map(_.output).distinct.length == 2)
     }
 
+    withClue("CTE relation has duplicated attributes") {
+      val cteDef = CTERelationDef(testRelation.select($"a", $"a"))
+      val cteRef = CTERelationRef(cteDef.id, false, Nil)
+      val plan = WithCTE(cteRef.join(cteRef.select($"a")), Seq(cteDef)).analyze
+      val refs = plan.collect {
+        case r: CTERelationRef => r
+      }
+      assert(refs.length == 2)
+      assert(refs.map(_.output).distinct.length == 2)
+    }
+
     withClue("references in both CTE relation definition and main query") {
       val cteDef2 = CTERelationDef(cteRef.where($"a" > 2))
       val cteRef2 = CTERelationRef(cteDef2.id, false, Nil)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to