Repository: spark
Updated Branches:
  refs/heads/branch-2.0 46d5f7f38 -> 44052a707


[SPARK-15596][SPARK-15635][SQL] ALTER TABLE RENAME fixes

## What changes were proposed in this pull request?

**SPARK-15596**: Even after we renamed a cached table, the plan would remain in 
the cache with the old table name. If I created a new table using the old name 
then the old table would return incorrect data. Note that this applies only to 
Hive tables.

**SPARK-15635**: Renaming a datasource table would render the table not 
query-able. This is because we store the location of the table in a "path" 
property, which was not updated to reflect Hive's change in table location 
following a rename.

## How was this patch tested?

DDLSuite

Author: Andrew Or <and...@databricks.com>

Closes #13416 from andrewor14/rename-table.

(cherry picked from commit 9e2643b21d5749f2f5447b0274a6a35496054342)
Signed-off-by: Yin Huai <yh...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44052a70
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44052a70
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44052a70

Branch: refs/heads/branch-2.0
Commit: 44052a70734fd0f35dc54f5553ae40ac395eef13
Parents: 46d5f7f
Author: Andrew Or <and...@databricks.com>
Authored: Wed Jun 1 14:26:24 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Jun 1 14:26:49 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/command/tables.scala    | 35 ++++++++++++++++++--
 .../spark/sql/execution/command/DDLSuite.scala  | 18 ++++++++++
 2 files changed, 51 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/44052a70/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 2d6a3b4..1b89c6b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -23,6 +23,7 @@ import java.util.Date
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
+import scala.util.Try
 
 import org.apache.hadoop.fs.Path
 
@@ -145,8 +146,38 @@ case class AlterTableRenameCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     DDLUtils.verifyAlterTableType(catalog, oldName, isView)
-    catalog.invalidateTable(oldName)
-    catalog.renameTable(oldName, newName)
+    // If this is a temp view, just rename the view.
+    // Otherwise, if this is a real table, we also need to uncache and 
invalidate the table.
+    val isTemporary = catalog.isTemporaryTable(oldName)
+    if (isTemporary) {
+      catalog.renameTable(oldName, newName)
+    } else {
+      // If an exception is thrown here we can just assume the table is 
uncached;
+      // this can happen with Hive tables when the underlying catalog is 
in-memory.
+      val wasCached = 
Try(sparkSession.catalog.isCached(oldName.unquotedString)).getOrElse(false)
+      if (wasCached) {
+        try {
+          sparkSession.catalog.uncacheTable(oldName.unquotedString)
+        } catch {
+          case NonFatal(e) => log.warn(e.toString, e)
+        }
+      }
+      // For datasource tables, we also need to update the "path" serde 
property
+      val table = catalog.getTableMetadata(oldName)
+      if (DDLUtils.isDatasourceTable(table) && table.tableType == 
CatalogTableType.MANAGED) {
+        val newPath = catalog.defaultTablePath(newName)
+        val newTable = table.withNewStorage(
+          serdeProperties = table.storage.serdeProperties ++ Map("path" -> 
newPath))
+        catalog.alterTable(newTable)
+      }
+      // Invalidate the table last, otherwise uncaching the table would load 
the logical plan
+      // back into the hive metastore cache
+      catalog.invalidateTable(oldName)
+      catalog.renameTable(oldName, newName)
+      if (wasCached) {
+        sparkSession.catalog.cacheTable(newName.unquotedString)
+      }
+    }
     Seq.empty[Row]
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/44052a70/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index dd1f598..741ea67 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -446,6 +446,24 @@ class DDLSuite extends QueryTest with SharedSQLContext 
with BeforeAndAfterEach {
     }
   }
 
+  test("alter table: rename cached table") {
+    import testImplicits._
+    sql("CREATE TABLE students (age INT, name STRING) USING parquet")
+    val df = (1 to 2).map { i => (i, i.toString) }.toDF("age", "name")
+    df.write.insertInto("students")
+    spark.catalog.cacheTable("students")
+    assume(spark.table("students").collect().toSeq == df.collect().toSeq, "bad 
test: wrong data")
+    assume(spark.catalog.isCached("students"), "bad test: table was not cached 
in the first place")
+    sql("ALTER TABLE students RENAME TO teachers")
+    sql("CREATE TABLE students (age INT, name STRING) USING parquet")
+    // Now we have both students and teachers.
+    // The cached data for the old students table should not be read by the 
new students table.
+    assert(!spark.catalog.isCached("students"))
+    assert(spark.catalog.isCached("teachers"))
+    assert(spark.table("students").collect().isEmpty)
+    assert(spark.table("teachers").collect().toSeq == df.collect().toSeq)
+  }
+
   test("rename temporary table - destination table with database name") {
     withTempTable("tab1") {
       sql(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to