This is an automated email from the ASF dual-hosted git repository. yamamuro pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 25d7219 [SPARK-34719][SQL][3.0] Correctly resolve the view query with duplicated column names 25d7219 is described below commit 25d72191de7c842aa2acd4b7307ba8e6585dd182 Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Sat Mar 20 11:09:50 2021 +0900 [SPARK-34719][SQL][3.0] Correctly resolve the view query with duplicated column names backport https://github.com/apache/spark/pull/31811 to 3.0 ### What changes were proposed in this pull request? For permanent views (and the new SQL temp view in Spark 3.1), we store the view SQL text and re-parse/analyze the view SQL text when reading the view. In the case of `SELECT * FROM ...`, we want to avoid view schema change (e.g. the referenced table changes its schema) and will record the view query output column names when creating the view, so that when reading the view we can add a `SELECT recorded_column_names FROM ...` to retain the original view query schema. In Spark 3.1 and before, the final SELECT is added after the analysis phase: https://github.com/apache/spark/blob/branch-3.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala#L67 If the view query has duplicated output column names, we always pick the first column when reading a view. A simple repro: ``` scala> sql("create view c(x, y) as select 1 a, 2 a") res0: org.apache.spark.sql.DataFrame = [] scala> sql("select * from c").show +---+---+ | x| y| +---+---+ | 1| 1| +---+---+ ``` In the master branch, we will fail at the view reading time due to https://github.com/apache/spark/commit/b891862fb6b740b103d5a09530626ee4e0e8f6e3 , which adds the final SELECT during analysis, so that the query fails with `Reference 'a' is ambiguous` This PR proposes to resolve the view query output column names from the matching attributes by ordinal. For example, `create view c(x, y) as select 1 a, 2 a`, the view query output column names are `[a, a]`. 
When we read the view, there are 2 matching attributes (e.g.`[a#1, a#2]`) and we can simply match them by ordinal. A negative example is ``` create table t(a int) create view v as select *, 1 as col from t replace table t(a int, col int) ``` When reading the view, the view query output column names are `[a, col]`, and there are two matching attributes of `col`, and we should fail the query. See the tests for details. ### Why are the changes needed? bug fix ### Does this PR introduce _any_ user-facing change? yes ### How was this patch tested? new test Closes #31894 from cloud-fan/backport. Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Takeshi Yamamuro <yamam...@apache.org> --- .../apache/spark/sql/catalyst/analysis/view.scala | 44 ++++++++++++++++++--- .../apache/spark/sql/execution/SQLViewSuite.scala | 45 +++++++++++++++++++++- 2 files changed, 82 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala index 6560164..013a303 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala @@ -17,7 +17,10 @@ package org.apache.spark.sql.catalyst.analysis -import org.apache.spark.sql.catalyst.expressions.Alias +import java.util.Locale + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf @@ -60,15 +63,44 @@ object EliminateView extends Rule[LogicalPlan] with CastSupport { // The child has the different output attributes with the View operator. Adds a Project over // the child of the view. 
case v @ View(desc, output, child) if child.resolved && !v.sameOutput(child) => + // Use the stored view query output column names to find the matching attributes. The column + // names may have duplication, e.g. `CREATE VIEW v(x, y) AS SELECT 1 col, 2 col`. We need to + // make sure that the matching attributes have the same number of duplications, and pick the + // corresponding attribute by ordinal. val resolver = conf.resolver val queryColumnNames = desc.viewQueryColumnNames val queryOutput = if (queryColumnNames.nonEmpty) { - // Find the attribute that has the expected attribute name from an attribute list, the names - // are compared using conf.resolver. - // `CheckAnalysis` already guarantees the expected attribute can be found for sure. - desc.viewQueryColumnNames.map { colName => - child.output.find(attr => resolver(attr.name, colName)).get + val normalizeColName: String => String = if (conf.caseSensitiveAnalysis) { + identity + } else { + _.toLowerCase(Locale.ROOT) + } + val nameToCounts = scala.collection.mutable.HashMap.empty[String, Int] + val nameToMatchedCols = scala.collection.mutable.HashMap.empty[String, Seq[Attribute]] + + val outputAttrs = queryColumnNames.map { colName => + val normalized = normalizeColName(colName) + val count = nameToCounts.getOrElse(normalized, 0) + val matchedCols = nameToMatchedCols.getOrElseUpdate( + normalized, child.output.filter(attr => resolver(attr.name, colName))) + if (matchedCols.length - 1 < count) { + throw new AnalysisException(s"The SQL query of view ${desc.identifier} has an " + + s"incompatible schema change and column $colName cannot be resolved. 
Expect " + + s"more attributes named $colName in ${child.output.mkString("[", ",", "]")}") + } + nameToCounts(normalized) = count + 1 + matchedCols(count) } + + nameToCounts.foreach { case (colName, count) => + if (count > 1 && nameToMatchedCols(colName).length != count) { + throw new AnalysisException(s"The SQL query of view ${desc.identifier} has an " + + s"incompatible schema change and column $colName cannot be resolved. Expect " + + s"less attributes named $colName in ${child.output.mkString("[", ",", "]")}") + } + } + + outputAttrs } else { // For view created before Spark 2.2.0, the view text is already fully qualified, the plan // output is the same with the view output. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala index 0a0c3f5..68d9460 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException -import org.apache.spark.sql.internal.SQLConf.MAX_NESTED_VIEW_DEPTH +import org.apache.spark.sql.internal.SQLConf.{CASE_SENSITIVE, MAX_NESTED_VIEW_DEPTH} import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} class SimpleSQLViewSuite extends SQLViewSuite with SharedSparkSession @@ -721,6 +721,7 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { sql("CREATE DATABASE IF NOT EXISTS db2") sql("USE db2") checkAnswer(spark.table("default.v1"), Row(1)) + sql("USE default") } } } @@ -752,4 +753,46 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { checkAnswer(sql(s"SELECT * FROM $globalTempDB.testView"), Row(2)) } } + + test("SPARK-34719: view query with duplicated output column names") { + Seq(true, 
false).foreach { caseSensitive => + withSQLConf(CASE_SENSITIVE.key -> caseSensitive.toString) { + withView("v1", "v2") { + sql("CREATE VIEW v1 AS SELECT 1 a, 2 b") + sql("CREATE VIEW v2 AS SELECT 1 col") + + sql("CREATE VIEW testView(c1, c2, c3, c4) AS SELECT *, 1 col, 2 col FROM v1") + withView("testView") { + checkAnswer(spark.table("testView"), Seq(Row(1, 2, 1, 2))) + + // One more duplicated column `COL` if caseSensitive=false. + sql("CREATE OR REPLACE VIEW v1 AS SELECT 1 a, 2 b, 3 COL") + if (caseSensitive) { + checkAnswer(spark.table("testView"), Seq(Row(1, 2, 1, 2))) + } else { + val e = intercept[AnalysisException](spark.table("testView").collect()) + assert(e.message.contains("incompatible schema change")) + } + } + + // v1 has 3 columns [a, b, COL], v2 has one column [col], so `testView2` has duplicated + // output column names if caseSensitive=false. + sql("CREATE VIEW testView2(c1, c2, c3, c4) AS SELECT * FROM v1, v2") + withView("testView2") { + checkAnswer(spark.table("testView2"), Seq(Row(1, 2, 3, 1))) + + // One less duplicated column if caseSensitive=false. + sql("CREATE OR REPLACE VIEW v1 AS SELECT 1 a, 2 b") + if (caseSensitive) { + val e = intercept[AnalysisException](spark.table("testView2").collect()) + assert(e.message.contains("'COL' is not found in '(a,b,col)'")) + } else { + val e = intercept[AnalysisException](spark.table("testView2").collect()) + assert(e.message.contains("incompatible schema change")) + } + } + } + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org