[CARBONDATA-1988] Fixed bug to remove empty partition directory for drop partition command

This closes #1786


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3c3f33df
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3c3f33df
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3c3f33df

Branch: refs/heads/fgdatamap
Commit: 3c3f33dfcae84af11054eab8bde9ea83f1cf9f0d
Parents: e349820
Author: Geetika Gupta <geetika.gu...@knoldus.in>
Authored: Wed Jan 10 16:23:55 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Wed Jan 31 12:06:14 2018 +0530

----------------------------------------------------------------------
 .../core/metadata/PartitionMapFileStore.java    |  7 ++++-
 .../StandardPartitionTableDropTestCase.scala    | 27 ++++++++++++++++++++
 .../spark/rdd/CarbonDropPartitionRDD.scala      |  6 +++--
 .../management/CarbonLoadDataCommand.scala      |  9 ++++---
 ...rbonAlterTableDropHivePartitionCommand.scala |  6 +++--
 5 files changed, 47 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
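This change widens PartitionMapFileStore.commitPartitions to also receive the
table path and the names of the dropped partitions, so the commit step can
delete the partition directory that would otherwise be left behind empty. A
minimal sketch of the new call shape (all values below are hypothetical; real
callers pass the current segment path, commit uniqueId, table path and
partition names):

    import scala.collection.JavaConverters._
    import org.apache.carbondata.core.metadata.PartitionMapFileStore

    // Hypothetical values for illustration only.
    val segmentPath = "/warehouse/partitionone1/Fact/Part0/Segment_0"
    val uniqueId = System.currentTimeMillis().toString
    val tablePath = "/warehouse/partitionone1"
    val partitionsToDrop = Seq("empno=11")

    // Old signature: commitPartitions(segmentPath, uniqueId, success)
    // New signature additionally takes the table path and the dropped
    // partitions so the now-empty partition directory can be removed.
    new PartitionMapFileStore().commitPartitions(
      segmentPath, uniqueId, true, tablePath, partitionsToDrop.toList.asJava)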


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c3f33df/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
index 355d083..1e9cbc4 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
@@ -313,9 +313,12 @@ public class PartitionMapFileStore {
    * @param uniqueId
    * @param success
    */
-  public void commitPartitions(String segmentPath, final String uniqueId, boolean success) {
+  public void commitPartitions(String segmentPath, final String uniqueId, boolean success,
+      String tablePath, List<String> partitionsToDrop) {
     CarbonFile carbonFile = FileFactory
         .getCarbonFile(segmentPath + "/" + uniqueId + CarbonTablePath.PARTITION_MAP_EXT + ".tmp");
+    CarbonFile carbonPartFile = FileFactory
+        .getCarbonFile(tablePath + "/" + partitionsToDrop.get(0));
     // write partition info to new file.
     if (carbonFile.exists()) {
       if (success) {
@@ -324,6 +327,8 @@ public class PartitionMapFileStore {
         carbonFile.delete();
       }
     }
+    // Remove the partition directory from the table path
+    carbonPartFile.delete();
   }
 
   /**
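
The deletion itself resolves the first dropped partition against the table
path. A rough standalone sketch of the same operation, assuming the
FileFactory/CarbonFile API already used in this file (paths hypothetical):

    import org.apache.carbondata.core.datastore.impl.FileFactory

    // Hypothetical table path and dropped partition for illustration.
    val tablePath = "/warehouse/partitionone1"
    val droppedPartition = "empno=11" // partitionsToDrop.get(0)

    // Mirrors the added lines above: resolve the partition folder under
    // the table path and delete it once the commit has finished.
    val carbonPartFile = FileFactory.getCarbonFile(tablePath + "/" + droppedPartition)
    carbonPartFile.delete()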

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c3f33df/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
index 2aa9145..aac823a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
@@ -16,7 +16,10 @@
  */
 package org.apache.carbondata.spark.testsuite.standardpartition
 
+import java.nio.file.{Files, LinkOption, Paths}
+
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.TestQueryExecutor
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
@@ -182,6 +185,29 @@ class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAl
       Seq(Row(0)))
   }
 
+  test("test dropping on partition table for int partition column") {
+    sql(
+      """
+        | CREATE TABLE partitionone1 (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empno int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionone1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(
+      sql(s"""select count (*) from partitionone1"""),
+      sql(s"""select count (*) from originTable"""))
+
+    checkAnswer(
+      sql(s"""select count (*) from partitionone1 where empno=11"""),
+      sql(s"""select count (*) from originTable where empno=11"""))
+    sql(s"""ALTER TABLE partitionone1 DROP PARTITION(empno='11')""")
+    assert(Files.notExists(Paths.get(TestQueryExecutor.warehouse + "/partitionone1/" + "empno=11"), LinkOption.NOFOLLOW_LINKS))
+    sql("drop table if exists partitionone1")
+  }
+
   override def afterAll = {
     dropTable
   }
@@ -195,6 +221,7 @@ class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAl
     sql("drop table if exists partitionshow")
     sql("drop table if exists staticpartition")
     sql("drop table if exists partitionallcompaction")
+    sql("drop table if exists partitionone1")
   }
 
 }
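
The new test asserts on the filesystem directly rather than only on query
results. The verification pattern in isolation, with plain java.nio.file
(the warehouse path is hypothetical; the test derives it from
TestQueryExecutor.warehouse):

    import java.nio.file.{Files, LinkOption, Paths}

    // Hypothetical warehouse location for illustration.
    val warehouse = "/tmp/carbon-warehouse"

    // After ALTER TABLE ... DROP PARTITION(empno='11') the partition
    // directory itself must be gone, not merely empty.
    assert(Files.notExists(
      Paths.get(warehouse + "/partitionone1/" + "empno=11"),
      LinkOption.NOFOLLOW_LINKS))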

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c3f33df/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
index 0a79295..4806f9f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
@@ -103,7 +103,8 @@ class CarbonDropPartitionCommitRDD(
     tablePath: String,
     segments: Seq[String],
     success: Boolean,
-    uniqueId: String)
+    uniqueId: String,
+    partitions: Seq[String])
   extends CarbonRDD[String](sc, Nil) {
 
   override def getPartitions: Array[Partition] = {
@@ -117,7 +118,8 @@ class CarbonDropPartitionCommitRDD(
       val split = theSplit.asInstanceOf[CarbonDropPartition]
       logInfo("Commit partition information from : " + split.segmentPath)
 
-      new PartitionMapFileStore().commitPartitions(split.segmentPath, uniqueId, success)
+      new PartitionMapFileStore().commitPartitions(split.segmentPath, uniqueId, success, tablePath,
+        partitions.toList.asJava)
 
       var havePair = false
       var finished = false
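
The new partitions field is shipped with the RDD to the executors and
converted to the java.util.List that the Java-side commitPartitions(...)
expects, via the standard JavaConverters conversion (shown here in
isolation with a hypothetical value):

    import scala.collection.JavaConverters._

    val partitions: Seq[String] = Seq("empno=11") // hypothetical
    val javaPartitions: java.util.List[String] = partitions.toList.asJava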

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c3f33df/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 7afbd92..226a625 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -813,7 +813,8 @@ case class CarbonLoadDataCommand(
             table.getTablePath,
             segments.asScala,
             success = false,
-            uniqueId).collect()
+            uniqueId,
+            partitionNames.toSeq).collect()
           throw e
       }
 
@@ -827,7 +828,8 @@ case class CarbonLoadDataCommand(
             table.getTablePath,
             segments.asScala,
             success = false,
-            uniqueId).collect()
+            uniqueId,
+            partitionNames.toSeq).collect()
           throw e
       }
       // Commit the removed partitions in carbon store.
@@ -836,7 +838,8 @@ case class CarbonLoadDataCommand(
         table.getTablePath,
         segments.asScala,
         success = true,
-        uniqueId).collect()
+        uniqueId,
+        partitionNames.toSeq).collect()
       // get valid segments
       val validsegments =
         new SegmentStatusManager(
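
Taken together, the overwrite-load path now follows this shape: roll back
with success = false in each catch block, commit with success = true on the
happy path. A condensed schematic of the surrounding hunks (sparkSession,
table, segments, uniqueId and partitionNames are assumed in scope, as at
the real call sites; the leading SparkContext argument is assumed from the
class definition, not shown in this diff):

    try {
      // ... drop the overlapping partitions as part of the overwrite load ...
    } catch {
      case e: Exception =>
        // Roll back: success = false discards the tmp partition-map files
        // and deletes nothing.
        new CarbonDropPartitionCommitRDD(
          sparkSession.sparkContext,
          table.getTablePath,
          segments.asScala,
          success = false,
          uniqueId,
          partitionNames.toSeq).collect()
        throw e
    }
    // Commit: success = true finalizes the partition map and, with this
    // fix, also deletes the now-empty partition directory.
    new CarbonDropPartitionCommitRDD(
      sparkSession.sparkContext,
      table.getTablePath,
      segments.asScala,
      success = true,
      uniqueId,
      partitionNames.toSeq).collect()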

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c3f33df/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
index dbd686b..c3509a3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
@@ -129,7 +129,8 @@ case class CarbonAlterTableDropHivePartitionCommand(
             table.getTablePath,
             segments.asScala,
             false,
-            uniqueId).collect()
+            uniqueId,
+            partitionNames.toSeq).collect()
           throw e
       }
       // commit the drop partitions from carbon store
@@ -137,7 +138,8 @@ case class CarbonAlterTableDropHivePartitionCommand(
         table.getTablePath,
         segments.asScala,
         true,
-        uniqueId).collect()
+        uniqueId,
+        partitionNames.toSeq).collect()
       // Update the loadstatus with update time to clear cache from driver.
       val segmentSet = new util.HashSet[String](new SegmentStatusManager(table
         .getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments)
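
End to end, the user-visible behavior exercised by the new test (table name
and partition value mirror the test case above; sql(...) is assumed in scope
as in the test suite):

    sql("ALTER TABLE partitionone1 DROP PARTITION(empno='11')")
    // Before this fix: <warehouse>/partitionone1/empno=11 survived as an
    // empty directory. After: it is deleted when the drop is committed.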
