[CARBONDATA-2426] Added fix for data mismatch after compaction on Partition with Pre-Aggregate tables
Problem: Partition directory is getting deleted when one of the segments is marked for delete and another segment is loaded in the same partition. In case of aggregate table we only have partition specs for the specified segments therefore when the deleted segment is scanned against locationMap for stale partitions one of the valid partitions is considered stale as we don't get partitionSpecs for all the segments. Solution: Delete the index file instead of the partition directory and then delete the directory if it is empty. This closes #2259 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/09feb9cc Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/09feb9cc Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/09feb9cc Branch: refs/heads/spark-2.3 Commit: 09feb9cc2c80f648a5789923394dbdd41f2c8e67 Parents: 325eac2 Author: praveenmeenakshi56 <praveenmeenaksh...@gmail.com> Authored: Wed May 2 15:39:28 2018 +0530 Committer: kunal642 <kunalkapoor...@gmail.com> Committed: Tue May 8 17:38:07 2018 +0530 ---------------------------------------------------------------------- .../core/metadata/SegmentFileStore.java | 17 +++++++- ...ndardPartitionWithPreaggregateTestCase.scala | 43 ++++++++++++++++++++ 2 files changed, 59 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/09feb9cc/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java index 9326b1d..d72ded3 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java +++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java @@ -700,16 +700,31 @@ public class SegmentFileStore { FileFactory.deleteFile(segmentFilePath, FileFactory.getFileType(segmentFilePath)); } + /** + * If partition specs are available, then check the location map for any index file path which is + * not present in the partitionSpecs. If found then delete that index file. + * If the partition directory is empty, then delete the directory also. + * If partition specs are null, then directly delete parent directory in locationMap. + */ private static void deletePhysicalPartition(List<PartitionSpec> partitionSpecs, Map<String, List<String>> locationMap) throws IOException { for (Map.Entry<String, List<String>> entry : locationMap.entrySet()) { - Path location = new Path(entry.getKey()).getParent(); if (partitionSpecs != null) { + Path location = new Path(entry.getKey()); boolean exists = pathExistsInPartitionSpec(partitionSpecs, location); if (!exists) { FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location.toString())); + for (String carbonDataFile : entry.getValue()) { + FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(carbonDataFile)); + } + } + CarbonFile path = FileFactory.getCarbonFile(location.getParent().toString()); + if (path.listFiles().length == 0) { + FileFactory.deleteAllCarbonFilesOfDir( + FileFactory.getCarbonFile(location.getParent().toString())); } } else { + Path location = new Path(entry.getKey()).getParent(); // delete the segment folder CarbonFile segmentPath = FileFactory.getCarbonFile(location.toString()); if (null != segmentPath) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/09feb9cc/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala index 8a3ae3f..84c07c4 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala @@ -525,6 +525,49 @@ class StandardPartitionWithPreaggregateTestCase extends QueryTest with BeforeAnd sql("drop table if exists updatetime_8") } + test("Test data updation after compaction on Partition with Pre-Aggregate tables") { + sql("drop table if exists partitionallcompaction") + sql( + "create table partitionallcompaction(empno int,empname String,designation String," + + "workgroupcategory int,workgroupcategoryname String,deptno int,projectjoindate timestamp," + + "projectenddate date,attendance int,utilization int,salary int) partitioned by (deptname " + + "String,doj timestamp,projectcode int) stored by 'carbondata' tblproperties" + + "('sort_scope'='global_sort')") + sql( + "create datamap sensor_1 on table partitionallcompaction using 'preaggregate' as select " + + "sum(salary),doj, deptname,projectcode from partitionallcompaction group by doj," + + "deptname,projectcode") + sql( + s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE + |partitionallcompaction OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin) + sql( + s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE + |partitionallcompaction OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin) + sql( + s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE + |partitionallcompaction 
PARTITION(deptname='Learning', doj, projectcode) OPTIONS + |('DELIMITER'= ',', 'QUOTECHAR'= '"') """.stripMargin) + sql( + s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE + |partitionallcompaction PARTITION(deptname='configManagement', doj, projectcode) OPTIONS + |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin) + sql( + s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE + |partitionallcompaction PARTITION(deptname='network', doj, projectcode) OPTIONS + |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin) + sql( + s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE + |partitionallcompaction PARTITION(deptname='protocol', doj, projectcode) OPTIONS + |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin) + sql( + s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE + |partitionallcompaction PARTITION(deptname='security', doj, projectcode) OPTIONS + |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin) + sql("ALTER TABLE partitionallcompaction COMPACT 'MINOR'").collect() + checkAnswer(sql("select count(empno) from partitionallcompaction where empno=14"), + Seq(Row(5))) + } + test("Test data updation in Aggregate query after compaction on Partitioned table with Pre-Aggregate table") { sql("drop table if exists updatetime_8") sql("create table updatetime_8" +