This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e54d68  [SPARK-34547][SQL] Only use metadata columns for resolution as last resort
2e54d68 is described below

commit 2e54d68eb94cf39b59166f2b1bbb8f6c317760b8
Author: Karen Feng <karen.f...@databricks.com>
AuthorDate: Tue Mar 2 17:27:13 2021 +0800

    [SPARK-34547][SQL] Only use metadata columns for resolution as last resort
    
    ### What changes were proposed in this pull request?
    
    Today, child expressions may be resolved based on "real" or metadata output attributes. We should prefer the real attribute during resolution if one exists.
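
    As a self-contained illustration, the new lookup order is a plain "first match wins" fallback. A toy sketch (not the Catalyst API; `Attr` and `lookup` are hypothetical stand-ins for `AttributeSeq.resolve`):
    ```
    // Toy model of the new resolution order: consult the "real"
    // attributes first, and metadata attributes only on a miss.
    case class Attr(name: String, isMetadata: Boolean = false)

    def lookup(name: String, attrs: Seq[Attr]): Option[Attr] =
      attrs.find(_.name == name)

    val real = Seq(Attr("id"), Attr("data"))
    val metadata = Seq(Attr("id", isMetadata = true), Attr("index", isMetadata = true))

    // "id" exists in both sets: the real attribute wins.
    assert(lookup("id", real).orElse(lookup("id", metadata)).contains(Attr("id")))

    // "index" exists only as metadata: the fallback resolves it.
    assert(lookup("index", real).orElse(lookup("index", metadata))
      .contains(Attr("index", isMetadata = true)))
    ```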
    
    ### Why are the changes needed?
    
    Today, attempting to resolve an expression when there is a "real" output attribute and a metadata attribute with the same name results in resolution failure. This is likely unexpected, as the user may not know about the metadata attribute.
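
    The new test (diff below) constructs exactly this collision: a partitioned DSv2 table whose connector surfaces a metadata column named `index`, joined with a temp view that has a real column `index`. A sketch of the setup, with names taken from the test (`foo` as the v2 format is an assumption about the suite's registered catalog):
    ```
    // The test connector exposes partition metadata as a metadata
    // column named `index`.
    sql("CREATE TABLE testcat.ns1.ns2.tableOne (id bigint, data string) " +
      "USING foo PARTITIONED BY (bucket(4, id), id)")
    // A temp view with a *real* column that happens to be named `index`.
    sql("CREATE TEMPORARY VIEW t2 AS SELECT * FROM " +
      "VALUES (1, -1), (2, -2), (3, -3) AS t2(id, index)")
    ```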
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Previously, the user would see an error message when resolving a column with the same name as a "real" output attribute and a metadata attribute, as below:
    ```
    org.apache.spark.sql.AnalysisException: Reference 'index' is ambiguous, could be: testcat.ns1.ns2.tableTwo.index, testcat.ns1.ns2.tableOne.index.; line 1 pos 71
    at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:363)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:107)
    ```
    
    Now, resolution succeeds and provides the "real" output attribute.
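
    For example, mirroring the new test below (an active SparkSession and the test catalog are assumed; per the test's expected rows, unqualified `index` returns the view's values while the qualified metadata column yields the bucket index):
    ```
    // Resolves `index` to t2's real column (-1, -2, -3) instead of
    // failing as ambiguous against tableOne's metadata column.
    sql("SELECT index FROM testcat.ns1.ns2.tableOne JOIN t2 " +
      "WHERE testcat.ns1.ns2.tableOne.id = t2.id")

    // The metadata column remains reachable via its qualified name
    // (0 for these rows).
    sql("SELECT testcat.ns1.ns2.tableOne.index FROM testcat.ns1.ns2.tableOne " +
      "JOIN t2 WHERE testcat.ns1.ns2.tableOne.id = t2.id")
    ```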
    
    ### How was this patch tested?
    
    Added a unit test.
    
    Closes #31654 from karenfeng/fallback-resolve-metadata.
    
    Authored-by: Karen Feng <karen.f...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/catalyst/plans/logical/LogicalPlan.scala   |  6 +++--
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 31 ++++++++++++++++++++++
 2 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 89d65e1..ceb9726 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -89,8 +89,9 @@ abstract class LogicalPlan
     }
   }
 
-  private[this] lazy val childAttributes =
-    AttributeSeq(children.flatMap(c => c.output ++ c.metadataOutput))
+  private[this] lazy val childAttributes = AttributeSeq(children.flatMap(_.output))
+
+  private[this] lazy val childMetadataAttributes = AttributeSeq(children.flatMap(_.metadataOutput))
 
   private[this] lazy val outputAttributes = AttributeSeq(output)
 
@@ -103,6 +104,7 @@ abstract class LogicalPlan
       nameParts: Seq[String],
       resolver: Resolver): Option[NamedExpression] =
     childAttributes.resolve(nameParts, resolver)
+      .orElse(childMetadataAttributes.resolve(nameParts, resolver))
 
   /**
    * Optionally resolves the given strings to a [[NamedExpression]] based on the output of this
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index ca4dff8..e051c2d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2492,6 +2492,37 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("SPARK-34547: metadata columns are resolved last") {
+    val t1 = s"${catalogAndNamespace}tableOne"
+    val t2 = "t2"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+        "PARTITIONED BY (bucket(4, id), id)")
+      sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+      withTempView(t2) {
+        sql(s"CREATE TEMPORARY VIEW $t2 AS SELECT * FROM " +
+          s"VALUES (1, -1), (2, -2), (3, -3) AS $t2(id, index)")
+
+        val sqlQuery = spark.sql(s"SELECT $t1.id, $t2.id, data, index, $t1.index, $t2.index FROM " +
+          s"$t1 JOIN $t2 WHERE $t1.id = $t2.id")
+        val t1Table = spark.table(t1)
+        val t2Table = spark.table(t2)
+        val dfQuery = t1Table.join(t2Table, t1Table.col("id") === t2Table.col("id"))
+          .select(s"$t1.id", s"$t2.id", "data", "index", s"$t1.index", s"$t2.index")
+
+        Seq(sqlQuery, dfQuery).foreach { query =>
+          checkAnswer(query,
+            Seq(
+              Row(1, 1, "a", -1, 0, -1),
+              Row(2, 2, "b", -2, 0, -2),
+              Row(3, 3, "c", -3, 0, -3)
+            )
+          )
+        }
+      }
+    }
+  }
+
   test("SPARK-33505: insert into partitioned table") {
     val t = "testpart.ns1.ns2.tbl"
     withTable(t) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
