Repository: spark
Updated Branches:
  refs/heads/branch-2.1 94272a960 -> 776255065
[SPARK-19048][SQL] Delete Partition Location when Dropping Managed Partitioned Tables in InMemoryCatalog

### What changes were proposed in this pull request?
The data in a managed table should be deleted after the table is dropped. However, if a partition location is not under the location of the partitioned table, it is not deleted as expected. Users can specify any location for a partition when they add it.

This PR deletes the partition locations when dropping managed partitioned tables stored in `InMemoryCatalog`.

### How was this patch tested?
Added test cases for both HiveExternalCatalog and InMemoryCatalog.

Author: gatorsmile <gatorsm...@gmail.com>

Closes #16448 from gatorsmile/unsetSerdeProp.

(cherry picked from commit b67b35f76b684c5176dc683e7491fd01b43f4467)
Signed-off-by: gatorsmile <gatorsm...@gmail.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/77625506
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/77625506
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/77625506

Branch: refs/heads/branch-2.1
Commit: 776255065c13df7b4505c225546b4b66cd929c76
Parents: 94272a9
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Tue Jan 3 11:43:47 2017 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Tue Jan 3 11:44:11 2017 -0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/InMemoryCatalog.scala  | 13 +++++
 .../catalyst/catalog/ExternalCatalogSuite.scala | 48 +++++++++++++++--
 .../spark/sql/execution/command/ddl.scala       |  5 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 56 +++++++++++++++++++-
 4 files changed, 113 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/77625506/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index a6bebe1..9a6c732 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -229,9 +229,22 @@ class InMemoryCatalog(
     if (tableExists(db, table)) {
       val tableMeta = getTable(db, table)
       if (tableMeta.tableType == CatalogTableType.MANAGED) {
+        // Delete the data/directory for each partition
+        val locationAllParts = catalog(db).tables(table).partitions.values.toSeq.map(_.location)
+        locationAllParts.foreach { loc =>
+          val partitionPath = new Path(loc)
+          try {
+            val fs = partitionPath.getFileSystem(hadoopConfig)
+            fs.delete(partitionPath, true)
+          } catch {
+            case e: IOException =>
+              throw new SparkException(s"Unable to delete partition path $partitionPath", e)
+          }
+        }
         assert(tableMeta.storage.locationUri.isDefined,
           "Managed table should always have table location, as we will assign a default location " +
             "to it if it doesn't have one.")
+        // Delete the data/directory of the table
         val dir = new Path(tableMeta.location)
         try {
           val fs = dir.getFileSystem(hadoopConfig)
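For readers skimming the hunk above, the following is a minimal, self-contained sketch of the cleanup pattern it introduces: every partition directory is deleted recursively before the table directory itself, because a partition location may live outside the table root. The object and method names are illustrative stand-ins, not Spark APIs; only the Hadoop Path/FileSystem calls mirror the actual change inside InMemoryCatalog.dropTable.

import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object PartitionCleanupSketch {
  // Delete every partition directory recursively. A partition location may sit
  // outside the table root, so removing the table directory alone is not enough.
  def deletePartitionDirs(partitionLocations: Seq[String], conf: Configuration): Unit = {
    partitionLocations.foreach { loc =>
      val partitionPath = new Path(loc)
      try {
        val fs = partitionPath.getFileSystem(conf)
        fs.delete(partitionPath, true)
      } catch {
        case e: IOException =>
          throw new RuntimeException(s"Unable to delete partition path $partitionPath", e)
      }
    }
  }
}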
http://git-wip-us.apache.org/repos/asf/spark/blob/77625506/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 00e663c..9d20602 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -324,7 +324,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val table = CatalogTable(
       identifier = TableIdentifier("tbl", Some("db1")),
       tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      storage = CatalogStorageFormat.empty,
       schema = new StructType()
         .add("col1", "int")
         .add("col2", "string")
@@ -346,6 +346,46 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(new Path(partitionLocation) == defaultPartitionLocation)
   }
 
+  test("create/drop partitions in managed tables with location") {
+    val catalog = newBasicCatalog()
+    val table = CatalogTable(
+      identifier = TableIdentifier("tbl", Some("db1")),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat.empty,
+      schema = new StructType()
+        .add("col1", "int")
+        .add("col2", "string")
+        .add("partCol1", "int")
+        .add("partCol2", "string"),
+      provider = Some("hive"),
+      partitionColumnNames = Seq("partCol1", "partCol2"))
+    catalog.createTable(table, ignoreIfExists = false)
+
+    val newLocationPart1 = newUriForDatabase()
+    val newLocationPart2 = newUriForDatabase()
+
+    val partition1 =
+      CatalogTablePartition(Map("partCol1" -> "1", "partCol2" -> "2"),
+        storageFormat.copy(locationUri = Some(newLocationPart1)))
+    val partition2 =
+      CatalogTablePartition(Map("partCol1" -> "3", "partCol2" -> "4"),
+        storageFormat.copy(locationUri = Some(newLocationPart2)))
+    catalog.createPartitions("db1", "tbl", Seq(partition1), ignoreIfExists = false)
+    catalog.createPartitions("db1", "tbl", Seq(partition2), ignoreIfExists = false)
+
+    assert(exists(newLocationPart1))
+    assert(exists(newLocationPart2))
+
+    // the corresponding directory is dropped.
+    catalog.dropPartitions("db1", "tbl", Seq(partition1.spec),
+      ignoreIfNotExists = false, purge = false, retainData = false)
+    assert(!exists(newLocationPart1))
+
+    // all the remaining directories are dropped.
+    catalog.dropTable("db1", "tbl", ignoreIfNotExists = false, purge = false)
+    assert(!exists(newLocationPart2))
+  }
+
   test("list partition names") {
     val catalog = newBasicCatalog()
     val newPart = CatalogTablePartition(Map("a" -> "1", "b" -> "%="), storageFormat)
@@ -459,7 +499,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val table = CatalogTable(
       identifier = TableIdentifier("tbl", Some("db1")),
       tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      storage = CatalogStorageFormat.empty,
       schema = new StructType()
         .add("col1", "int")
         .add("col2", "string")
@@ -684,7 +724,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val table = CatalogTable(
       identifier = TableIdentifier("my_table", Some("db1")),
       tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      storage = CatalogStorageFormat.empty,
       schema = new StructType().add("a", "int").add("b", "string"),
       provider = Some("hive")
     )
@@ -717,7 +757,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     val table = CatalogTable(
       identifier = TableIdentifier("tbl", Some("db1")),
       tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      storage = CatalogStorageFormat.empty,
       schema = new StructType()
         .add("col1", "int")
         .add("col2", "string")


http://git-wip-us.apache.org/repos/asf/spark/blob/77625506/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index c62c142..b1bb565 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -329,13 +329,12 @@ case class AlterTableSerDePropertiesCommand(
 /**
  * Add Partition in ALTER TABLE: add the table partitions.
  *
- * 'partitionSpecsAndLocs': the syntax of ALTER VIEW is identical to ALTER TABLE,
- * EXCEPT that it is ILLEGAL to specify a LOCATION clause.
  * An error message will be issued if the partition exists, unless 'ifNotExists' is true.
 *
 * The syntax of this command is:
 * {{{
- *   ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec [LOCATION 'loc1']
+ *   ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec1 [LOCATION 'loc1']
+ *                                         PARTITION spec2 [LOCATION 'loc2']
 * }}}
 */
case class AlterTableAddPartitionCommand(


http://git-wip-us.apache.org/repos/asf/spark/blob/77625506/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index f313db6..8b34219 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -199,6 +199,52 @@ class HiveDDLSuite
     assert(e.message == "Found duplicate column(s) in table definition of `tbl`: a")
   }
 
+  test("add/drop partition with location - managed table") {
+    val tab = "tab_with_partitions"
+    withTempDir { tmpDir =>
+      val basePath = new File(tmpDir.getCanonicalPath)
+      val part1Path = new File(basePath + "/part1")
+      val part2Path = new File(basePath + "/part2")
+      val dirSet = part1Path :: part2Path :: Nil
+
+      // Before data insertion, all the directories are empty
+      assert(dirSet.forall(dir => dir.listFiles == null || dir.listFiles.isEmpty))
+
+      withTable(tab) {
+        sql(
+          s"""
+             |CREATE TABLE $tab (key INT, value STRING)
+             |PARTITIONED BY (ds STRING, hr STRING)
+           """.stripMargin)
+        sql(
+          s"""
+             |ALTER TABLE $tab ADD
+             |PARTITION (ds='2008-04-08', hr=11) LOCATION '$part1Path'
+             |PARTITION (ds='2008-04-08', hr=12) LOCATION '$part2Path'
+           """.stripMargin)
+        assert(dirSet.forall(dir => dir.listFiles == null || dir.listFiles.isEmpty))
+
+        sql(s"INSERT OVERWRITE TABLE $tab partition (ds='2008-04-08', hr=11) SELECT 1, 'a'")
+        sql(s"INSERT OVERWRITE TABLE $tab partition (ds='2008-04-08', hr=12) SELECT 2, 'b'")
+        // add partition will not delete the data
+        assert(dirSet.forall(dir => dir.listFiles.nonEmpty))
+        checkAnswer(
+          spark.table(tab),
+          Row(1, "a", "2008-04-08", "11") :: Row(2, "b", "2008-04-08", "12") :: Nil
+        )
+
+        sql(s"ALTER TABLE $tab DROP PARTITION (ds='2008-04-08', hr=11)")
+        // drop partition will delete the data
+        assert(part1Path.listFiles == null || part1Path.listFiles.isEmpty)
+        assert(part2Path.listFiles.nonEmpty)
+
+        sql(s"DROP TABLE $tab")
+        // drop table will delete the data of the managed table
+        assert(dirSet.forall(dir => dir.listFiles == null || dir.listFiles.isEmpty))
+      }
+    }
+  }
+
   test("add/drop partitions - external table") {
     val catalog = spark.sessionState.catalog
     withTempDir { tmpDir =>
@@ -257,9 +303,15 @@ class HiveDDLSuite
       // drop partition will not delete the data of external table
       assert(dirSet.forall(dir => dir.listFiles.nonEmpty))
 
-      sql(s"ALTER TABLE $externalTab ADD PARTITION (ds='2008-04-08', hr='12')")
+      sql(
+        s"""
+           |ALTER TABLE $externalTab ADD PARTITION (ds='2008-04-08', hr='12')
+           |PARTITION (ds='2008-04-08', hr=11)
+         """.stripMargin)
       assert(catalog.listPartitions(TableIdentifier(externalTab)).map(_.spec).toSet ==
-        Set(Map("ds" -> "2008-04-08", "hr" -> "12"), Map("ds" -> "2008-04-09", "hr" -> "11")))
+        Set(Map("ds" -> "2008-04-08", "hr" -> "11"),
+          Map("ds" -> "2008-04-08", "hr" -> "12"),
+          Map("ds" -> "2008-04-09", "hr" -> "11")))
 
       // add partition will not delete the data
       assert(dirSet.forall(dir => dir.listFiles.nonEmpty))
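As a hedged, end-to-end illustration of the behaviour the tests above verify (not part of the patch): assuming a local SparkSession with Hive support and a writable directory /tmp/custom_part (both assumptions for this sketch), dropping the managed table now also removes the partition directory that was added with an explicit LOCATION.

import org.apache.spark.sql.SparkSession

// Assumptions: local Spark with Hive support; /tmp/custom_part is writable.
val spark = SparkSession.builder()
  .master("local[1]")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("CREATE TABLE tab (key INT, value STRING) PARTITIONED BY (ds STRING)")
spark.sql("ALTER TABLE tab ADD PARTITION (ds='2008-04-08') LOCATION '/tmp/custom_part'")
spark.sql("INSERT OVERWRITE TABLE tab PARTITION (ds='2008-04-08') SELECT 1, 'a'")

// Dropping the managed table also deletes /tmp/custom_part, even though it is
// not under the table's default warehouse location.
spark.sql("DROP TABLE tab")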