This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 2e54d68 [SPARK-34547][SQL] Only use metadata columns for resolution as last resort 2e54d68 is described below commit 2e54d68eb94cf39b59166f2b1bbb8f6c317760b8 Author: Karen Feng <karen.f...@databricks.com> AuthorDate: Tue Mar 2 17:27:13 2021 +0800 [SPARK-34547][SQL] Only use metadata columns for resolution as last resort ### What changes were proposed in this pull request? Today, child expressions may be resolved based on "real" or metadata output attributes. We should prefer the real attribute during resolution if one exists. ### Why are the changes needed? Today, attempting to resolve an expression when there is a "real" output attribute and a metadata attribute with the same name results in resolution failure. This is likely unexpected, as the user may not know about the metadata attribute. ### Does this PR introduce _any_ user-facing change? Yes. Previously, the user would see an error message when resolving a column with the same name as a "real" output attribute and a metadata attribute as below: ``` org.apache.spark.sql.AnalysisException: Reference 'index' is ambiguous, could be: testcat.ns1.ns2.tableTwo.index, testcat.ns1.ns2.tableOne.index.; line 1 pos 71 at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:363) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:107) ``` Now, resolution succeeds and provides the "real" output attribute. ### How was this patch tested? Added a unit test. Closes #31654 from karenfeng/fallback-resolve-metadata. Authored-by: Karen Feng <karen.f...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../sql/catalyst/plans/logical/LogicalPlan.scala | 6 +++-- .../spark/sql/connector/DataSourceV2SQLSuite.scala | 31 ++++++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 89d65e1..ceb9726 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -89,8 +89,9 @@ abstract class LogicalPlan } } - private[this] lazy val childAttributes = - AttributeSeq(children.flatMap(c => c.output ++ c.metadataOutput)) + private[this] lazy val childAttributes = AttributeSeq(children.flatMap(_.output)) + + private[this] lazy val childMetadataAttributes = AttributeSeq(children.flatMap(_.metadataOutput)) private[this] lazy val outputAttributes = AttributeSeq(output) @@ -103,6 +104,7 @@ abstract class LogicalPlan nameParts: Seq[String], resolver: Resolver): Option[NamedExpression] = childAttributes.resolve(nameParts, resolver) + .orElse(childMetadataAttributes.resolve(nameParts, resolver)) /** * Optionally resolves the given strings to a [[NamedExpression]] based on the output of this diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index ca4dff8..e051c2d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -2492,6 +2492,37 @@ class DataSourceV2SQLSuite } } + test("SPARK-34547: metadata columns are resolved last") { + val t1 = s"${catalogAndNamespace}tableOne" + val t2 = "t2" + withTable(t1) { + sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " + + "PARTITIONED BY (bucket(4, id), id)") + sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')") + withTempView(t2) { + sql(s"CREATE TEMPORARY VIEW $t2 AS SELECT * FROM " + + s"VALUES (1, -1), (2, -2), (3, -3) AS $t2(id, index)") + + val sqlQuery = spark.sql(s"SELECT $t1.id, $t2.id, data, index, $t1.index, $t2.index FROM " + + s"$t1 JOIN $t2 WHERE $t1.id = $t2.id") + val t1Table = spark.table(t1) + val t2Table = spark.table(t2) + val dfQuery = t1Table.join(t2Table, t1Table.col("id") === t2Table.col("id")) + .select(s"$t1.id", s"$t2.id", "data", "index", s"$t1.index", s"$t2.index") + + Seq(sqlQuery, dfQuery).foreach { query => + checkAnswer(query, + Seq( + Row(1, 1, "a", -1, 0, -1), + Row(2, 2, "b", -2, 0, -2), + Row(3, 3, "c", -3, 0, -3) + ) + ) + } + } + } + } + test("SPARK-33505: insert into partitioned table") { val t = "testpart.ns1.ns2.tbl" withTable(t) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org