Repository: spark
Updated Branches:
  refs/heads/branch-2.1 94272a960 -> 776255065


[SPARK-19048][SQL] Delete Partition Location when Dropping Managed Partitioned 
Tables in InMemoryCatalog

### What changes were proposed in this pull request?
The data in the managed table should be deleted after table is dropped. 
However, if the partition location is not under the location of the partitioned 
table, it is not deleted as expected. Users can specify any location for the 
partition when they adding a partition.

This PR is to delete partition location when dropping managed partitioned 
tables stored in `InMemoryCatalog`.

### How was this patch tested?
Added test cases for both HiveExternalCatalog and InMemoryCatalog

Author: gatorsmile <gatorsm...@gmail.com>

Closes #16448 from gatorsmile/unsetSerdeProp.

(cherry picked from commit b67b35f76b684c5176dc683e7491fd01b43f4467)
Signed-off-by: gatorsmile <gatorsm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/77625506
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/77625506
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/77625506

Branch: refs/heads/branch-2.1
Commit: 776255065c13df7b4505c225546b4b66cd929c76
Parents: 94272a9
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Tue Jan 3 11:43:47 2017 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Tue Jan 3 11:44:11 2017 -0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/InMemoryCatalog.scala  | 13 +++++
 .../catalyst/catalog/ExternalCatalogSuite.scala | 48 +++++++++++++++--
 .../spark/sql/execution/command/ddl.scala       |  5 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 56 +++++++++++++++++++-
 4 files changed, 113 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/77625506/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index a6bebe1..9a6c732 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -229,9 +229,22 @@ class InMemoryCatalog(
     if (tableExists(db, table)) {
       val tableMeta = getTable(db, table)
       if (tableMeta.tableType == CatalogTableType.MANAGED) {
+        // Delete the data/directory for each partition
+        val locationAllParts = 
catalog(db).tables(table).partitions.values.toSeq.map(_.location)
+        locationAllParts.foreach { loc =>
+          val partitionPath = new Path(loc)
+          try {
+            val fs = partitionPath.getFileSystem(hadoopConfig)
+            fs.delete(partitionPath, true)
+          } catch {
+            case e: IOException =>
+              throw new SparkException(s"Unable to delete partition path 
$partitionPath", e)
+          }
+        }
         assert(tableMeta.storage.locationUri.isDefined,
           "Managed table should always have table location, as we will assign 
a default location " +
             "to it if it doesn't have one.")
+        // Delete the data/directory of the table
         val dir = new Path(tableMeta.location)
         try {
           val fs = dir.getFileSystem(hadoopConfig)

http://git-wip-us.apache.org/repos/asf/spark/blob/77625506/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 00e663c..9d20602 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -324,7 +324,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite 
with BeforeAndAfterEac
     val table = CatalogTable(
       identifier = TableIdentifier("tbl", Some("db1")),
       tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      storage = CatalogStorageFormat.empty,
       schema = new StructType()
         .add("col1", "int")
         .add("col2", "string")
@@ -346,6 +346,46 @@ abstract class ExternalCatalogSuite extends SparkFunSuite 
with BeforeAndAfterEac
     assert(new Path(partitionLocation) == defaultPartitionLocation)
   }
 
+  test("create/drop partitions in managed tables with location") {
+    val catalog = newBasicCatalog()
+    val table = CatalogTable(
+      identifier = TableIdentifier("tbl", Some("db1")),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat.empty,
+      schema = new StructType()
+        .add("col1", "int")
+        .add("col2", "string")
+        .add("partCol1", "int")
+        .add("partCol2", "string"),
+      provider = Some("hive"),
+      partitionColumnNames = Seq("partCol1", "partCol2"))
+    catalog.createTable(table, ignoreIfExists = false)
+
+    val newLocationPart1 = newUriForDatabase()
+    val newLocationPart2 = newUriForDatabase()
+
+    val partition1 =
+      CatalogTablePartition(Map("partCol1" -> "1", "partCol2" -> "2"),
+        storageFormat.copy(locationUri = Some(newLocationPart1)))
+    val partition2 =
+      CatalogTablePartition(Map("partCol1" -> "3", "partCol2" -> "4"),
+        storageFormat.copy(locationUri = Some(newLocationPart2)))
+    catalog.createPartitions("db1", "tbl", Seq(partition1), ignoreIfExists = 
false)
+    catalog.createPartitions("db1", "tbl", Seq(partition2), ignoreIfExists = 
false)
+
+    assert(exists(newLocationPart1))
+    assert(exists(newLocationPart2))
+
+    // the corresponding directory is dropped.
+    catalog.dropPartitions("db1", "tbl", Seq(partition1.spec),
+      ignoreIfNotExists = false, purge = false, retainData = false)
+    assert(!exists(newLocationPart1))
+
+    // all the remaining directories are dropped.
+    catalog.dropTable("db1", "tbl", ignoreIfNotExists = false, purge = false)
+    assert(!exists(newLocationPart2))
+  }
+
   test("list partition names") {
     val catalog = newBasicCatalog()
     val newPart = CatalogTablePartition(Map("a" -> "1", "b" -> "%="), 
storageFormat)
@@ -459,7 +499,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite 
with BeforeAndAfterEac
     val table = CatalogTable(
       identifier = TableIdentifier("tbl", Some("db1")),
       tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      storage = CatalogStorageFormat.empty,
       schema = new StructType()
         .add("col1", "int")
         .add("col2", "string")
@@ -684,7 +724,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite 
with BeforeAndAfterEac
     val table = CatalogTable(
       identifier = TableIdentifier("my_table", Some("db1")),
       tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      storage = CatalogStorageFormat.empty,
       schema = new StructType().add("a", "int").add("b", "string"),
       provider = Some("hive")
     )
@@ -717,7 +757,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite 
with BeforeAndAfterEac
     val table = CatalogTable(
       identifier = TableIdentifier("tbl", Some("db1")),
       tableType = CatalogTableType.MANAGED,
-      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      storage = CatalogStorageFormat.empty,
       schema = new StructType()
         .add("col1", "int")
         .add("col2", "string")

http://git-wip-us.apache.org/repos/asf/spark/blob/77625506/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index c62c142..b1bb565 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -329,13 +329,12 @@ case class AlterTableSerDePropertiesCommand(
 /**
  * Add Partition in ALTER TABLE: add the table partitions.
  *
- * 'partitionSpecsAndLocs': the syntax of ALTER VIEW is identical to ALTER 
TABLE,
- * EXCEPT that it is ILLEGAL to specify a LOCATION clause.
  * An error message will be issued if the partition exists, unless 
'ifNotExists' is true.
  *
  * The syntax of this command is:
  * {{{
- *   ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec [LOCATION 'loc1']
+ *   ALTER TABLE table ADD [IF NOT EXISTS] PARTITION spec1 [LOCATION 'loc1']
+ *                                         PARTITION spec2 [LOCATION 'loc2']
  * }}}
  */
 case class AlterTableAddPartitionCommand(

http://git-wip-us.apache.org/repos/asf/spark/blob/77625506/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index f313db6..8b34219 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -199,6 +199,52 @@ class HiveDDLSuite
     assert(e.message == "Found duplicate column(s) in table definition of 
`tbl`: a")
   }
 
+  test("add/drop partition with location - managed table") {
+    val tab = "tab_with_partitions"
+    withTempDir { tmpDir =>
+      val basePath = new File(tmpDir.getCanonicalPath)
+      val part1Path = new File(basePath + "/part1")
+      val part2Path = new File(basePath + "/part2")
+      val dirSet = part1Path :: part2Path :: Nil
+
+      // Before data insertion, all the directory are empty
+      assert(dirSet.forall(dir => dir.listFiles == null || 
dir.listFiles.isEmpty))
+
+      withTable(tab) {
+        sql(
+          s"""
+             |CREATE TABLE $tab (key INT, value STRING)
+             |PARTITIONED BY (ds STRING, hr STRING)
+           """.stripMargin)
+        sql(
+          s"""
+             |ALTER TABLE $tab ADD
+             |PARTITION (ds='2008-04-08', hr=11) LOCATION '$part1Path'
+             |PARTITION (ds='2008-04-08', hr=12) LOCATION '$part2Path'
+           """.stripMargin)
+        assert(dirSet.forall(dir => dir.listFiles == null || 
dir.listFiles.isEmpty))
+
+        sql(s"INSERT OVERWRITE TABLE $tab partition (ds='2008-04-08', hr=11) 
SELECT 1, 'a'")
+        sql(s"INSERT OVERWRITE TABLE $tab partition (ds='2008-04-08', hr=12) 
SELECT 2, 'b'")
+        // add partition will not delete the data
+        assert(dirSet.forall(dir => dir.listFiles.nonEmpty))
+        checkAnswer(
+          spark.table(tab),
+          Row(1, "a", "2008-04-08", "11") :: Row(2, "b", "2008-04-08", "12") 
:: Nil
+        )
+
+        sql(s"ALTER TABLE $tab DROP PARTITION (ds='2008-04-08', hr=11)")
+        // drop partition will delete the data
+        assert(part1Path.listFiles == null || part1Path.listFiles.isEmpty)
+        assert(part2Path.listFiles.nonEmpty)
+
+        sql(s"DROP TABLE $tab")
+        // drop table will delete the data of the managed table
+        assert(dirSet.forall(dir => dir.listFiles == null || 
dir.listFiles.isEmpty))
+      }
+    }
+  }
+
   test("add/drop partitions - external table") {
     val catalog = spark.sessionState.catalog
     withTempDir { tmpDir =>
@@ -257,9 +303,15 @@ class HiveDDLSuite
         // drop partition will not delete the data of external table
         assert(dirSet.forall(dir => dir.listFiles.nonEmpty))
 
-        sql(s"ALTER TABLE $externalTab ADD PARTITION (ds='2008-04-08', 
hr='12')")
+        sql(
+          s"""
+             |ALTER TABLE $externalTab ADD PARTITION (ds='2008-04-08', hr='12')
+             |PARTITION (ds='2008-04-08', hr=11)
+          """.stripMargin)
         
assert(catalog.listPartitions(TableIdentifier(externalTab)).map(_.spec).toSet ==
-          Set(Map("ds" -> "2008-04-08", "hr" -> "12"), Map("ds" -> 
"2008-04-09", "hr" -> "11")))
+          Set(Map("ds" -> "2008-04-08", "hr" -> "11"),
+            Map("ds" -> "2008-04-08", "hr" -> "12"),
+            Map("ds" -> "2008-04-09", "hr" -> "11")))
         // add partition will not delete the data
         assert(dirSet.forall(dir => dir.listFiles.nonEmpty))
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to