Repository: spark Updated Branches: refs/heads/master e164a04b2 -> 3fc456694
[SPARK-16678][SPARK-16677][SQL] Fix two View-related bugs ## What changes were proposed in this pull request? **Issue 1: Disallow Creating/Altering a View when the same-name Table Exists (without IF NOT EXISTS)** When we create OR alter a view, we check whether the view already exists. In the current implementation, if a table with the same name exists, we treat it as a view. However, this is not the right behavior. We should follow what Hive does. For example, ``` hive> CREATE TABLE tab1 (id int); OK Time taken: 0.196 seconds hive> CREATE OR REPLACE VIEW tab1 AS SELECT * FROM t1; FAILED: SemanticException [Error 10218]: Existing table is not a view The following is an existing table, not a view: default.tab1 hive> ALTER VIEW tab1 AS SELECT * FROM t1; FAILED: SemanticException [Error 10218]: Existing table is not a view The following is an existing table, not a view: default.tab1 hive> CREATE VIEW IF NOT EXISTS tab1 AS SELECT * FROM t1; OK Time taken: 0.678 seconds ``` **Issue 2: Strange Error when Issuing Load Table Against A View** Users should not be allowed to issue LOAD DATA against a view. Currently, when users doing it, we got a very strange runtime error. For example, ```SQL LOAD DATA LOCAL INPATH "$testData" INTO TABLE $viewName ``` ``` java.lang.reflect.InvocationTargetException was thrown. java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.sql.hive.client.Shim_v0_14.loadTable(HiveShim.scala:680) ``` ## How was this patch tested? Added test cases Author: gatorsmile <gatorsm...@gmail.com> Closes #14314 from gatorsmile/tableDDLAgainstView. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3fc45669 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3fc45669 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3fc45669 Branch: refs/heads/master Commit: 3fc456694151e766c551b4bc58ed7c9457777666 Parents: e164a04 Author: gatorsmile <gatorsm...@gmail.com> Authored: Tue Jul 26 09:32:29 2016 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Tue Jul 26 09:32:29 2016 +0800 ---------------------------------------------------------------------- .../sql/catalyst/catalog/SessionCatalog.scala | 6 +- .../spark/sql/execution/command/tables.scala | 33 ++++----- .../spark/sql/execution/command/views.scala | 5 ++ .../spark/sql/hive/execution/SQLViewSuite.scala | 71 +++++++++++++++++++- 4 files changed, 96 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/3fc45669/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 134fc4e..1856dc4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -443,12 +443,12 @@ class SessionCatalog( } /** - * Return whether a table with the specified name exists. + * Return whether a table/view with the specified name exists. * - * Note: If a database is explicitly specified, then this will return whether the table + * Note: If a database is explicitly specified, then this will return whether the table/view * exists in that particular database instead. In that case, even if there is a temporary * table with the same name, we will return false if the specified database does not - * contain the table. + * contain the table/view. */ def tableExists(name: TableIdentifier): Boolean = synchronized { val db = formatDatabaseName(name.database.getOrElse(currentDb)) http://git-wip-us.apache.org/repos/asf/spark/blob/3fc45669/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 8f3adad..c6daa95 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -202,35 +202,38 @@ case class LoadDataCommand( override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog if (!catalog.tableExists(table)) { - throw new AnalysisException(s"Target table in LOAD DATA does not exist: '$table'") + throw new AnalysisException(s"Target table in LOAD DATA does not exist: $table") } val targetTable = catalog.getTableMetadataOption(table).getOrElse { - throw new AnalysisException(s"Target table in LOAD DATA cannot be temporary: '$table'") + throw new AnalysisException(s"Target table in LOAD DATA cannot be temporary: $table") + } + if (targetTable.tableType == CatalogTableType.VIEW) { + throw new AnalysisException(s"Target table in LOAD DATA cannot be a view: $table") } if (DDLUtils.isDatasourceTable(targetTable)) { - throw new AnalysisException(s"LOAD DATA is not supported for datasource tables: '$table'") + throw new AnalysisException(s"LOAD DATA is not supported for datasource tables: $table") } if (targetTable.partitionColumnNames.nonEmpty) { if (partition.isEmpty) { - throw new AnalysisException(s"LOAD DATA target table '$table' is partitioned, " + + throw new AnalysisException(s"LOAD DATA target table $table is partitioned, " + s"but no partition spec is provided") } if (targetTable.partitionColumnNames.size != partition.get.size) { - throw new AnalysisException(s"LOAD DATA target table '$table' is partitioned, " + + throw new AnalysisException(s"LOAD DATA target table $table is partitioned, " + s"but number of columns in provided partition spec (${partition.get.size}) " + s"do not match number of partitioned columns in table " + s"(s${targetTable.partitionColumnNames.size})") } partition.get.keys.foreach { colName => if (!targetTable.partitionColumnNames.contains(colName)) { - throw new AnalysisException(s"LOAD DATA target table '$table' is partitioned, " + + throw new AnalysisException(s"LOAD DATA target table $table is partitioned, " + s"but the specified partition spec refers to a column that is not partitioned: " + s"'$colName'") } } } else { if (partition.nonEmpty) { - throw new AnalysisException(s"LOAD DATA target table '$table' is not partitioned, " + + throw new AnalysisException(s"LOAD DATA target table $table is not partitioned, " + s"but a partition spec was provided.") } } @@ -321,31 +324,31 @@ case class TruncateTableCommand( override def run(spark: SparkSession): Seq[Row] = { val catalog = spark.sessionState.catalog if (!catalog.tableExists(tableName)) { - throw new AnalysisException(s"Table '$tableName' in TRUNCATE TABLE does not exist.") + throw new AnalysisException(s"Table $tableName in TRUNCATE TABLE does not exist.") } if (catalog.isTemporaryTable(tableName)) { throw new AnalysisException( - s"Operation not allowed: TRUNCATE TABLE on temporary tables: '$tableName'") + s"Operation not allowed: TRUNCATE TABLE on temporary tables: $tableName") } val table = catalog.getTableMetadata(tableName) if (table.tableType == CatalogTableType.EXTERNAL) { throw new AnalysisException( - s"Operation not allowed: TRUNCATE TABLE on external tables: '$tableName'") + s"Operation not allowed: TRUNCATE TABLE on external tables: $tableName") } if (table.tableType == CatalogTableType.VIEW) { throw new AnalysisException( - s"Operation not allowed: TRUNCATE TABLE on views: '$tableName'") + s"Operation not allowed: TRUNCATE TABLE on views: $tableName") } val isDatasourceTable = DDLUtils.isDatasourceTable(table) if (isDatasourceTable && partitionSpec.isDefined) { throw new AnalysisException( s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " + - s"for tables created using the data sources API: '$tableName'") + s"for tables created using the data sources API: $tableName") } if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) { throw new AnalysisException( s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " + - s"for tables that are not partitioned: '$tableName'") + s"for tables that are not partitioned: $tableName") } val locations = if (isDatasourceTable) { @@ -366,7 +369,7 @@ case class TruncateTableCommand( } catch { case NonFatal(e) => throw new AnalysisException( - s"Failed to truncate table '$tableName' when removing data of the path: $path " + + s"Failed to truncate table $tableName when removing data of the path: $path " + s"because of ${e.toString}") } } @@ -379,7 +382,7 @@ case class TruncateTableCommand( spark.sharedState.cacheManager.uncacheQuery(spark.table(tableName.quotedString)) } catch { case NonFatal(e) => - log.warn(s"Exception when attempting to uncache table '$tableName'", e) + log.warn(s"Exception when attempting to uncache table $tableName", e) } Seq.empty[Row] } http://git-wip-us.apache.org/repos/asf/spark/blob/3fc45669/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 312a1f6..901a9b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -113,9 +113,14 @@ case class CreateViewCommand( val qualifiedName = name.copy(database = Option(database)) if (sessionState.catalog.tableExists(qualifiedName)) { + val tableMetadata = sessionState.catalog.getTableMetadata(qualifiedName) if (allowExisting) { // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view // already exists. + } else if (tableMetadata.tableType != CatalogTableType.VIEW) { + throw new AnalysisException( + "Existing table is not a view. The following is an existing table, " + + s"not a view: $qualifiedName") } else if (replace) { // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...` sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan)) http://git-wip-us.apache.org/repos/asf/spark/blob/3fc45669/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala index 82dc64a..6a80664 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} import org.apache.spark.sql.hive.test.TestHiveSingleton -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SQLTestUtils /** @@ -55,6 +54,76 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } + test("error handling: existing a table with the duplicate name when creating/altering a view") { + withTable("tab1") { + sql("CREATE TABLE tab1 (id int)") + var e = intercept[AnalysisException] { + sql("CREATE OR REPLACE VIEW tab1 AS SELECT * FROM jt") + }.getMessage + assert(e.contains("The following is an existing table, not a view: `default`.`tab1`")) + e = intercept[AnalysisException] { + sql("CREATE VIEW tab1 AS SELECT * FROM jt") + }.getMessage + assert(e.contains("The following is an existing table, not a view: `default`.`tab1`")) + e = intercept[AnalysisException] { + sql("ALTER VIEW tab1 AS SELECT * FROM jt") + }.getMessage + assert(e.contains("The following is an existing table, not a view: `default`.`tab1`")) + } + } + + test("existing a table with the duplicate name when CREATE VIEW IF NOT EXISTS") { + withTable("tab1") { + sql("CREATE TABLE tab1 (id int)") + sql("CREATE VIEW IF NOT EXISTS tab1 AS SELECT * FROM jt") + checkAnswer(sql("select count(*) FROM tab1"), Row(0)) + } + } + + test("error handling: insert/load/truncate table commands against a temp view") { + val viewName = "testView" + withTempView(viewName) { + sql(s"CREATE TEMPORARY VIEW $viewName AS SELECT id FROM jt") + var e = intercept[AnalysisException] { + sql(s"INSERT INTO TABLE $viewName SELECT 1") + }.getMessage + assert(e.contains("Inserting into an RDD-based table is not allowed")) + + val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath + e = intercept[AnalysisException] { + sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE $viewName""") + }.getMessage + assert(e.contains(s"Target table in LOAD DATA cannot be temporary: `$viewName`")) + + e = intercept[AnalysisException] { + sql(s"TRUNCATE TABLE $viewName") + }.getMessage + assert(e.contains(s"Operation not allowed: TRUNCATE TABLE on temporary tables: `$viewName`")) + } + } + + test("error handling: insert/load/truncate table commands against a view") { + val viewName = "testView" + withView(viewName) { + sql(s"CREATE VIEW $viewName AS SELECT id FROM jt") + var e = intercept[AnalysisException] { + sql(s"INSERT INTO TABLE $viewName SELECT 1") + }.getMessage + assert(e.contains("Inserting into an RDD-based table is not allowed")) + + val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath + e = intercept[AnalysisException] { + sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE $viewName""") + }.getMessage + assert(e.contains(s"Target table in LOAD DATA cannot be a view: `$viewName`")) + + e = intercept[AnalysisException] { + sql(s"TRUNCATE TABLE $viewName") + }.getMessage + assert(e.contains(s"Operation not allowed: TRUNCATE TABLE on views: `$viewName`")) + } + } + test("error handling: fail if the view sql itself is invalid") { // A table that does not exist intercept[AnalysisException] { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org