Repository: spark
Updated Branches:
  refs/heads/master 2ac37db7a -> 88a519db9


[SPARK-2734][SQL] Remove tables from cache when DROP TABLE is run.

Author: Michael Armbrust <mich...@databricks.com>

Closes #1650 from marmbrus/dropCached and squashes the following commits:

e6ab80b [Michael Armbrust] Support if exists.
83426c6 [Michael Armbrust] Remove tables from cache when DROP TABLE is run.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88a519db
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88a519db
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88a519db

Branch: refs/heads/master
Commit: 88a519db90d66ee5a1455ef4fcc1ad2a687e3d0b
Parents: 2ac37db
Author: Michael Armbrust <mich...@databricks.com>
Authored: Wed Jul 30 17:30:51 2014 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Wed Jul 30 17:30:51 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/hive/HiveQl.scala      |  9 +++-
 .../apache/spark/sql/hive/HiveStrategies.scala  |  2 +
 .../spark/sql/hive/execution/DropTable.scala    | 48 ++++++++++++++++++++
 .../spark/sql/hive/CachedTableSuite.scala       | 16 +++++++
 4 files changed, 74 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/88a519db/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index d18ccf8..3d2eb1e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -44,6 +44,8 @@ private[hive] case class SourceCommand(filePath: String) extends Command
 
 private[hive] case class AddFile(filePath: String) extends Command
 
+private[hive] case class DropTable(tableName: String, ifExists: Boolean) 
extends Command  // Logical DROP TABLE node; HiveStrategies plans it as execution.DropTable, which also uncaches the table.
+
 /** Provides a mapping from HiveQL statements to catalyst logical plans and 
expression trees. */
 private[hive] object HiveQl {
   protected val nativeCommands = Seq(
@@ -96,7 +98,6 @@ private[hive] object HiveQl {
     "TOK_CREATEINDEX",
     "TOK_DROPDATABASE",
     "TOK_DROPINDEX",
-    "TOK_DROPTABLE",
     "TOK_MSCK",
 
     // TODO(marmbrus): Figure out how view are expanded by hive, as we might 
need to handle this.
@@ -377,6 +378,12 @@ private[hive] object HiveQl {
   }
 
   protected def nodeToPlan(node: Node): LogicalPlan = node match {
+    // Special drop table that also uncaches.
+    case Token("TOK_DROPTABLE",
+           Token("TOK_TABNAME", tableNameParts) ::
+           ifExists) =>
+      val tableName = tableNameParts.map { case Token(p, Nil) => p 
}.mkString(".")
+      DropTable(tableName, ifExists.nonEmpty)
     // Just fake explain for any of the native commands.
     case Token("TOK_EXPLAIN", explainArgs)
       if noExplainCommands.contains(explainArgs.head.getText) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/88a519db/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 4d0fab4..2175c5f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -81,6 +81,8 @@ private[hive] trait HiveStrategies {
       case logical.NativeCommand(sql) =>
         NativeCommand(sql, plan.output)(context) :: Nil
 
+      case DropTable(tableName, ifExists) => execution.DropTable(tableName, 
ifExists) :: Nil
+
       case describe: logical.DescribeCommand =>
         val resolvedTable = context.executePlan(describe.table).analyzed
         resolvedTable match {

http://git-wip-us.apache.org/repos/asf/spark/blob/88a519db/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DropTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DropTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DropTable.scala
new file mode 100644
index 0000000..9cd0c86
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DropTable.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.execution.{Command, LeafNode}
+import org.apache.spark.sql.hive.HiveContext
+
+/**
+ * :: DeveloperApi ::
+ * Drops a table from the metastore and removes it if it is cached.
+ *
+ * The DROP statement itself is delegated to Hive via `runSqlHive`. Only after
+ * that call returns is the name unregistered from the HiveContext catalog, so
+ * later queries no longer resolve to a previously cached copy of the table.
+ * If Hive throws, the catalog entry is left untouched.
+ *
+ * @param tableName name of the table to drop; presumably as produced by HiveQl,
+ *                  which joins the TOK_TABNAME parts with "." (so it may be
+ *                  database-qualified)
+ * @param ifExists  when true, "IF EXISTS " is inserted into the generated
+ *                  statement so that dropping a missing table is not an error
+ */
+@DeveloperApi
+case class DropTable(tableName: String, ifExists: Boolean) extends LeafNode 
with Command {
+
+  def hiveContext = sqlContext.asInstanceOf[HiveContext]  // assumes execution under a HiveContext (this node is created by HiveStrategies)
+
+  def output = Seq.empty  // DROP TABLE produces no result columns
+
+  override protected[sql] lazy val sideEffectResult: Seq[Any] = {  // lazy: the DROP runs at most once per node instance
+    val ifExistsClause = if (ifExists) "IF EXISTS " else ""
+    hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")  // NOTE(review): name is interpolated verbatim; confirm identifiers needing quoting survive
+    hiveContext.catalog.unregisterTable(None, tableName)  // NOTE(review): no database argument; a "db.tbl" name relies on the catalog handling the dotted form — confirm
+    Seq.empty
+  }
+
+  override def execute(): RDD[Row] = {
+    sideEffectResult  // forces the lazy DROP + uncache side effect
+    sparkContext.emptyRDD[Row]  // the command returns no rows
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/88a519db/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 3132d01..08da640 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -23,6 +23,8 @@ import org.apache.spark.sql.hive.execution.HiveComparisonTest
 import org.apache.spark.sql.hive.test.TestHive
 
 class CachedTableSuite extends HiveComparisonTest {
+  import TestHive._
+
   TestHive.loadTestTable("src")
 
   test("cache table") {
@@ -32,6 +34,20 @@ class CachedTableSuite extends HiveComparisonTest {
   createQueryTest("read from cached table",
     "SELECT * FROM src LIMIT 1", reset = false)
 
+  test("Drop cached table") {
+    hql("CREATE TABLE test(a INT)")
+    cacheTable("test")
+    hql("SELECT * FROM test").collect()
+    hql("DROP TABLE test")
+    intercept[org.apache.hadoop.hive.ql.metadata.InvalidTableException] {
+      hql("SELECT * FROM test").collect()
+    }
+  }
+
+  test("DROP nonexistant table") {
+    hql("DROP TABLE IF EXISTS nonexistantTable")
+  }
+
   test("check that table is cached and uncache") {
     TestHive.table("src").queryExecution.analyzed match {
       case _ : InMemoryRelation => // Found evidence of caching

Reply via email to