Repository: spark
Updated Branches:
  refs/heads/branch-2.0 46d5f7f38 -> 44052a707
[SPARK-15596][SPARK-15635][SQL] ALTER TABLE RENAME fixes

## What changes were proposed in this pull request?

**SPARK-15596**: Even after we renamed a cached table, the plan would remain in the cache under the old table name. If a new table was then created using the old name, queries against it would return the old table's stale cached data. Note that this applies only to Hive tables.

**SPARK-15635**: Renaming a datasource table would leave the table unqueryable. This is because we store the location of the table in a "path" serde property, which was not updated to reflect Hive's change of table location following a rename.

## How was this patch tested?

DDLSuite

Author: Andrew Or <and...@databricks.com>

Closes #13416 from andrewor14/rename-table.

(cherry picked from commit 9e2643b21d5749f2f5447b0274a6a35496054342)
Signed-off-by: Yin Huai <yh...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44052a70
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44052a70
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44052a70

Branch: refs/heads/branch-2.0
Commit: 44052a70734fd0f35dc54f5553ae40ac395eef13
Parents: 46d5f7f
Author: Andrew Or <and...@databricks.com>
Authored: Wed Jun 1 14:26:24 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Jun 1 14:26:49 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/command/tables.scala   | 35 ++++++++++++++++++--
 .../spark/sql/execution/command/DDLSuite.scala | 18 ++++++++++
 2 files changed, 51 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
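As context for the diff below, here is a minimal sketch of the SPARK-15596 symptom, mirroring the regression test added to DDLSuite in this commit. It assumes an existing Spark 2.0 SparkSession named `spark` with Hive support; the table and column names are illustrative only:

    // Reproduction sketch (not part of this commit): rename a cached table,
    // then recreate a table under the old name.
    import spark.implicits._

    spark.sql("CREATE TABLE students (age INT, name STRING) USING parquet")
    Seq((1, "1"), (2, "2")).toDF("age", "name").write.insertInto("students")
    spark.catalog.cacheTable("students")

    spark.sql("ALTER TABLE students RENAME TO teachers")
    spark.sql("CREATE TABLE students (age INT, name STRING) USING parquet")

    // Before this patch, the cached plan could still be keyed on the old name,
    // so the freshly created (and empty) "students" table could surface the
    // old rows here; with the fix it returns no rows.
    spark.table("students").show()

The fix below handles this by uncaching the old table before the rename, rewriting the "path" serde property for managed datasource tables (the SPARK-15635 half), invalidating the old catalog entry, and re-caching the data under the new name.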
http://git-wip-us.apache.org/repos/asf/spark/blob/44052a70/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 2d6a3b4..1b89c6b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -23,6 +23,7 @@ import java.util.Date
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
+import scala.util.Try
 
 import org.apache.hadoop.fs.Path
 
@@ -145,8 +146,38 @@ case class AlterTableRenameCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     DDLUtils.verifyAlterTableType(catalog, oldName, isView)
-    catalog.invalidateTable(oldName)
-    catalog.renameTable(oldName, newName)
+    // If this is a temp view, just rename the view.
+    // Otherwise, if this is a real table, we also need to uncache and invalidate the table.
+    val isTemporary = catalog.isTemporaryTable(oldName)
+    if (isTemporary) {
+      catalog.renameTable(oldName, newName)
+    } else {
+      // If an exception is thrown here we can just assume the table is uncached;
+      // this can happen with Hive tables when the underlying catalog is in-memory.
+      val wasCached = Try(sparkSession.catalog.isCached(oldName.unquotedString)).getOrElse(false)
+      if (wasCached) {
+        try {
+          sparkSession.catalog.uncacheTable(oldName.unquotedString)
+        } catch {
+          case NonFatal(e) => log.warn(e.toString, e)
+        }
+      }
+      // For datasource tables, we also need to update the "path" serde property
+      val table = catalog.getTableMetadata(oldName)
+      if (DDLUtils.isDatasourceTable(table) && table.tableType == CatalogTableType.MANAGED) {
+        val newPath = catalog.defaultTablePath(newName)
+        val newTable = table.withNewStorage(
+          serdeProperties = table.storage.serdeProperties ++ Map("path" -> newPath))
+        catalog.alterTable(newTable)
+      }
+      // Invalidate the table last, otherwise uncaching the table would load the logical plan
+      // back into the hive metastore cache
+      catalog.invalidateTable(oldName)
+      catalog.renameTable(oldName, newName)
+      if (wasCached) {
+        sparkSession.catalog.cacheTable(newName.unquotedString)
+      }
+    }
     Seq.empty[Row]
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/44052a70/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index dd1f598..741ea67 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -446,6 +446,24 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
+  test("alter table: rename cached table") {
+    import testImplicits._
+    sql("CREATE TABLE students (age INT, name STRING) USING parquet")
+    val df = (1 to 2).map { i => (i, i.toString) }.toDF("age", "name")
+    df.write.insertInto("students")
+    spark.catalog.cacheTable("students")
+    assume(spark.table("students").collect().toSeq == df.collect().toSeq, "bad test: wrong data")
+    assume(spark.catalog.isCached("students"), "bad test: table was not cached in the first place")
+    sql("ALTER TABLE students RENAME TO teachers")
+    sql("CREATE TABLE students (age INT, name STRING) USING parquet")
+    // Now we have both students and teachers.
+    // The cached data for the old students table should not be read by the new students table.
+    assert(!spark.catalog.isCached("students"))
+    assert(spark.catalog.isCached("teachers"))
+    assert(spark.table("students").collect().isEmpty)
+    assert(spark.table("teachers").collect().toSeq == df.collect().toSeq)
+  }
+
   test("rename temporary table - destination table with database name") {
     withTempTable("tab1") {
       sql(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org