[CARBONDATA-2426] Added fix for data mismatch after compaction on Partition 
with Pre-Aggregate tables

Problem: The partition directory gets deleted when one of the segments is 
marked for delete and
another segment is loaded into the same partition.
For an aggregate table, partition specs are available only for the specified 
segments. Therefore,
when the deleted segment is scanned against locationMap for stale partitions, 
one of the
valid partitions is considered stale, because partitionSpecs are not available for all 
the segments.

Solution: Delete the index file instead of the partition directory and then 
delete the directory if it is empty.

This closes #2259


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/09feb9cc
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/09feb9cc
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/09feb9cc

Branch: refs/heads/spark-2.3
Commit: 09feb9cc2c80f648a5789923394dbdd41f2c8e67
Parents: 325eac2
Author: praveenmeenakshi56 <praveenmeenaksh...@gmail.com>
Authored: Wed May 2 15:39:28 2018 +0530
Committer: kunal642 <kunalkapoor...@gmail.com>
Committed: Tue May 8 17:38:07 2018 +0530

----------------------------------------------------------------------
 .../core/metadata/SegmentFileStore.java         | 17 +++++++-
 ...ndardPartitionWithPreaggregateTestCase.scala | 43 ++++++++++++++++++++
 2 files changed, 59 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/09feb9cc/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java 
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 9326b1d..d72ded3 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -700,16 +700,31 @@ public class SegmentFileStore {
     FileFactory.deleteFile(segmentFilePath, 
FileFactory.getFileType(segmentFilePath));
   }
 
+  /**
+   * If partition specs are available, then check the location map for any 
index file path which is
+   * not present in the partitionSpecs. If found then delete that index file.
+   * If the partition directory is empty, then delete the directory also.
+   * If partition specs are null, then directly delete parent directory in 
locationMap.
+   */
   private static void deletePhysicalPartition(List<PartitionSpec> 
partitionSpecs,
       Map<String, List<String>> locationMap) throws IOException {
     for (Map.Entry<String, List<String>> entry : locationMap.entrySet()) {
-      Path location = new Path(entry.getKey()).getParent();
       if (partitionSpecs != null) {
+        Path location = new Path(entry.getKey());
         boolean exists = pathExistsInPartitionSpec(partitionSpecs, location);
         if (!exists) {
           
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location.toString()));
+          for (String carbonDataFile : entry.getValue()) {
+            
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(carbonDataFile));
+          }
+        }
+        CarbonFile path = 
FileFactory.getCarbonFile(location.getParent().toString());
+        if (path.listFiles().length == 0) {
+          FileFactory.deleteAllCarbonFilesOfDir(
+              FileFactory.getCarbonFile(location.getParent().toString()));
         }
       } else {
+        Path location = new Path(entry.getKey()).getParent();
         // delete the segment folder
         CarbonFile segmentPath = 
FileFactory.getCarbonFile(location.toString());
         if (null != segmentPath) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09feb9cc/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala
 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala
index 8a3ae3f..84c07c4 100644
--- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala
+++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala
@@ -525,6 +525,49 @@ class StandardPartitionWithPreaggregateTestCase extends 
QueryTest with BeforeAnd
     sql("drop table if exists updatetime_8")
   }
 
+  test("Test data updation after compaction on Partition with Pre-Aggregate 
tables") {
+    sql("drop table if exists partitionallcompaction")
+    sql(
+      "create table partitionallcompaction(empno int,empname 
String,designation String," +
+      "workgroupcategory int,workgroupcategoryname String,deptno 
int,projectjoindate timestamp," +
+      "projectenddate date,attendance int,utilization int,salary int) 
partitioned by (deptname " +
+      "String,doj timestamp,projectcode int) stored  by 'carbondata' 
tblproperties" +
+      "('sort_scope'='global_sort')")
+    sql(
+      "create datamap sensor_1 on table partitionallcompaction using 
'preaggregate' as select " +
+      "sum(salary),doj, deptname,projectcode from partitionallcompaction group 
by doj," +
+      "deptname,projectcode")
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE
+         |partitionallcompaction OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= 
'"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE
+         |partitionallcompaction OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= 
'"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE
+         |partitionallcompaction PARTITION(deptname='Learning', doj, 
projectcode) OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"') """.stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE
+         |partitionallcompaction PARTITION(deptname='configManagement', doj, 
projectcode) OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE
+         |partitionallcompaction PARTITION(deptname='network', doj, 
projectcode) OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE
+         |partitionallcompaction PARTITION(deptname='protocol', doj, 
projectcode) OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE
+         |partitionallcompaction PARTITION(deptname='security', doj, 
projectcode) OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql("ALTER TABLE partitionallcompaction COMPACT 'MINOR'").collect()
+    checkAnswer(sql("select count(empno) from partitionallcompaction where 
empno=14"),
+      Seq(Row(5)))
+  }
+
   test("Test data updation in Aggregate query after compaction on Partitioned 
table with Pre-Aggregate table") {
     sql("drop table if exists updatetime_8")
     sql("create table updatetime_8" +

Reply via email to