[CARBONDATA-1988] Fixed bug to remove empty partition directory for drop partition command
This closes #1786 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3c3f33df Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3c3f33df Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3c3f33df Branch: refs/heads/fgdatamap Commit: 3c3f33dfcae84af11054eab8bde9ea83f1cf9f0d Parents: e349820 Author: Geetika Gupta <geetika.gu...@knoldus.in> Authored: Wed Jan 10 16:23:55 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Wed Jan 31 12:06:14 2018 +0530 ---------------------------------------------------------------------- .../core/metadata/PartitionMapFileStore.java | 7 ++++- .../StandardPartitionTableDropTestCase.scala | 27 ++++++++++++++++++++ .../spark/rdd/CarbonDropPartitionRDD.scala | 6 +++-- .../management/CarbonLoadDataCommand.scala | 9 ++++--- ...rbonAlterTableDropHivePartitionCommand.scala | 6 +++-- 5 files changed, 47 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c3f33df/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java index 355d083..1e9cbc4 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java @@ -313,9 +313,12 @@ public class PartitionMapFileStore { * @param uniqueId * @param success */ - public void commitPartitions(String segmentPath, final String uniqueId, boolean success) { + public void commitPartitions(String segmentPath, final String uniqueId, boolean success, + String tablePath, List<String> partitionsToDrop) { CarbonFile carbonFile = FileFactory .getCarbonFile(segmentPath + "/" + uniqueId + CarbonTablePath.PARTITION_MAP_EXT + ".tmp"); + CarbonFile carbonPartFile = FileFactory + .getCarbonFile(tablePath + "/" + partitionsToDrop.get(0)); // write partition info to new file. if (carbonFile.exists()) { if (success) { @@ -324,6 +327,8 @@ public class PartitionMapFileStore { carbonFile.delete(); } } + //Remove the partition directory from table path + carbonPartFile.delete(); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c3f33df/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala index 2aa9145..aac823a 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala @@ -16,7 +16,10 @@ */ package org.apache.carbondata.spark.testsuite.standardpartition +import java.nio.file.{Files, LinkOption, Paths} + import org.apache.spark.sql.Row +import org.apache.spark.sql.test.TestQueryExecutor import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll @@ -182,6 +185,29 @@ class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAl Seq(Row(0))) } + test("test dropping on partition table for int partition column") { + sql( + """ + | CREATE TABLE partitionone1 (empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int, + | utilization int,salary int) + | PARTITIONED BY (empno int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionone1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + checkAnswer( + sql(s"""select count (*) from partitionone1"""), + sql(s"""select count (*) from originTable""")) + + checkAnswer( + sql(s"""select count (*) from partitionone1 where empno=11"""), + sql(s"""select count (*) from originTable where empno=11""")) + sql(s"""ALTER TABLE partitionone1 DROP PARTITION(empno='11')""") + assert(Files.notExists(Paths.get(TestQueryExecutor.warehouse + "/partitionone1/" + "empno=11"), LinkOption.NOFOLLOW_LINKS)) + sql("drop table if exists partitionone1") + } + override def afterAll = { dropTable } @@ -195,6 +221,7 @@ class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAl sql("drop table if exists partitionshow") sql("drop table if exists staticpartition") sql("drop table if exists partitionallcompaction") + sql("drop table if exists partitionone1") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c3f33df/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala index 0a79295..4806f9f 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala @@ -103,7 +103,8 @@ class CarbonDropPartitionCommitRDD( tablePath: String, segments: Seq[String], success: Boolean, - uniqueId: String) + uniqueId: String, + partitions: Seq[String]) extends CarbonRDD[String](sc, Nil) { override def getPartitions: Array[Partition] = { @@ -117,7 +118,8 @@ class CarbonDropPartitionCommitRDD( val split = theSplit.asInstanceOf[CarbonDropPartition] logInfo("Commit partition information from : " + split.segmentPath) - new PartitionMapFileStore().commitPartitions(split.segmentPath, uniqueId, success) + new PartitionMapFileStore().commitPartitions(split.segmentPath, uniqueId, success, tablePath, + partitions.toList.asJava) var havePair = false var finished = false http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c3f33df/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 7afbd92..226a625 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -813,7 +813,8 @@ case class CarbonLoadDataCommand( table.getTablePath, segments.asScala, success = false, - uniqueId).collect() + uniqueId, + partitionNames.toSeq).collect() throw e } @@ -827,7 +828,8 @@ case class CarbonLoadDataCommand( table.getTablePath, segments.asScala, success = false, - uniqueId).collect() + uniqueId, + partitionNames.toSeq).collect() throw e } // Commit the removed partitions in carbon store. @@ -836,7 +838,8 @@ case class CarbonLoadDataCommand( table.getTablePath, segments.asScala, success = true, - uniqueId).collect() + uniqueId, + partitionNames.toSeq).collect() // get valid segments val validsegments = new SegmentStatusManager( http://git-wip-us.apache.org/repos/asf/carbondata/blob/3c3f33df/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala index dbd686b..c3509a3 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala @@ -129,7 +129,8 @@ case class CarbonAlterTableDropHivePartitionCommand( table.getTablePath, segments.asScala, false, - uniqueId).collect() + uniqueId, + partitionNames.toSeq).collect() throw e } // commit the drop partitions from carbon store @@ -137,7 +138,8 @@ case class CarbonAlterTableDropHivePartitionCommand( table.getTablePath, segments.asScala, true, - uniqueId).collect() + uniqueId, + partitionNames.toSeq).collect() // Update the loadstatus with update time to clear cache from driver. val segmentSet = new util.HashSet[String](new SegmentStatusManager(table .getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments)