Repository: spark
Updated Branches:
  refs/heads/master e164a04b2 -> 3fc456694


[SPARK-16678][SPARK-16677][SQL] Fix two View-related bugs

## What changes were proposed in this pull request?
**Issue 1: Disallow Creating/Altering a View when the same-name Table Exists 
(without IF NOT EXISTS)**
When we create or alter a view, we check whether the view already exists. In 
the current implementation, if a table with the same name exists, we treat that 
table as if it were a view. This is not the right behavior; we should follow 
what Hive does. For example,
```
hive> CREATE TABLE tab1 (id int);
OK
Time taken: 0.196 seconds
hive> CREATE OR REPLACE VIEW tab1 AS SELECT * FROM t1;
FAILED: SemanticException [Error 10218]: Existing table is not a view
 The following is an existing table, not a view: default.tab1
hive> ALTER VIEW tab1 AS SELECT * FROM t1;
FAILED: SemanticException [Error 10218]: Existing table is not a view
 The following is an existing table, not a view: default.tab1
hive> CREATE VIEW IF NOT EXISTS tab1 AS SELECT * FROM t1;
OK
Time taken: 0.678 seconds
```

**Issue 2: Strange Error when Issuing Load Table Against A View**
Users should not be allowed to issue LOAD DATA against a view. Currently, when 
users do so, they get a very strange runtime error. For example,
```SQL
LOAD DATA LOCAL INPATH "$testData" INTO TABLE $viewName
```
```
java.lang.reflect.InvocationTargetException was thrown.
java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at 
org.apache.spark.sql.hive.client.Shim_v0_14.loadTable(HiveShim.scala:680)
```
## How was this patch tested?
Added test cases

Author: gatorsmile <gatorsm...@gmail.com>

Closes #14314 from gatorsmile/tableDDLAgainstView.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3fc45669
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3fc45669
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3fc45669

Branch: refs/heads/master
Commit: 3fc456694151e766c551b4bc58ed7c9457777666
Parents: e164a04
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Tue Jul 26 09:32:29 2016 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Tue Jul 26 09:32:29 2016 +0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   |  6 +-
 .../spark/sql/execution/command/tables.scala    | 33 ++++-----
 .../spark/sql/execution/command/views.scala     |  5 ++
 .../spark/sql/hive/execution/SQLViewSuite.scala | 71 +++++++++++++++++++-
 4 files changed, 96 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3fc45669/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 134fc4e..1856dc4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -443,12 +443,12 @@ class SessionCatalog(
   }
 
   /**
-   * Return whether a table with the specified name exists.
+   * Return whether a table/view with the specified name exists.
    *
-   * Note: If a database is explicitly specified, then this will return 
whether the table
+   * Note: If a database is explicitly specified, then this will return 
whether the table/view
    * exists in that particular database instead. In that case, even if there 
is a temporary
    * table with the same name, we will return false if the specified database 
does not
-   * contain the table.
+   * contain the table/view.
    */
   def tableExists(name: TableIdentifier): Boolean = synchronized {
     val db = formatDatabaseName(name.database.getOrElse(currentDb))

http://git-wip-us.apache.org/repos/asf/spark/blob/3fc45669/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 8f3adad..c6daa95 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -202,35 +202,38 @@ case class LoadDataCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     if (!catalog.tableExists(table)) {
-      throw new AnalysisException(s"Target table in LOAD DATA does not exist: 
'$table'")
+      throw new AnalysisException(s"Target table in LOAD DATA does not exist: 
$table")
     }
     val targetTable = catalog.getTableMetadataOption(table).getOrElse {
-      throw new AnalysisException(s"Target table in LOAD DATA cannot be 
temporary: '$table'")
+      throw new AnalysisException(s"Target table in LOAD DATA cannot be 
temporary: $table")
+    }
+    if (targetTable.tableType == CatalogTableType.VIEW) {
+      throw new AnalysisException(s"Target table in LOAD DATA cannot be a 
view: $table")
     }
     if (DDLUtils.isDatasourceTable(targetTable)) {
-      throw new AnalysisException(s"LOAD DATA is not supported for datasource 
tables: '$table'")
+      throw new AnalysisException(s"LOAD DATA is not supported for datasource 
tables: $table")
     }
     if (targetTable.partitionColumnNames.nonEmpty) {
       if (partition.isEmpty) {
-        throw new AnalysisException(s"LOAD DATA target table '$table' is 
partitioned, " +
+        throw new AnalysisException(s"LOAD DATA target table $table is 
partitioned, " +
           s"but no partition spec is provided")
       }
       if (targetTable.partitionColumnNames.size != partition.get.size) {
-        throw new AnalysisException(s"LOAD DATA target table '$table' is 
partitioned, " +
+        throw new AnalysisException(s"LOAD DATA target table $table is 
partitioned, " +
           s"but number of columns in provided partition spec 
(${partition.get.size}) " +
           s"do not match number of partitioned columns in table " +
           s"(s${targetTable.partitionColumnNames.size})")
       }
       partition.get.keys.foreach { colName =>
         if (!targetTable.partitionColumnNames.contains(colName)) {
-          throw new AnalysisException(s"LOAD DATA target table '$table' is 
partitioned, " +
+          throw new AnalysisException(s"LOAD DATA target table $table is 
partitioned, " +
             s"but the specified partition spec refers to a column that is not 
partitioned: " +
             s"'$colName'")
         }
       }
     } else {
       if (partition.nonEmpty) {
-        throw new AnalysisException(s"LOAD DATA target table '$table' is not 
partitioned, " +
+        throw new AnalysisException(s"LOAD DATA target table $table is not 
partitioned, " +
           s"but a partition spec was provided.")
       }
     }
@@ -321,31 +324,31 @@ case class TruncateTableCommand(
   override def run(spark: SparkSession): Seq[Row] = {
     val catalog = spark.sessionState.catalog
     if (!catalog.tableExists(tableName)) {
-      throw new AnalysisException(s"Table '$tableName' in TRUNCATE TABLE does 
not exist.")
+      throw new AnalysisException(s"Table $tableName in TRUNCATE TABLE does 
not exist.")
     }
     if (catalog.isTemporaryTable(tableName)) {
       throw new AnalysisException(
-        s"Operation not allowed: TRUNCATE TABLE on temporary tables: 
'$tableName'")
+        s"Operation not allowed: TRUNCATE TABLE on temporary tables: 
$tableName")
     }
     val table = catalog.getTableMetadata(tableName)
     if (table.tableType == CatalogTableType.EXTERNAL) {
       throw new AnalysisException(
-        s"Operation not allowed: TRUNCATE TABLE on external tables: 
'$tableName'")
+        s"Operation not allowed: TRUNCATE TABLE on external tables: 
$tableName")
     }
     if (table.tableType == CatalogTableType.VIEW) {
       throw new AnalysisException(
-        s"Operation not allowed: TRUNCATE TABLE on views: '$tableName'")
+        s"Operation not allowed: TRUNCATE TABLE on views: $tableName")
     }
     val isDatasourceTable = DDLUtils.isDatasourceTable(table)
     if (isDatasourceTable && partitionSpec.isDefined) {
       throw new AnalysisException(
         s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported 
" +
-        s"for tables created using the data sources API: '$tableName'")
+        s"for tables created using the data sources API: $tableName")
     }
     if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
       throw new AnalysisException(
         s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported 
" +
-        s"for tables that are not partitioned: '$tableName'")
+        s"for tables that are not partitioned: $tableName")
     }
     val locations =
       if (isDatasourceTable) {
@@ -366,7 +369,7 @@ case class TruncateTableCommand(
         } catch {
           case NonFatal(e) =>
             throw new AnalysisException(
-              s"Failed to truncate table '$tableName' when removing data of 
the path: $path " +
+              s"Failed to truncate table $tableName when removing data of the 
path: $path " +
                 s"because of ${e.toString}")
         }
       }
@@ -379,7 +382,7 @@ case class TruncateTableCommand(
       
spark.sharedState.cacheManager.uncacheQuery(spark.table(tableName.quotedString))
     } catch {
       case NonFatal(e) =>
-        log.warn(s"Exception when attempting to uncache table '$tableName'", e)
+        log.warn(s"Exception when attempting to uncache table $tableName", e)
     }
     Seq.empty[Row]
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3fc45669/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 312a1f6..901a9b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -113,9 +113,14 @@ case class CreateViewCommand(
       val qualifiedName = name.copy(database = Option(database))
 
       if (sessionState.catalog.tableExists(qualifiedName)) {
+        val tableMetadata = 
sessionState.catalog.getTableMetadata(qualifiedName)
         if (allowExisting) {
           // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does 
nothing when the target view
           // already exists.
+        } else if (tableMetadata.tableType != CatalogTableType.VIEW) {
+          throw new AnalysisException(
+            "Existing table is not a view. The following is an existing table, 
" +
+              s"not a view: $qualifiedName")
         } else if (replace) {
           // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
           sessionState.catalog.alterTable(prepareTable(sparkSession, 
analyzedPlan))

http://git-wip-us.apache.org/repos/asf/spark/blob/3fc45669/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
index 82dc64a..6a80664 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 
 /**
@@ -55,6 +54,76 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with 
TestHiveSingleton {
     }
   }
 
+  test("error handling: existing a table with the duplicate name when 
creating/altering a view") {
+    withTable("tab1") {
+      sql("CREATE TABLE tab1 (id int)")
+      var e = intercept[AnalysisException] {
+        sql("CREATE OR REPLACE VIEW tab1 AS SELECT * FROM jt")
+      }.getMessage
+      assert(e.contains("The following is an existing table, not a view: 
`default`.`tab1`"))
+      e = intercept[AnalysisException] {
+        sql("CREATE VIEW tab1 AS SELECT * FROM jt")
+      }.getMessage
+      assert(e.contains("The following is an existing table, not a view: 
`default`.`tab1`"))
+      e = intercept[AnalysisException] {
+        sql("ALTER VIEW tab1 AS SELECT * FROM jt")
+      }.getMessage
+      assert(e.contains("The following is an existing table, not a view: 
`default`.`tab1`"))
+    }
+  }
+
+  test("existing a table with the duplicate name when CREATE VIEW IF NOT 
EXISTS") {
+    withTable("tab1") {
+      sql("CREATE TABLE tab1 (id int)")
+      sql("CREATE VIEW IF NOT EXISTS tab1 AS SELECT * FROM jt")
+      checkAnswer(sql("select count(*) FROM tab1"), Row(0))
+    }
+  }
+
+  test("error handling: insert/load/truncate table commands against a temp 
view") {
+    val viewName = "testView"
+    withTempView(viewName) {
+      sql(s"CREATE TEMPORARY VIEW $viewName AS SELECT id FROM jt")
+      var e = intercept[AnalysisException] {
+        sql(s"INSERT INTO TABLE $viewName SELECT 1")
+      }.getMessage
+      assert(e.contains("Inserting into an RDD-based table is not allowed"))
+
+      val testData = 
hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath
+      e = intercept[AnalysisException] {
+        sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE $viewName""")
+      }.getMessage
+      assert(e.contains(s"Target table in LOAD DATA cannot be temporary: 
`$viewName`"))
+
+      e = intercept[AnalysisException] {
+        sql(s"TRUNCATE TABLE $viewName")
+      }.getMessage
+      assert(e.contains(s"Operation not allowed: TRUNCATE TABLE on temporary 
tables: `$viewName`"))
+    }
+  }
+
+  test("error handling: insert/load/truncate table commands against a view") {
+    val viewName = "testView"
+    withView(viewName) {
+      sql(s"CREATE VIEW $viewName AS SELECT id FROM jt")
+      var e = intercept[AnalysisException] {
+        sql(s"INSERT INTO TABLE $viewName SELECT 1")
+      }.getMessage
+      assert(e.contains("Inserting into an RDD-based table is not allowed"))
+
+      val testData = 
hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath
+      e = intercept[AnalysisException] {
+        sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE $viewName""")
+      }.getMessage
+      assert(e.contains(s"Target table in LOAD DATA cannot be a view: 
`$viewName`"))
+
+      e = intercept[AnalysisException] {
+        sql(s"TRUNCATE TABLE $viewName")
+      }.getMessage
+      assert(e.contains(s"Operation not allowed: TRUNCATE TABLE on views: 
`$viewName`"))
+    }
+  }
+
   test("error handling: fail if the view sql itself is invalid") {
     // A table that does not exist
     intercept[AnalysisException] {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to