Repository: spark
Updated Branches:
  refs/heads/branch-2.0 52cb1ad38 -> 26359d27c


[SPARK-15862][SQL] Better Error Message When Having Database Name in CACHE TABLE AS SELECT

#### What changes were proposed in this pull request?
~~If the temp table already exists, we should not silently replace it when doing `CACHE TABLE AS SELECT`. This is inconsistent with the behavior of `CREATE VIEW` and `CREATE TABLE`. This PR is to fix this silent drop.~~

~~Maybe we can also introduce new syntax for replacing the existing one. For example, in Hive, to replace a view the syntax is `ALTER VIEW AS SELECT` or `CREATE OR REPLACE VIEW AS SELECT`.~~

The table name in `CACHE TABLE AS SELECT` should NOT contain a database prefix like `database.table`. This PR catches the qualified form in the parser and reports a clear error message, instead of complaining that the view already exists.
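
For illustration, a minimal sketch of the new behavior (`mydb` is a placeholder database name; the error text is the message this PR adds):

```scala
// Qualified names still work for plain CACHE TABLE / UNCACHE TABLE:
sql("CACHE TABLE mydb.cachedTable")
sql("UNCACHE TABLE mydb.cachedTable")

// ...but CACHE TABLE AS SELECT now fails fast in the parser:
sql("CACHE TABLE mydb.cachedTable AS SELECT 1")
// ParseException: It is not allowed to add database prefix `mydb` to
// the table name in CACHE TABLE AS SELECT
```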

In addition, this PR refactors the parser to generate table identifiers instead of returning table name strings.

#### How was this patch tested?
- Added a test case for caching and uncaching qualified table names
- Fixed a few test cases that did not drop the temp table at the end
- Added the related test case for the issue resolved in this PR

Author: gatorsmile <gatorsm...@gmail.com>
Author: xiaoli <lixiao1...@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #13572 from gatorsmile/cacheTableAsSelect.

(cherry picked from commit 6451cf9270b55465d8ecea4c4031329a1058561a)
Signed-off-by: Cheng Lian <l...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26359d27
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26359d27
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26359d27

Branch: refs/heads/branch-2.0
Commit: 26359d27c47ae3ec53e442de3884ec9245d15cee
Parents: 52cb1ad
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Thu Jun 16 10:01:59 2016 -0700
Committer: Cheng Lian <l...@databricks.com>
Committed: Thu Jun 16 10:02:12 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |  4 +-
 .../spark/sql/execution/SparkSqlParser.scala    | 10 ++-
 .../spark/sql/execution/command/cache.scala     | 20 ++---
 .../spark/sql/execution/command/views.scala     |  2 +-
 .../org/apache/spark/sql/CachedTableSuite.scala | 68 +++++++++--------
 .../apache/spark/sql/hive/test/TestHive.scala   |  2 +-
 .../spark/sql/hive/CachedTableSuite.scala       | 79 +++++++++++++++-----
 7 files changed, 121 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/26359d27/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 044f910..b603196 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -114,8 +114,8 @@ statement
         tableIdentifier partitionSpec? describeColName?                #describeTable
     | REFRESH TABLE tableIdentifier                                    #refreshTable
     | REFRESH .*?                                                      #refreshResource
-    | CACHE LAZY? TABLE identifier (AS? query)?                        #cacheTable
-    | UNCACHE TABLE identifier                                         #uncacheTable
+    | CACHE LAZY? TABLE tableIdentifier (AS? query)?                   #cacheTable
+    | UNCACHE TABLE tableIdentifier                                    #uncacheTable
     | CLEAR CACHE                                                      #clearCache
     | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
         tableIdentifier partitionSpec?                                 #loadData
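
For context, `tableIdentifier` is the existing rule in the same grammar file that accepts an optional database qualifier; quoted from memory as a sketch (exact text may differ):

    tableIdentifier
        : (db=identifier '.')? table=identifier
        ;

Switching `cacheTable` and `uncacheTable` from `identifier` to `tableIdentifier` is what lets the parser see the `db.table` form at all, so the AS SELECT case can be rejected with a targeted message instead of failing later.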

http://git-wip-us.apache.org/repos/asf/spark/blob/26359d27/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index a0508ad..154c25a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -221,14 +221,20 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    */
   override def visitCacheTable(ctx: CacheTableContext): LogicalPlan = withOrigin(ctx) {
     val query = Option(ctx.query).map(plan)
-    CacheTableCommand(ctx.identifier.getText, query, ctx.LAZY != null)
+    val tableIdent = visitTableIdentifier(ctx.tableIdentifier)
+    if (query.isDefined && tableIdent.database.isDefined) {
+      val database = tableIdent.database.get
+      throw new ParseException(s"It is not allowed to add database prefix `$database` to " +
+        s"the table name in CACHE TABLE AS SELECT", ctx)
+    }
+    CacheTableCommand(tableIdent, query, ctx.LAZY != null)
   }
 
   /**
    * Create an [[UncacheTableCommand]] logical plan.
    */
   override def visitUncacheTable(ctx: UncacheTableContext): LogicalPlan = withOrigin(ctx) {
-    UncacheTableCommand(ctx.identifier.getText)
+    UncacheTableCommand(visitTableIdentifier(ctx.tableIdentifier))
   }
 
   /**
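
`visitTableIdentifier`, inherited from `AstBuilder`, maps the two accepted name shapes onto `TableIdentifier`. A rough sketch of what it yields (`tbl` and `mydb` are placeholder names):

    import org.apache.spark.sql.catalyst.TableIdentifier

    // "tbl"      parses to an unqualified identifier: database is None
    val unqualified = TableIdentifier("tbl")
    // "mydb.tbl" parses to a qualified identifier: database is Some("mydb")
    val qualified = TableIdentifier("tbl", Some("mydb"))

The guard above therefore fires only when both a query and a database are present, i.e. exactly `CACHE TABLE db.name AS SELECT ...`.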

http://git-wip-us.apache.org/repos/asf/spark/blob/26359d27/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index 5332366..697e2ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -18,15 +18,17 @@
 package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
 case class CacheTableCommand(
-  tableName: String,
-  plan: Option[LogicalPlan],
-  isLazy: Boolean)
-  extends RunnableCommand {
+    tableIdent: TableIdentifier,
+    plan: Option[LogicalPlan],
+    isLazy: Boolean) extends RunnableCommand {
+  require(plan.isEmpty || tableIdent.database.isEmpty,
+    "Database name is not allowed in CACHE TABLE AS SELECT")
 
   override protected def innerChildren: Seq[QueryPlan[_]] = {
     plan.toSeq
@@ -34,13 +36,13 @@ case class CacheTableCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     plan.foreach { logicalPlan =>
-      Dataset.ofRows(sparkSession, logicalPlan).createOrReplaceTempView(tableName)
+      Dataset.ofRows(sparkSession, logicalPlan).createTempView(tableIdent.quotedString)
     }
-    sparkSession.catalog.cacheTable(tableName)
+    sparkSession.catalog.cacheTable(tableIdent.quotedString)
 
     if (!isLazy) {
       // Performs eager caching
-      sparkSession.table(tableName).count()
+      sparkSession.table(tableIdent).count()
     }
 
     Seq.empty[Row]
@@ -50,10 +52,10 @@ case class CacheTableCommand(
 }
 
 
-case class UncacheTableCommand(tableName: String) extends RunnableCommand {
+case class UncacheTableCommand(tableIdent: TableIdentifier) extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    sparkSession.catalog.uncacheTable(tableName)
+    sparkSession.catalog.uncacheTable(tableIdent.quotedString)
     Seq.empty[Row]
   }
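
`quotedString` backtick-quotes each part of the identifier, so a qualified name survives the round trip through the string-based `Catalog` API. A quick sketch (placeholder names):

    import org.apache.spark.sql.catalyst.TableIdentifier

    TableIdentifier("tbl").quotedString                // "`tbl`"
    TableIdentifier("tbl", Some("mydb")).quotedString  // "`mydb`.`tbl`"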
 

http://git-wip-us.apache.org/repos/asf/spark/blob/26359d27/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index b56c200..088f684 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -79,7 +79,7 @@ case class CreateViewCommand(
   if (isTemporary && tableDesc.identifier.database.isDefined) {
     val database = tableDesc.identifier.database.get
     throw new AnalysisException(
-      s"It is not allowed to add database prefix ${database} for the TEMPORARY 
view name.")
+      s"It is not allowed to add database prefix `$database` for the TEMPORARY 
view name.")
   }
 
   override def run(sparkSession: SparkSession): Seq[Row] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/26359d27/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index d7df18a..6f6abfa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -73,11 +73,13 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
   }
 
   test("cache temp table") {
-    testData.select('key).createOrReplaceTempView("tempTable")
-    assertCached(sql("SELECT COUNT(*) FROM tempTable"), 0)
-    spark.catalog.cacheTable("tempTable")
-    assertCached(sql("SELECT COUNT(*) FROM tempTable"))
-    spark.catalog.uncacheTable("tempTable")
+    withTempTable("tempTable") {
+      testData.select('key).createOrReplaceTempView("tempTable")
+      assertCached(sql("SELECT COUNT(*) FROM tempTable"), 0)
+      spark.catalog.cacheTable("tempTable")
+      assertCached(sql("SELECT COUNT(*) FROM tempTable"))
+      spark.catalog.uncacheTable("tempTable")
+    }
   }
 
   test("unpersist an uncached table will not raise exception") {
@@ -95,9 +97,11 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
   }
 
   test("cache table as select") {
-    sql("CACHE TABLE tempTable AS SELECT key FROM testData")
-    assertCached(sql("SELECT COUNT(*) FROM tempTable"))
-    spark.catalog.uncacheTable("tempTable")
+    withTempTable("tempTable") {
+      sql("CACHE TABLE tempTable AS SELECT key FROM testData")
+      assertCached(sql("SELECT COUNT(*) FROM tempTable"))
+      spark.catalog.uncacheTable("tempTable")
+    }
   }
 
   test("uncaching temp table") {
@@ -223,32 +227,36 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
   }
 
   test("CACHE TABLE tableName AS SELECT * FROM anotherTable") {
-    sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
-    assertCached(spark.table("testCacheTable"))
-
-    val rddId = rddIdOf("testCacheTable")
-    assert(
-      isMaterialized(rddId),
-      "Eagerly cached in-memory table should have already been materialized")
-
-    spark.catalog.uncacheTable("testCacheTable")
-    eventually(timeout(10 seconds)) {
-      assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+    withTempTable("testCacheTable") {
+      sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
+      assertCached(spark.table("testCacheTable"))
+
+      val rddId = rddIdOf("testCacheTable")
+      assert(
+        isMaterialized(rddId),
+        "Eagerly cached in-memory table should have already been materialized")
+
+      spark.catalog.uncacheTable("testCacheTable")
+      eventually(timeout(10 seconds)) {
+        assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+      }
     }
   }
 
   test("CACHE TABLE tableName AS SELECT ...") {
-    sql("CACHE TABLE testCacheTable AS SELECT key FROM testData LIMIT 10")
-    assertCached(spark.table("testCacheTable"))
-
-    val rddId = rddIdOf("testCacheTable")
-    assert(
-      isMaterialized(rddId),
-      "Eagerly cached in-memory table should have already been materialized")
-
-    spark.catalog.uncacheTable("testCacheTable")
-    eventually(timeout(10 seconds)) {
-      assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+    withTempTable("testCacheTable") {
+      sql("CACHE TABLE testCacheTable AS SELECT key FROM testData LIMIT 10")
+      assertCached(spark.table("testCacheTable"))
+
+      val rddId = rddIdOf("testCacheTable")
+      assert(
+        isMaterialized(rddId),
+        "Eagerly cached in-memory table should have already been materialized")
+
+      spark.catalog.uncacheTable("testCacheTable")
+      eventually(timeout(10 seconds)) {
+        assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+      }
     }
   }
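
`withTempTable` comes from `SQLTestUtils` and guarantees the temp table is dropped even when an assertion in the body fails. A minimal sketch of the helper, from memory rather than the exact Spark source:

    // Sketch: run the test body, then drop the named temp views.
    protected def withTempTable(tableNames: String*)(f: => Unit): Unit = {
      try f finally tableNames.foreach(spark.catalog.dropTempView)
    }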
 

http://git-wip-us.apache.org/repos/asf/spark/blob/26359d27/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 1d1d5e3..b45be02 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -449,7 +449,7 @@ private[hive] class TestHiveQueryExecution(
 
   override lazy val analyzed: LogicalPlan = {
     val describedTables = logical match {
-      case CacheTableCommand(tbl, _, _) => tbl :: Nil
+      case CacheTableCommand(tbl, _, _) => tbl.table :: Nil
       case _ => Nil
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/26359d27/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index e35a719..f7c3e34 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -21,6 +21,7 @@ import java.io.File
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
@@ -128,29 +129,33 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
   }
 
   test("CACHE TABLE tableName AS SELECT * FROM anotherTable") {
-    sql("CACHE TABLE testCacheTable AS SELECT * FROM src")
-    assertCached(table("testCacheTable"))
+    withTempTable("testCacheTable") {
+      sql("CACHE TABLE testCacheTable AS SELECT * FROM src")
+      assertCached(table("testCacheTable"))
 
-    val rddId = rddIdOf("testCacheTable")
-    assert(
-      isMaterialized(rddId),
-      "Eagerly cached in-memory table should have already been materialized")
+      val rddId = rddIdOf("testCacheTable")
+      assert(
+        isMaterialized(rddId),
+        "Eagerly cached in-memory table should have already been materialized")
 
-    uncacheTable("testCacheTable")
-    assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+      uncacheTable("testCacheTable")
+      assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+    }
   }
 
   test("CACHE TABLE tableName AS SELECT ...") {
-    sql("CACHE TABLE testCacheTable AS SELECT key FROM src LIMIT 10")
-    assertCached(table("testCacheTable"))
+    withTempTable("testCacheTable") {
+      sql("CACHE TABLE testCacheTable AS SELECT key FROM src LIMIT 10")
+      assertCached(table("testCacheTable"))
 
-    val rddId = rddIdOf("testCacheTable")
-    assert(
-      isMaterialized(rddId),
-      "Eagerly cached in-memory table should have already been materialized")
+      val rddId = rddIdOf("testCacheTable")
+      assert(
+        isMaterialized(rddId),
+        "Eagerly cached in-memory table should have already been materialized")
 
-    uncacheTable("testCacheTable")
-    assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+      uncacheTable("testCacheTable")
+      assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+    }
   }
 
   test("CACHE LAZY TABLE tableName") {
@@ -172,9 +177,11 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
   }
 
   test("CACHE TABLE with Hive UDF") {
-    sql("CACHE TABLE udfTest AS SELECT * FROM src WHERE floor(key) = 1")
-    assertCached(table("udfTest"))
-    uncacheTable("udfTest")
+    withTempTable("udfTest") {
+      sql("CACHE TABLE udfTest AS SELECT * FROM src WHERE floor(key) = 1")
+      assertCached(table("udfTest"))
+      uncacheTable("udfTest")
+    }
   }
 
   test("REFRESH TABLE also needs to recache the data (data source tables)") {
@@ -267,6 +274,40 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
     Utils.deleteRecursively(tempPath)
   }
 
+  test("Cache/Uncache Qualified Tables") {
+    withTempDatabase { db =>
+      withTempTable("cachedTable") {
+        sql(s"CREATE TABLE $db.cachedTable STORED AS PARQUET AS SELECT 1")
+        sql(s"CACHE TABLE $db.cachedTable")
+        assertCached(spark.table(s"$db.cachedTable"))
+
+        activateDatabase(db) {
+          assertCached(spark.table("cachedTable"))
+          sql("UNCACHE TABLE cachedTable")
+          assert(!spark.catalog.isCached("cachedTable"), "Table 'cachedTable' should not be cached")
+          sql(s"CACHE TABLE cachedTable")
+          assert(spark.catalog.isCached("cachedTable"), "Table 'cachedTable' should be cached")
+        }
+
+        sql(s"UNCACHE TABLE $db.cachedTable")
+        assert(!spark.catalog.isCached(s"$db.cachedTable"),
+          "Table 'cachedTable' should not be cached")
+      }
+    }
+  }
+
+  test("Cache Table As Select - having database name") {
+    withTempDatabase { db =>
+      withTempTable("cachedTable") {
+        val e = intercept[ParseException] {
+          sql(s"CACHE TABLE $db.cachedTable AS SELECT 1")
+        }.getMessage
+        assert(e.contains("It is not allowed to add database prefix ") &&
+          e.contains("to the table name in CACHE TABLE AS SELECT"))
+      }
+    }
+  }
+
   test("SPARK-11246 cache parquet table") {
     sql("CREATE TABLE cachedTable STORED AS PARQUET AS SELECT 1")
 

