[CARBONDATA-2985]Fix issues in Table level compaction and TableProperties Issue :-
If the 2nd-level compaction threshold is 1 (like 2,1 or 6,1), then only the first compaction is done and subsequent compactions are ignored (e.g. if 2,1 is given, only segment 0.1 is created and the other segments are ignored forever). Also, table-level compaction does not support 0 as the 2nd-level compaction value, although system-level compaction supports it. Solution :- if the 2nd-level compaction value is 1, the user does not want 2nd-level compaction at all, which means the 2nd-level compaction can be set to 0; remove the check so that a 2nd-level compaction value of 0 is supported at table level too. This closes #2794 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/30adaa8c Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/30adaa8c Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/30adaa8c Branch: refs/heads/branch-1.5 Commit: 30adaa8c15e430b94bd1448969f50cb2451e1746 Parents: 396c26f Author: BJangir <babulaljangir...@gmail.com> Authored: Tue Oct 2 00:17:29 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Oct 4 18:58:49 2018 +0530 ---------------------------------------------------------------------- .../TableLevelCompactionOptionTest.scala | 84 ++++++++++++++++++++ .../carbondata/spark/util/CommonUtil.scala | 2 +- .../processing/merger/CarbonDataMergerUtil.java | 7 ++ 3 files changed, 92 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/30adaa8c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/TableLevelCompactionOptionTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/TableLevelCompactionOptionTest.scala 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/TableLevelCompactionOptionTest.scala index 458d656..7b138f7 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/TableLevelCompactionOptionTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/TableLevelCompactionOptionTest.scala @@ -271,4 +271,88 @@ class TableLevelCompactionOptionTest extends QueryTest assert(!segmentSequenceIds.contains("0.1")) assert(!segmentSequenceIds.contains("3.1")) } + + test("AUTO MERGE TRUE:Verify 2nd Level compaction equals to 1"){ + sql("DROP TABLE IF EXISTS tablecompaction_table") + sql( + """ + |create table tablecompaction_table( + |name string,age int) stored by 'carbondata' + |tblproperties('AUTO_LOAD_MERGE'='true','COMPACTION_LEVEL_THRESHOLD'='2,1') + """.stripMargin) + + for(i <-0 until 4){ + sql("insert into tablecompaction_table select 'a',12") + } + var segments = sql("SHOW SEGMENTS FOR TABLE tablecompaction_table") + var segmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(segmentSequenceIds.size==6) + assert(segmentSequenceIds.contains("0.1")) + assert(segmentSequenceIds.contains("2.1")) + } + + test("AUTO MERGE FALSE:Verify 2nd Level compaction equals to 1"){ + sql("DROP TABLE IF EXISTS tablecompaction_table") + sql( + """ + |create table tablecompaction_table( + |name string,age int) stored by 'carbondata' + |tblproperties('COMPACTION_LEVEL_THRESHOLD'='2,1') + """.stripMargin) + + for(i <-0 until 4){ + sql("insert into tablecompaction_table select 'a',12") + } + sql("alter table tablecompaction_table compact 'minor' ") + var segments = sql("SHOW SEGMENTS FOR TABLE tablecompaction_table") + var segmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(segmentSequenceIds.size==6) + assert(segmentSequenceIds.contains("0.1")) + 
assert(segmentSequenceIds.contains("2.1")) + } + + // 2nd Level compaction value = 0 is supported by system level(like 6,0) + // same need to support for table level also + test("Verify 2nd Level compaction equals to 0"){ + sql("DROP TABLE IF EXISTS tablecompaction_table") + sql( + """ + |create table tablecompaction_table( + |name string,age int) stored by 'carbondata' + |tblproperties('AUTO_LOAD_MERGE'='true','COMPACTION_LEVEL_THRESHOLD'='2,0') + """.stripMargin) + + for(i <-0 until 4){ + sql("insert into tablecompaction_table select 'a',12") + } + var segments = sql("SHOW SEGMENTS FOR TABLE tablecompaction_table") + var segmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(segmentSequenceIds.size==6) + assert(segmentSequenceIds.contains("0.1")) + assert(segmentSequenceIds.contains("2.1")) + } + + test("System Level:Verify 2nd Level compaction equals to 1"){ + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "2,1") + sql("DROP TABLE IF EXISTS tablecompaction_table") + sql( + """ + |create table tablecompaction_table( + |name string,age int) stored by 'carbondata' + """.stripMargin) + + for(i <-0 until 4){ + sql("insert into tablecompaction_table select 'a',12") + } + sql("alter table tablecompaction_table compact 'minor' ") + var segments = sql("SHOW SEGMENTS FOR TABLE tablecompaction_table") + var segmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) } + assert(segmentSequenceIds.size==6) + assert(segmentSequenceIds.contains("0.1")) + assert(segmentSequenceIds.contains("2.1")) + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/30adaa8c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index c2f805d..49b17fb 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -455,7 +455,7 @@ object CommonUtil { try { val levels: Array[String] = regularedStr.split(",") val thresholds = regularedStr.split(",").map(levelThresholdStr => levelThresholdStr.toInt) - if (!thresholds.forall(t => t < 100 && t > 0)) { + if (!thresholds.forall(t => t < 100 && t >= 0)) { throw new MalformedCarbonCommandException(s"Invalid $tblPropName value found: " + s"$regularedStr, only int values separated by comma and between 0 " + s"and 100 are supported.") http://git-wip-us.apache.org/repos/asf/carbondata/blob/30adaa8c/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java index 2951283..5b001bf 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java @@ -744,6 +744,13 @@ public final class CarbonDataMergerUtil { if (size >= 2) { level1Size = noOfSegmentLevelsCount[0]; level2Size = noOfSegmentLevelsCount[1]; + /* + Ex. 
if segs => 0.1,2,3 and threshold =2,1 + during the 2nd compaction, mergeCounter becomes 1 and we check if mergeCounter==level2Size + then return mergedSegments, which will return 0.1; since only 1 segment(0.1) is identified, + no segment would go for compaction. So change the 2nd-level threshold to 0 if it is 1. + */ + level2Size = level2Size == 1 ? 0 : level2Size; } else if (size == 1) { level1Size = noOfSegmentLevelsCount[0]; }