This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new ce58e05  [SPARK-34719][SQL][2.4] Correctly resolve the view query with duplicated column names
ce58e05 is described below

commit ce58e05714591e11d851f3604ade190fe550a8d5
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Sat Mar 20 11:09:50 2021 +0900

[SPARK-34719][SQL][2.4] Correctly resolve the view query with duplicated column names

### What changes were proposed in this pull request?

Backport https://github.com/apache/spark/pull/31811 to 2.4.

For permanent views (and the new SQL temp view in Spark 3.1), we store the view SQL text and re-parse/analyze it when reading the view. In the case of `SELECT * FROM ...`, we want to avoid view schema changes (e.g. the referenced table changes its schema), so we record the view query output column names when creating the view. When reading the view, we can then add a `SELECT recorded_column_names FROM ...` to retain the original view query schema.

In Spark 3.1 and before, the final SELECT is added after the analysis phase: https://github.com/apache/spark/blob/branch-3.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala#L67

If the view query has duplicated output column names, we always pick the first matching column when reading the view. A simple repro:

```
scala> sql("create view c(x, y) as select 1 a, 2 a")
res0: org.apache.spark.sql.DataFrame = []

scala> sql("select * from c").show
+---+---+
|  x|  y|
+---+---+
|  1|  1|
+---+---+
```

In the master branch, we fail at view reading time because https://github.com/apache/spark/commit/b891862fb6b740b103d5a09530626ee4e0e8f6e3 adds the final SELECT during analysis, so the query fails with `Reference 'a' is ambiguous`.

This PR proposes to resolve the view query output column names against the matching attributes by ordinal. For example, for `create view c(x, y) as select 1 a, 2 a`, the view query output column names are `[a, a]`. When reading the view, there are 2 matching attributes (e.g. `[a#1, a#2]`) and we can simply match them by ordinal.

A negative example is

```
create table t(a int)
create view v as select *, 1 as col from t
replace table t(a int, col int)
```

When reading the view, the recorded view query output column names are `[a, col]`, but there are two attributes matching `col`, so we should fail the query. See the tests for details.

### Why are the changes needed?

bug fix

### Does this PR introduce _any_ user-facing change?

yes

### How was this patch tested?

new test

Closes #31894 from cloud-fan/backport.
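To make the ordinal-matching idea above concrete outside of Catalyst, here is a minimal, self-contained Scala sketch. It assumes exact (case-sensitive) name matching, and the helper name `resolveByOrdinal` is made up for illustration; the real implementation, with resolver-aware case-insensitive matching and the extra check that fails when the output has *more* duplicates than were recorded, is in `EliminateView` in the diff below.

```
// Minimal sketch: resolve recorded view column names against the current
// query output by ordinal, so duplicated names map to distinct attributes.
def resolveByOrdinal(recorded: Seq[String], output: Seq[String]): Seq[Int] = {
  val seenCounts = scala.collection.mutable.HashMap.empty[String, Int]
  recorded.map { name =>
    // All positions in the current output whose name matches (exact match
    // here; Spark compares names with conf.resolver instead).
    val matches = output.zipWithIndex.collect { case (n, i) if n == name => i }
    val seen = seenCounts.getOrElse(name, 0)
    seenCounts(name) = seen + 1
    if (seen >= matches.length) {
      // Fewer matching attributes than recorded duplicates: the underlying
      // query changed incompatibly, so fail instead of silently picking one.
      throw new IllegalArgumentException(
        s"column $name cannot be resolved in ${output.mkString("[", ",", "]")}")
    }
    // Pick the seen-th duplicate by ordinal.
    matches(seen)
  }
}

// `create view c(x, y) as select 1 a, 2 a`: recorded [a, a] against output
// [a, a] resolves to positions [0, 1] instead of picking the first `a` twice.
assert(resolveByOrdinal(Seq("a", "a"), Seq("a", "a")) == Seq(0, 1))
```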
Authored-by: Wenchen Fan <wenc...@databricks.com>
Signed-off-by: Takeshi Yamamuro <yamam...@apache.org>
---
 .../apache/spark/sql/catalyst/analysis/view.scala  | 44 ++++++++++++++++++---
 .../apache/spark/sql/execution/SQLViewSuite.scala  | 46 ++++++++++++++++++++++
 2 files changed, 84 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
index 6a94f51..44f4a63 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
@@ -17,7 +17,10 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.catalyst.expressions.Alias
+import java.util.Locale
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
@@ -60,15 +63,44 @@ object EliminateView extends Rule[LogicalPlan] with CastSupport {
     // The child has the different output attributes with the View operator. Adds a Project over
     // the child of the view.
     case v @ View(desc, output, child) if child.resolved && output != child.output =>
+      // Use the stored view query output column names to find the matching attributes. The column
+      // names may have duplication, e.g. `CREATE VIEW v(x, y) AS SELECT 1 col, 2 col`. We need to
+      // make sure that the matching attributes have the same number of duplications, and pick the
+      // corresponding attribute by ordinal.
       val resolver = conf.resolver
       val queryColumnNames = desc.viewQueryColumnNames
       val queryOutput = if (queryColumnNames.nonEmpty) {
-        // Find the attribute that has the expected attribute name from an attribute list, the names
-        // are compared using conf.resolver.
-        // `CheckAnalysis` already guarantees the expected attribute can be found for sure.
-        desc.viewQueryColumnNames.map { colName =>
-          child.output.find(attr => resolver(attr.name, colName)).get
+        val normalizeColName: String => String = if (conf.caseSensitiveAnalysis) {
+          identity
+        } else {
+          _.toLowerCase(Locale.ROOT)
+        }
+        val nameToCounts = scala.collection.mutable.HashMap.empty[String, Int]
+        val nameToMatchedCols = scala.collection.mutable.HashMap.empty[String, Seq[Attribute]]
+
+        val outputAttrs = queryColumnNames.map { colName =>
+          val normalized = normalizeColName(colName)
+          val count = nameToCounts.getOrElse(normalized, 0)
+          val matchedCols = nameToMatchedCols.getOrElseUpdate(
+            normalized, child.output.filter(attr => resolver(attr.name, colName)))
+          if (matchedCols.length - 1 < count) {
+            throw new AnalysisException(s"The SQL query of view ${desc.identifier} has an " +
+              s"incompatible schema change and column $colName cannot be resolved. Expect " +
+              s"more attributes named $colName in ${child.output.mkString("[", ",", "]")}")
+          }
+          nameToCounts(normalized) = count + 1
+          matchedCols(count)
         }
+
+        nameToCounts.foreach { case (colName, count) =>
+          if (count > 1 && nameToMatchedCols(colName).length != count) {
+            throw new AnalysisException(s"The SQL query of view ${desc.identifier} has an " +
+              s"incompatible schema change and column $colName cannot be resolved. Expect " +
Expect " + + s"less attributes named $colName in ${child.output.mkString("[", ",", "]")}") + } + } + + outputAttrs } else { // For view created before Spark 2.2.0, the view text is already fully qualified, the plan // output is the same with the view output. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala index 5eb2e31..0ce5d59 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.internal.SQLConf.{CASE_SENSITIVE, CROSS_JOINS_ENABLED} import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} class SimpleSQLViewSuite extends SQLViewSuite with SharedSQLContext @@ -702,6 +703,7 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { sql("CREATE DATABASE IF NOT EXISTS db2") sql("USE db2") checkAnswer(spark.table("default.v1"), Row(1)) + sql("USE default") } } } @@ -733,4 +735,48 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { checkAnswer(sql(s"SELECT * FROM $globalTempDB.testView"), Row(2)) } } + + test("SPARK-34719: view query with duplicated output column names") { + Seq(true, false).foreach { caseSensitive => + withSQLConf( + CASE_SENSITIVE.key -> caseSensitive.toString, + CROSS_JOINS_ENABLED.key -> "true") { + withView("v1", "v2") { + sql("CREATE VIEW v1 AS SELECT 1 a, 2 b") + sql("CREATE VIEW v2 AS SELECT 1 col") + + sql("CREATE VIEW testView(c1, c2, c3, c4) AS SELECT *, 1 col, 2 col FROM v1") + withView("testView") { + checkAnswer(spark.table("testView"), Seq(Row(1, 2, 1, 2))) + + // One more duplicated column `COL` if caseSensitive=false. + sql("CREATE OR REPLACE VIEW v1 AS SELECT 1 a, 2 b, 3 COL") + if (caseSensitive) { + checkAnswer(spark.table("testView"), Seq(Row(1, 2, 1, 2))) + } else { + val e = intercept[AnalysisException](spark.table("testView").collect()) + assert(e.message.contains("incompatible schema change")) + } + } + + // v1 has 3 columns [a, b, COL], v2 has one column [col], so `testView2` has duplicated + // output column names if caseSensitive=false. + sql("CREATE VIEW testView2(c1, c2, c3, c4) AS SELECT * FROM v1, v2") + withView("testView2") { + checkAnswer(spark.table("testView2"), Seq(Row(1, 2, 3, 1))) + + // One less duplicated column if caseSensitive=false. + sql("CREATE OR REPLACE VIEW v1 AS SELECT 1 a, 2 b") + if (caseSensitive) { + val e = intercept[AnalysisException](spark.table("testView2").collect()) + assert(e.message.contains("'COL' is not found in '(a,b,col)'")) + } else { + val e = intercept[AnalysisException](spark.table("testView2").collect()) + assert(e.message.contains("incompatible schema change")) + } + } + } + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org