This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new ce58e05  [SPARK-34719][SQL][2.4] Correctly resolve the view query with duplicated column names
ce58e05 is described below

commit ce58e05714591e11d851f3604ade190fe550a8d5
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Sat Mar 20 11:09:50 2021 +0900

    [SPARK-34719][SQL][2.4] Correctly resolve the view query with duplicated column names
    
    backport https://github.com/apache/spark/pull/31811 to 2.4
    
    For permanent views (and the new SQL temp view in Spark 3.1), we store the
    view SQL text and re-parse/analyze it when reading the view. For queries
    like `SELECT * FROM ...`, we want to avoid view schema changes (e.g. when
    the referenced table changes its schema), so we record the view query
    output column names when creating the view; when reading the view we can
    then add a `SELECT recorded_column_names FROM ...` to retain the original
    view query schema.
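    
    For illustration, a minimal sketch of this mechanism (the table and view
    names are made up for the example, and the rewritten read-time query is
    shown conceptually in the comments, not as Spark's literal internal plan):
    ```scala
    // Assumes a running SparkSession bound to `spark`.
    spark.sql("CREATE TABLE t(a INT, b INT) USING parquet")
    spark.sql("CREATE VIEW v AS SELECT * FROM t")
    // The catalog records v's output column names as [a, b]. Reading v is
    // conceptually: SELECT a, b FROM (SELECT * FROM t), so a later
    // ALTER TABLE t ADD COLUMNS (c INT) does not widen v's schema.
    spark.sql("SELECT * FROM v")  // still returns only columns a and b
    ```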
    
    In Spark 3.1 and before, the final SELECT is added after the analysis phase:
https://github.com/apache/spark/blob/branch-3.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala#L67
    
    If the view query has duplicated output column names, we always pick the
    first column when reading a view. A simple repro:
    ```
    scala> sql("create view c(x, y) as select 1 a, 2 a")
    res0: org.apache.spark.sql.DataFrame = []
    
    scala> sql("select * from c").show
    +---+---+
    |  x|  y|
    +---+---+
    |  1|  1|
    +---+---+
    ```
    
    In the master branch, we fail at view reading time due to
    https://github.com/apache/spark/commit/b891862fb6b740b103d5a09530626ee4e0e8f6e3,
    which adds the final SELECT during analysis, so the query fails with
    `Reference 'a' is ambiguous`.
    
    This PR proposes to resolve the view query output column names from the
    matching attributes by ordinal.
    
    For example, with `create view c(x, y) as select 1 a, 2 a`, the view query
    output column names are `[a, a]`. When we read the view, there are 2
    matching attributes (e.g. `[a#1, a#2]`) and we can simply match them by
    ordinal.
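    
    As a standalone sketch of the ordinal-matching idea (the names `Attr` and
    `resolveByOrdinal` are invented for this example; the real logic is in
    `EliminateView` in view.scala below):
    ```scala
    import scala.collection.mutable

    object OrdinalMatchSketch {
      // A stand-in for Spark's Attribute: a name plus a unique id (like exprId).
      case class Attr(name: String, id: Int)

      // For the i-th occurrence of a recorded name, pick the i-th attribute
      // whose name matches (case-insensitively, mirroring the default resolver).
      def resolveByOrdinal(recordedNames: Seq[String], childOutput: Seq[Attr]): Seq[Attr] = {
        val seen = mutable.HashMap.empty[String, Int]
        recordedNames.map { name =>
          val key = name.toLowerCase
          val matches = childOutput.filter(_.name.equalsIgnoreCase(name))
          val idx = seen.getOrElse(key, 0)
          require(idx < matches.length, s"column $name cannot be resolved")
          seen(key) = idx + 1
          matches(idx)
        }
      }

      def main(args: Array[String]): Unit = {
        // Recorded names [a, a] against child output [a#1, a#2], matched by ordinal.
        println(resolveByOrdinal(Seq("a", "a"), Seq(Attr("a", 1), Attr("a", 2))))
      }
    }
    ```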
    
    A negative example is
    ```
    create table t(a int)
    create view v as select *, 1 as col from t
    replace table t(a int, col int)
    ```
    When reading the view, the view query output column names are `[a, col]`,
    but there are now two matching attributes for `col`, so we should fail the
    query. See the tests for details.
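    
    As a condensed version of the new test below (this sketch assumes
    spark.sql.caseSensitive=false and spark.sql.crossJoin.enabled=true, and
    uses ScalaTest's `intercept` as the suite does):
    ```scala
    sql("CREATE VIEW v1 AS SELECT 1 a, 2 b, 3 COL")
    sql("CREATE VIEW v2 AS SELECT 1 col")
    // testView2's recorded output column names are [a, b, COL, col].
    sql("CREATE VIEW testView2(c1, c2, c3, c4) AS SELECT * FROM v1, v2")
    // Dropping the duplicated COL from v1 leaves fewer matching attributes
    // than recorded, so reading the view now fails instead of guessing.
    sql("CREATE OR REPLACE VIEW v1 AS SELECT 1 a, 2 b")
    val e = intercept[AnalysisException](spark.table("testView2").collect())
    assert(e.message.contains("incompatible schema change"))
    ```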
    
    bug fix
    
    yes
    
    new test
    
    Closes #31894 from cloud-fan/backport.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Takeshi Yamamuro <yamam...@apache.org>
---
 .../apache/spark/sql/catalyst/analysis/view.scala  | 44 ++++++++++++++++++---
 .../apache/spark/sql/execution/SQLViewSuite.scala  | 46 ++++++++++++++++++++++
 2 files changed, 84 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
index 6a94f51..44f4a63 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
@@ -17,7 +17,10 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.catalyst.expressions.Alias
+import java.util.Locale
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
@@ -60,15 +63,44 @@ object EliminateView extends Rule[LogicalPlan] with CastSupport {
     // The child has the different output attributes with the View operator. Adds a Project over
     // the child of the view.
     case v @ View(desc, output, child) if child.resolved && output != child.output =>
+      // Use the stored view query output column names to find the matching attributes. The column
+      // names may have duplication, e.g. `CREATE VIEW v(x, y) AS SELECT 1 col, 2 col`. We need to
+      // make sure that the matching attributes have the same number of duplications, and pick the
+      // corresponding attribute by ordinal.
       val resolver = conf.resolver
       val queryColumnNames = desc.viewQueryColumnNames
       val queryOutput = if (queryColumnNames.nonEmpty) {
-        // Find the attribute that has the expected attribute name from an attribute list, the names
-        // are compared using conf.resolver.
-        // `CheckAnalysis` already guarantees the expected attribute can be found for sure.
-        desc.viewQueryColumnNames.map { colName =>
-          child.output.find(attr => resolver(attr.name, colName)).get
+        val normalizeColName: String => String = if (conf.caseSensitiveAnalysis) {
+          identity
+        } else {
+          _.toLowerCase(Locale.ROOT)
+        }
+        val nameToCounts = scala.collection.mutable.HashMap.empty[String, Int]
+        val nameToMatchedCols = scala.collection.mutable.HashMap.empty[String, Seq[Attribute]]
+
+        val outputAttrs = queryColumnNames.map { colName =>
+          val normalized = normalizeColName(colName)
+          val count = nameToCounts.getOrElse(normalized, 0)
+          val matchedCols = nameToMatchedCols.getOrElseUpdate(
+            normalized, child.output.filter(attr => resolver(attr.name, colName)))
+          if (matchedCols.length - 1 < count) {
+            throw new AnalysisException(s"The SQL query of view ${desc.identifier} has an " +
+              s"incompatible schema change and column $colName cannot be resolved. Expect " +
+              s"more attributes named $colName in ${child.output.mkString("[", ",", "]")}")
+          }
+          nameToCounts(normalized) = count + 1
+          matchedCols(count)
         }
+
+        nameToCounts.foreach { case (colName, count) =>
+          if (count > 1 && nameToMatchedCols(colName).length != count) {
+            throw new AnalysisException(s"The SQL query of view ${desc.identifier} has an " +
+              s"incompatible schema change and column $colName cannot be resolved. Expect " +
+              s"less attributes named $colName in ${child.output.mkString("[", ",", "]")}")
+          }
+        }
+
+        outputAttrs
       } else {
         // For view created before Spark 2.2.0, the view text is already fully qualified, the plan
         // output is the same with the view output.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index 5eb2e31..0ce5d59 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.internal.SQLConf.{CASE_SENSITIVE, CROSS_JOINS_ENABLED}
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 
 class SimpleSQLViewSuite extends SQLViewSuite with SharedSQLContext
@@ -702,6 +703,7 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
           sql("CREATE DATABASE IF NOT EXISTS db2")
           sql("USE db2")
           checkAnswer(spark.table("default.v1"), Row(1))
+          sql("USE default")
         }
       }
     }
@@ -733,4 +735,48 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
       checkAnswer(sql(s"SELECT * FROM $globalTempDB.testView"), Row(2))
     }
   }
+
+  test("SPARK-34719: view query with duplicated output column names") {
+    Seq(true, false).foreach { caseSensitive =>
+      withSQLConf(
+        CASE_SENSITIVE.key -> caseSensitive.toString,
+        CROSS_JOINS_ENABLED.key -> "true") {
+        withView("v1", "v2") {
+          sql("CREATE VIEW v1 AS SELECT 1 a, 2 b")
+          sql("CREATE VIEW v2 AS SELECT 1 col")
+
+          sql("CREATE VIEW testView(c1, c2, c3, c4) AS SELECT *, 1 col, 2 col 
FROM v1")
+          withView("testView") {
+            checkAnswer(spark.table("testView"), Seq(Row(1, 2, 1, 2)))
+
+            // One more duplicated column `COL` if caseSensitive=false.
+            sql("CREATE OR REPLACE VIEW v1 AS SELECT 1 a, 2 b, 3 COL")
+            if (caseSensitive) {
+              checkAnswer(spark.table("testView"), Seq(Row(1, 2, 1, 2)))
+            } else {
+              val e = intercept[AnalysisException](spark.table("testView").collect())
+              assert(e.message.contains("incompatible schema change"))
+            }
+          }
+
+          // v1 has 3 columns [a, b, COL], v2 has one column [col], so `testView2` has duplicated
+          // output column names if caseSensitive=false.
+          sql("CREATE VIEW testView2(c1, c2, c3, c4) AS SELECT * FROM v1, v2")
+          withView("testView2") {
+            checkAnswer(spark.table("testView2"), Seq(Row(1, 2, 3, 1)))
+
+            // One less duplicated column if caseSensitive=false.
+            sql("CREATE OR REPLACE VIEW v1 AS SELECT 1 a, 2 b")
+            if (caseSensitive) {
+              val e = intercept[AnalysisException](spark.table("testView2").collect())
+              assert(e.message.contains("'COL' is not found in '(a,b,col)'"))
+            } else {
+              val e = intercept[AnalysisException](spark.table("testView2").collect())
+              assert(e.message.contains("incompatible schema change"))
+            }
+          }
+        }
+      }
+    }
+  }
 }
