This is an automated email from the ASF dual-hosted git repository. leirui pushed a commit to branch research/M4-visualization in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 46963054960006e020a4ba3d1886ef95a9390e64 Author: Lei Rui <[email protected]> AuthorDate: Wed Oct 18 16:27:35 2023 +0800 update --- .../groupby/LocalGroupByExecutor4MinMax.java | 311 +++++---------------- .../apache/iotdb/db/integration/m4/MyTest8.java | 3 + .../iotdb/tsfile/read/common/ChunkSuit4CPV.java | 1 + 3 files changed, 72 insertions(+), 243 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4MinMax.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4MinMax.java index ff074f588cb..44f29f9d897 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4MinMax.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4MinMax.java @@ -266,30 +266,30 @@ public class LocalGroupByExecutor4MinMax implements GroupByExecutor { currentChunkList.add(chunkSuit4CPV); itr.remove(); } else { + // TODO modify logic // the chunk partially overlaps in time with the current M4 interval Ii. // load this chunk, split it on deletes and all w intervals. // add to currentChunkList and futureChunkList. itr.remove(); - // B: loads chunk data from disk to memory - // C: decompress page data, split time&value buffers - PageReader pageReader = - FileLoaderUtils.loadPageReaderList4CPV( - chunkSuit4CPV.getChunkMetadata(), this.timeFilter); - // ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, - // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. - // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE - // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN - // DIRECTLY), WHICH WILL INTRODUCE BUGS! - - // chunk data read operation (b) get the closest data point after or before a timestamp - pageReader.split4CPV( - startTime, - endTime, - interval, - curStartTime, - currentChunkList, - splitChunkList, - chunkMetadata); + int numberOfSpans = + (int) + Math.floor( + (Math.min(chunkMetadata.getEndTime(), endTime - 1) - curStartTime) + * 1.0 + / interval) + + 1; + for (int n = 0; n < numberOfSpans; n++) { + ChunkSuit4CPV newChunkSuit4CPV = new ChunkSuit4CPV(chunkMetadata, null, true); + newChunkSuit4CPV.needsUpdateStartEndPos = + true; // note this, because this chunk does not fall within a time span, but cross + if (n == 0) { + currentChunkList.add(newChunkSuit4CPV); + } else { + int globalIdx = curIdx + n; // note splitChunkList uses global idx key + splitChunkList.computeIfAbsent(globalIdx, k -> new ArrayList<>()); + splitChunkList.get(globalIdx).add(newChunkSuit4CPV); + } + } } } } @@ -396,6 +396,30 @@ public class LocalGroupByExecutor4MinMax implements GroupByExecutor { .getPageReader() .setDeleteIntervalList(chunkSuit4CPV.getChunkMetadata().getDeleteIntervalList()); } + // TODO add logic + if (chunkSuit4CPV + .needsUpdateStartEndPos) { // otherwise the chunk falls completely with the current + // time span + chunkSuit4CPV.needsUpdateStartEndPos = false; + // update startPos & endPos by dealing with the current time span virtual deletes + long leftEndIncluded = curStartTime; + long rightEndExcluded = curStartTime + interval; + int FP_pos = -1; + int LP_pos = -1; + if (leftEndIncluded > chunkSuit4CPV.statistics.getStartTime()) { + FP_pos = chunkSuit4CPV.updateFPwithTheClosetPointEqualOrAfter(leftEndIncluded); + } + if (rightEndExcluded <= chunkSuit4CPV.statistics.getEndTime()) { + // -1 is because right end is excluded end + LP_pos = + chunkSuit4CPV.updateLPwithTheClosetPointEqualOrBefore(rightEndExcluded - 1); + } + if (FP_pos != -1 && LP_pos != -1 && FP_pos > LP_pos) { + // means the chunk has no point in this span + currentChunkList.remove(chunkSuit4CPV); + } + } + // chunk data read operation (c) // chunkSuit4CPV.getPageReader().updateBPTP(chunkSuit4CPV); chunkSuit4CPV.getPageReader().updateBP_withValueIndex(chunkSuit4CPV); @@ -594,6 +618,30 @@ public class LocalGroupByExecutor4MinMax implements GroupByExecutor { .getPageReader() .setDeleteIntervalList(chunkSuit4CPV.getChunkMetadata().getDeleteIntervalList()); } + // TODO add logic + if (chunkSuit4CPV + .needsUpdateStartEndPos) { // otherwise the chunk falls completely with the current + // time span + chunkSuit4CPV.needsUpdateStartEndPos = false; + // update startPos & endPos by dealing with the current time span virtual deletes + long leftEndIncluded = curStartTime; + long rightEndExcluded = curStartTime + interval; + int FP_pos = -1; + int LP_pos = -1; + if (leftEndIncluded > chunkSuit4CPV.statistics.getStartTime()) { + FP_pos = chunkSuit4CPV.updateFPwithTheClosetPointEqualOrAfter(leftEndIncluded); + } + if (rightEndExcluded <= chunkSuit4CPV.statistics.getEndTime()) { + // -1 is because right end is excluded end + LP_pos = + chunkSuit4CPV.updateLPwithTheClosetPointEqualOrBefore(rightEndExcluded - 1); + } + if (FP_pos != -1 && LP_pos != -1 && FP_pos > LP_pos) { + // means the chunk has no point in this span + currentChunkList.remove(chunkSuit4CPV); + } + } + // chunk data read operation (c) // chunkSuit4CPV.getPageReader().updateBPTP(chunkSuit4CPV); chunkSuit4CPV.getPageReader().updateTP_withValueIndex(chunkSuit4CPV); // @@ -721,229 +769,6 @@ public class LocalGroupByExecutor4MinMax implements GroupByExecutor { } } - private void calculateFirstPoint( - List<ChunkSuit4CPV> currentChunkList, - long startTime, - long endTime, - long interval, - long curStartTime) - throws IOException { - // IOMonitor2.M4_LSM_status = Operation.M4_LSM_FP; - while (currentChunkList.size() > 0) { // loop 1 - // sorted by startTime and version, find FP candidate - currentChunkList.sort( - new Comparator<ChunkSuit4CPV>() { // double check the sort order logic for different - // aggregations - public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) { - // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata - int res = - ((Comparable) (o1.getStatistics().getStartTime())) - .compareTo(o2.getStatistics().getStartTime()); - if (res != 0) { - return res; - } else { - return new MergeReaderPriority( - o2.getChunkMetadata().getVersion(), - o2.getChunkMetadata().getOffsetOfChunkHeader()) - .compareTo( - new MergeReaderPriority( - o1.getChunkMetadata().getVersion(), - o1.getChunkMetadata().getOffsetOfChunkHeader())); - } - } - }); - - ChunkSuit4CPV susp_candidate = currentChunkList.get(0); - if (susp_candidate.isLazyLoad()) { - // means the chunk is already lazy loaded, then load the chunk, apply deletes, update - // statistics, - // cancel the lazy loaded mark, and back to loop 1 - if (susp_candidate.getPageReader() == null) { - PageReader pageReader = - FileLoaderUtils.loadPageReaderList4CPV( - susp_candidate.getChunkMetadata(), this.timeFilter); - // ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, - // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. - // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE - // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN - // DIRECTLY), WHICH WILL INTRODUCE BUGS! - susp_candidate.setPageReader(pageReader); - } - // chunk data read operation (b): get the closest data point after or before a timestamp - susp_candidate.updateFPwithTheClosetPointEqualOrAfter( - susp_candidate.getStatistics().getStartTime()); - susp_candidate.setLazyLoad(false); // DO NOT FORGET THIS!!! - continue; // back to loop 1 - } else { - // the chunk has not been lazy loaded, then verify whether the candidate point is deleted - // Note the higher versions of deletes are guaranteed by - // QueryUtils.modifyChunkMetaData(chunkMetadataList,pathModifications) - // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata - long candidateTimestamp = susp_candidate.getStatistics().getStartTime(); // check - Object candidateValue = susp_candidate.getStatistics().getFirstValue(); // check - - boolean isDeletedItself = false; - long deleteEndTime = -1; - List<TimeRange> deleteIntervalList = - susp_candidate.getChunkMetadata().getDeleteIntervalList(); - if (deleteIntervalList != null) { - int deleteCursor = 0; - while (deleteCursor < deleteIntervalList.size()) { - if (deleteIntervalList.get(deleteCursor).getMax() < candidateTimestamp) { - deleteCursor++; - } else if (deleteIntervalList.get(deleteCursor).contains(candidateTimestamp)) { - isDeletedItself = true; - deleteEndTime = deleteIntervalList.get(deleteCursor).getMax(); - break; // since delete intervals are already sorted and merged - } else { - break; // since delete intervals are already sorted and merged - } - } - } - if (isDeletedItself) { - // deleteEndTime may be after the current endTime, - // because deleteStartTime can be after the startTime of the whole chunk - if (deleteEndTime - >= susp_candidate.getStatistics().getEndTime()) { // NOTE here calculate FP - // deleted as a whole - currentChunkList.remove(susp_candidate); - } else { - // the candidate point is deleted, then label the chunk as already lazy loaded, - // update chunkStartTime without loading data, and back to loop 1 - susp_candidate.setLazyLoad(true); - // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata - susp_candidate.getStatistics().setStartTime(deleteEndTime + 1); // check - // +1 is because delete is closed interval - } - continue; // back to loop 1 - } else { - // the candidate point is not deleted, then it is the final result - results - .get(0) - .updateResultUsingValues( - new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); - results - .get(2) - .updateResultUsingValues( - new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); - return; - } - } - } - } - - private void calculateLastPoint( - List<ChunkSuit4CPV> currentChunkList, - long startTime, - long endTime, - long interval, - long curStartTime) - throws IOException { - // IOMonitor2.M4_LSM_status = Operation.M4_LSM_LP; - while (currentChunkList.size() > 0) { // loop 1 - // sorted by endTime and version, find LP candidate - currentChunkList.sort( - new Comparator<ChunkSuit4CPV>() { - // aggregations - public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) { - int res = - ((Comparable) (o2.getStatistics().getEndTime())) - .compareTo(o1.getStatistics().getEndTime()); - // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata - if (res != 0) { - return res; - } else { - return new MergeReaderPriority( - o2.getChunkMetadata().getVersion(), - o2.getChunkMetadata().getOffsetOfChunkHeader()) - .compareTo( - new MergeReaderPriority( - o1.getChunkMetadata().getVersion(), - o1.getChunkMetadata().getOffsetOfChunkHeader())); - } - } - }); - - ChunkSuit4CPV susp_candidate = currentChunkList.get(0); - if (susp_candidate.isLazyLoad()) { - // means the chunk is already lazy loaded, then load the chunk, apply deletes, update - // statistics, - // cancel the lazy loaded mark, and back to loop 1 - if (susp_candidate.getPageReader() == null) { - PageReader pageReader = - FileLoaderUtils.loadPageReaderList4CPV( - susp_candidate.getChunkMetadata(), this.timeFilter); - // ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, - // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. - // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE - // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN - // DIRECTLY), WHICH WILL INTRODUCE BUGS! - susp_candidate.setPageReader(pageReader); - } - // update LP equal to or before statistics.getEndTime - // (b) get the closest data point after or before a timestamp - susp_candidate.updateLPwithTheClosetPointEqualOrBefore( - susp_candidate.getStatistics().getEndTime()); // DEBUG - susp_candidate.setLazyLoad(false); // DO NOT FORGET THIS!!! - continue; // back to loop 1 - } else { - // the chunk has not been lazy loaded, then verify whether the candidate point is deleted - // Note the higher versions of deletes are guaranteed by - // QueryUtils.modifyChunkMetaData(chunkMetadataList,pathModifications) - // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata - long candidateTimestamp = susp_candidate.getStatistics().getEndTime(); // check - Object candidateValue = susp_candidate.getStatistics().getLastValue(); // check - - boolean isDeletedItself = false; - long deleteStartTime = Long.MAX_VALUE; // check - List<TimeRange> deleteIntervalList = - susp_candidate.getChunkMetadata().getDeleteIntervalList(); - if (deleteIntervalList != null) { - int deleteCursor = 0; - while (deleteCursor < deleteIntervalList.size()) { - if (deleteIntervalList.get(deleteCursor).getMax() < candidateTimestamp) { - deleteCursor++; - } else if (deleteIntervalList.get(deleteCursor).contains(candidateTimestamp)) { - isDeletedItself = true; - deleteStartTime = deleteIntervalList.get(deleteCursor).getMin(); - break; // since delete intervals are already sorted and merged - } else { - break; // since delete intervals are already sorted and merged - } - } - } - if (isDeletedItself) { - // deleteStartTime may be before the current startTime, - // because deleteEndTime can be before the endTime of the whole chunk - if (deleteStartTime <= susp_candidate.getStatistics().getStartTime()) { - // NOTE here calculate LP. - // deleted as a whole - currentChunkList.remove(susp_candidate); - } else { - susp_candidate.setLazyLoad(true); - // NOTE here get statistics from ChunkSuit4CPV, not from - // ChunkSuit4CPV.ChunkMetadata - susp_candidate.getStatistics().setEndTime(deleteStartTime - 1); - // -1 is because delete is closed interval - // check - } - continue; // back to loop 1 - } else { - // the candidate point is not deleted, then it is the final result - results - .get(1) - .updateResultUsingValues( - new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); - results - .get(3) - .updateResultUsingValues( - new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); - return; - } - } - } - } - @Override public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime) throws IOException { diff --git a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest8.java b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest8.java index e3a8a2c3b53..df261cfc175 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest8.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest8.java @@ -41,6 +41,7 @@ import java.util.Locale; import static org.junit.Assert.fail; public class MyTest8 { + // test MinMax-LSM private static final String TIMESTAMP_STR = "Time"; @@ -65,6 +66,7 @@ public class MyTest8 { @Before public void setUp() throws Exception { TSFileDescriptor.getInstance().getConfig().setEnableMinMaxLSM(true); + TSFileDescriptor.getInstance().getConfig().setTimeEncoder("PLAIN"); originalCompactionStrategy = config.getCompactionStrategy(); config.setCompactionStrategy(CompactionStrategy.NO_COMPACTION); @@ -84,6 +86,7 @@ public class MyTest8 { @After public void tearDown() throws Exception { + TSFileDescriptor.getInstance().getConfig().setEnableMinMaxLSM(false); EnvironmentUtils.cleanEnv(); config.setCompactionStrategy(originalCompactionStrategy); config.setEnableCPV(originalEnableCPV); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java index ccab972652f..9be0b03a15d 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java @@ -45,6 +45,7 @@ public class ChunkSuit4CPV { public Statistics statistics; // dynamically updated, includes FP/LP/BP/TP info // [startPos,endPos] definitely for curStartTime interval, thanks to split4CPV + public boolean needsUpdateStartEndPos = false; public int startPos = -1; // the first point position, starting from 0 public int endPos = -1; // the last point position, starting from 0
