This is an automated email from the ASF dual-hosted git repository. leirui pushed a commit to branch research/M4-visualization in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0228a67be208f04f715f014f3eb70ee06c19e386 Author: Lei Rui <[email protected]> AuthorDate: Sat Jan 28 11:30:02 2023 +0800 add comments --- .../dataset/groupby/LocalGroupByExecutor4CPV.java | 815 +-------------------- .../iotdb/tsfile/read/common/ChunkSuit4CPV.java | 24 +- 2 files changed, 50 insertions(+), 789 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java index 95be8ffc41..6d96897bfe 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java @@ -178,6 +178,11 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { // for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk // assume all data on disk, no data in memory + // TODO ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, + // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. + // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE + // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN DIRECTLY), + // WHICH WILL INTRODUCE BUGS! ((PageReader) pageReaderList.get(0)) .split4CPV( startTime, @@ -221,68 +226,6 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { return results; } - /** 对BatchData应用deletes操作,获得更新的BatchData和statistics赋值到chunkSuit4CPV中 */ - // private void updateBatchData(ChunkSuit4CPV chunkSuit4CPV, TSDataType dataType) { - // if (chunkSuit4CPV.getBatchData() != null) { - // BatchData batchData1 = BatchDataFactory.createBatchData(dataType, true, false); - // Statistics statistics = null; - // switch (tsDataType) { - // case INT32: - // statistics = new IntegerStatistics(); - // break; - // case INT64: - // statistics = new LongStatistics(); - // break; - // case FLOAT: - // statistics = new FloatStatistics(); - // break; - // case DOUBLE: - // statistics = new DoubleStatistics(); - // break; - // default: - // break; - // } - // BatchDataIterator batchDataIterator = chunkSuit4CPV.getBatchData().getBatchDataIterator(); - // while (batchDataIterator.hasNextTimeValuePair()) { - // TimeValuePair timeValuePair = batchDataIterator.nextTimeValuePair(); - // long timestamp = timeValuePair.getTimestamp(); - // TsPrimitiveType value = timeValuePair.getValue(); - // boolean isDeletedItself = false; - // if (chunkSuit4CPV.getChunkMetadata().getDeleteIntervalList() != null) { - // for (TimeRange timeRange : chunkSuit4CPV.getChunkMetadata().getDeleteIntervalList()) { - // if (timeRange.contains(timestamp)) { - // isDeletedItself = true; - // break; - // } - // } - // } - // if (!isDeletedItself) { - // switch (dataType) { - // case INT32: - // batchData1.putInt(timestamp, value.getInt()); - // statistics.update(timestamp, value.getInt()); - // break; - // case INT64: - // batchData1.putLong(timestamp, value.getLong()); - // statistics.update(timestamp, value.getLong()); - // break; - // case FLOAT: - // batchData1.putFloat(timestamp, value.getFloat()); - // statistics.update(timestamp, value.getFloat()); - // break; - // case DOUBLE: - // batchData1.putDouble(timestamp, value.getDouble()); - // statistics.update(timestamp, value.getDouble()); - // break; - // default: - // throw new UnSupportedDataTypeException(String.valueOf(dataType)); - // } - // } - // } - // chunkSuit4CPV.setBatchData(batchData1); - // chunkSuit4CPV.getChunkMetadata().setStatistics(statistics); - // } - // } private void calculateBottomPoint( List<ChunkSuit4CPV> currentChunkList, long startTime, @@ -340,6 +283,11 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( chunkSuit4CPV.getChunkMetadata(), this.timeFilter); + // TODO ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, + // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. + // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE + // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN DIRECTLY), + // WHICH WILL INTRODUCE BUGS! chunkSuit4CPV.setPageReader((PageReader) pageReaderList.get(0)); } else { // TODO 注意delete intervals的传递:主要是被重写点作为点删除传递 @@ -374,15 +322,6 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { if (candidateTimestamp < curStartTime || candidateTimestamp >= curStartTime + interval) { isDeletedItself = true; } - // else if (candidate.getChunkMetadata().getDeleteIntervalList() != null) { - // for (TimeRange timeRange : candidate.getChunkMetadata().getDeleteIntervalList()) - // { - // if (timeRange.contains(candidateTimestamp)) { - // isDeletedItself = true; - // break; - // } // TODO add break early - // } - // } else { isDeletedItself = PageReader.isDeleted( @@ -437,6 +376,11 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( chunkSuit4CPV.getChunkMetadata(), this.timeFilter); + // TODO ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, + // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. + // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE + // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN DIRECTLY), + // WHICH WILL INTRODUCE BUGS! chunkSuit4CPV.setPageReader((PageReader) pageReaderList.get(0)); } isUpdate = chunkSuit4CPV.checkIfExist(candidateTimestamp); @@ -539,7 +483,11 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( chunkSuit4CPV.getChunkMetadata(), this.timeFilter); - // we assume and guarantee only one page in a chunk + // TODO ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, + // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. + // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE + // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN DIRECTLY), + // WHICH WILL INTRODUCE BUGS! chunkSuit4CPV.setPageReader((PageReader) pageReaderList.get(0)); } else { // TODO 注意delete intervals的传递:主要是被重写点作为点删除传递 @@ -632,6 +580,11 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( chunkSuit4CPV.getChunkMetadata(), this.timeFilter); + // TODO ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, + // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. + // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE + // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN DIRECTLY), + // WHICH WILL INTRODUCE BUGS! chunkSuit4CPV.setPageReader((PageReader) pageReaderList.get(0)); } isUpdate = chunkSuit4CPV.checkIfExist(candidateTimestamp); @@ -713,6 +666,11 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( susp_candidate.getChunkMetadata(), this.timeFilter); + // TODO ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, + // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. + // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE + // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN DIRECTLY), + // WHICH WILL INTRODUCE BUGS! susp_candidate.setPageReader((PageReader) pageReaderList.get(0)); } // TODO update FP equal to or after statistics.getEndTime @@ -811,24 +769,21 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { ChunkSuit4CPV susp_candidate = currentChunkList.get(0); if (susp_candidate.isLazyLoad()) { // 如果是lazy // load,则此时load、应用deletes、更新batchData和statistics,取消lazyLoad标记,然后回到循环1 - // currentChunkList.remove(susp_candidate); // TODO check this - // List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( - // susp_candidate.getChunkMetadata(), this.timeFilter); - // for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk - // ((PageReader) pageReader).split4CPV(startTime, endTime, interval, curStartTime, - // currentChunkList, null, - // susp_candidate.getChunkMetadata()); // 新增的ChunkSuit4CPV默认isLazyLoad=false - // } if (susp_candidate.getPageReader() == null) { List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( susp_candidate.getChunkMetadata(), this.timeFilter); + // TODO ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, + // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. + // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE + // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN DIRECTLY), + // WHICH WILL INTRODUCE BUGS! susp_candidate.setPageReader((PageReader) pageReaderList.get(0)); } // TODO update FP equal to or after statistics.getEndTime susp_candidate.updateLPwithTheClosetPointEqualOrBefore( susp_candidate.getStatistics().getEndTime()); // TODO DEBUG - susp_candidate.setLazyLoad(false); // DO NOT FORGET THIS!!! + susp_candidate.setLazyLoad(false); // TODO DO NOT FORGET THIS!!! continue; // 回到循环1 } else { // 如果不是lazy load,则该疑似candidate就是真正的candidate。 // 于是verification判断该点是否被更高优先级(更高优先级这一点在QueryUtils.modifyChunkMetaData(chunkMetadataList, @@ -841,18 +796,6 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { long deleteStartTime = Long.MAX_VALUE; // TODO check List<TimeRange> deleteIntervalList = susp_candidate.getChunkMetadata().getDeleteIntervalList(); - // if (susp_candidate.getChunkMetadata().getDeleteIntervalList() != null) { - // for (TimeRange timeRange : - // susp_candidate.getChunkMetadata().getDeleteIntervalList()) { - // if (timeRange.contains(candidateTimestamp)) { - // isDeletedItself = true; - // deleteStartTime = Math.min(deleteStartTime, - // timeRange.getMin()); // - // deleteStartTime不会小于chunkStartTime,因为否则的话这个chunk就会modifyChunkMetaData步骤里被处理掉整个删掉 - // // TODO check - // } - // } - // } if (deleteIntervalList != null) { int deleteCursor = 0; while (deleteCursor < deleteIntervalList.size()) { @@ -898,692 +841,6 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { } } - // /** - // * @param curStartTime closed - // * @param curEndTime open - // * @param startTime closed - // * @param endTime open - // */ - // public List<AggregateResult> calcResult_deprecated( - // long curStartTime, long curEndTime, long startTime, long endTime, long interval) - // throws IOException, QueryProcessException { - // // System.out.println("====DEBUG====: calcResult for [" + curStartTime + "," + curEndTime - // + - // // ")"); - // - // // clear result cache - // for (AggregateResult result : results) { - // result.reset(); - // } - // // empty currentChunkList - // currentChunkList = new ArrayList<>(); - // - // // System.out.println("====DEBUG====: deal with futureChunkList"); - // - // ListIterator itr = futureChunkList.listIterator(); - // List<ChunkSuit4CPV> tmpFutureChunkList = new ArrayList<>(); - // while (itr.hasNext()) { - // ChunkSuit4CPV chunkSuit4CPV = (ChunkSuit4CPV) (itr.next()); - // ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata(); - // long chunkMinTime = chunkMetadata.getStartTime(); - // long chunkMaxTime = chunkMetadata.getEndTime(); - // if (chunkMinTime >= curEndTime && chunkMinTime < endTime) { - // // the chunk falls on the right side of the current M4 interval Ii - // continue; - // } else if (chunkMaxTime < curStartTime || chunkMinTime >= endTime) { - // // the chunk falls on the left side of the current M4 interval Ii - // // or the chunk falls on the right side of the total query range - // itr.remove(); - // } else if (chunkMinTime >= curStartTime && chunkMaxTime < curEndTime) { - // // the chunk falls completely within the current M4 interval Ii - // currentChunkList.add(chunkSuit4CPV); - // itr.remove(); - // } else { - // // the chunk partially overlaps in time with the current M4 interval Ii. - // // load this chunk, split it on deletes and all w intervals. - // // add to currentChunkList and futureChunkList. - // itr.remove(); - // List<IPageReader> pageReaderList = - // FileLoaderUtils.loadPageReaderList(chunkSuit4CPV.getChunkMetadata(), - // this.timeFilter); - // for (IPageReader pageReader : pageReaderList) { - // // assume only one page in a chunk - // // assume all data on disk, no data in memory - // ((PageReader) pageReader) - // .split4CPV( - // startTime, - // endTime, - // interval, - // curStartTime, - // currentChunkList, - // tmpFutureChunkList, - // chunkMetadata); - // } - // - // // System.out.println( - // // "====DEBUG====: load the chunk because overlaps the M4 interval. Version=" - // // + chunkMetadata.getVersion() - // // + " " - // // + chunkMetadata.getOffsetOfChunkHeader()); - // } - // } - // futureChunkList.addAll(tmpFutureChunkList); - // tmpFutureChunkList = null; - // itr = null; - // - // // System.out.println("====DEBUG====: deal with currentChunkList"); - // - // if (currentChunkList.size() == 0) { - // return results; - // } - // - // boolean[] isFinal = new boolean[4]; // default false - // do { - // long[] timestamps = new long[4]; // firstTime, lastTime, bottomTime, topTime - // Object[] values = new Object[4]; // firstValue, lastValue, bottomValue, topValue - // PriorityMergeReader.MergeReaderPriority[] versions = - // new PriorityMergeReader.MergeReaderPriority[4]; - // int[] listIdx = new int[4]; - // timestamps[0] = -1; - // timestamps[1] = -1; - // values[2] = null; - // values[3] = null; - // - // // find candidate points - // // System.out.println("====DEBUG====: find candidate points"); - // // TODO: may change the loop of generating candidate points? sort first?? - // - // for (int j = 0; j < currentChunkList.size(); j++) { - // ChunkMetadata chunkMetadata = currentChunkList.get(j).getChunkMetadata(); - // Statistics statistics = chunkMetadata.getStatistics(); - // MergeReaderPriority version = - // new MergeReaderPriority( - // chunkMetadata.getVersion(), chunkMetadata.getOffsetOfChunkHeader()); - // // update firstPoint - // if (!isFinal[0]) { - // if (timestamps[0] == -1 - // || (statistics.getStartTime() < timestamps[0]) - // || (statistics.getStartTime() == timestamps[0] - // && version.compareTo(versions[0]) > 0)) { - // timestamps[0] = statistics.getStartTime(); - // values[0] = statistics.getFirstValue(); - // versions[0] = version; - // listIdx[0] = j; - // } - // } - // // update lastPoint - // if (!isFinal[1]) { - // if (timestamps[1] == -1 - // || (statistics.getEndTime() > timestamps[1]) - // || (statistics.getEndTime() == timestamps[1] && version.compareTo(versions[1]) > - // 0)) { - // timestamps[1] = statistics.getEndTime(); - // values[1] = statistics.getLastValue(); - // versions[1] = version; - // listIdx[1] = j; - // } - // } - // // update bottomPoint - // if (!isFinal[2]) { - // if (values[2] == null - // || (((Comparable) (values[2])).compareTo(statistics.getMinValue()) > 0)) { - // timestamps[2] = statistics.getBottomTimestamp(); - // values[2] = statistics.getMinValue(); - // versions[2] = version; - // listIdx[2] = j; - // } - // } - // // update topPoint - // if (!isFinal[3]) { - // if (values[3] == null - // || (((Comparable) (values[3])).compareTo(statistics.getMaxValue()) < 0)) { - // timestamps[3] = statistics.getTopTimestamp(); - // values[3] = statistics.getMaxValue(); - // versions[3] = version; - // listIdx[3] = j; - // } - // } - // } - // - // // System.out.println("====DEBUG====: verify candidate points"); - // - // // verify candidate points. - // // firstPoint and lastPoint are valid for sure. - // // default results sequence: min_time(%s), max_time(%s), first_value(%s), last_value(%s), - // // min_value(%s), max_value(%s) - // if (!isFinal[0]) { // firstPoint - // long firstTimestamp = timestamps[0]; - // ChunkMetadata firstChunkMetadata = currentChunkList.get(listIdx[0]).getChunkMetadata(); - // // check if the point is deleted: - // List<TimeRange> firstDeleteIntervalList = firstChunkMetadata.getDeleteIntervalList(); - // boolean isDeletedItself = false; - // if (firstDeleteIntervalList != null) { - // for (TimeRange timeRange : firstDeleteIntervalList) { - // if (timeRange.contains(firstTimestamp)) { - // isDeletedItself = true; - // break; - // } - // } - // } - // if (isDeletedItself) { - // // System.out.println( - // // "====DEBUG====: load the chunk because candidate firstPoint is - // actually - // // deleted. Version=" - // // + firstChunkMetadata.getVersion() - // // + " " - // // + firstChunkMetadata.getOffsetOfChunkHeader()); - // - // currentChunkList.remove(listIdx[0]); - // List<IPageReader> pageReaderList = - // FileLoaderUtils.loadPageReaderList(firstChunkMetadata, this.timeFilter); - // for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk - // ((PageReader) pageReader) - // .split4CPV( - // startTime, - // endTime, - // interval, - // curStartTime, - // currentChunkList, - // null, - // firstChunkMetadata); - // } - // continue; // next iteration to check currentChunkList - // } else { - // results - // .get(0) - // .updateResultUsingValues( - // Arrays.copyOfRange(timestamps, 0, 1), - // 1, - // Arrays.copyOfRange(values, 0, 1)); // min_time - // results - // .get(2) - // .updateResultUsingValues( - // Arrays.copyOfRange(timestamps, 0, 1), - // 1, - // Arrays.copyOfRange(values, 0, 1)); // first_value - // isFinal[0] = true; - // // System.out.println("====DEBUG====: find firstPoint"); - // } - // } - // if (!isFinal[1]) { // lastPoint - // long lastTimestamp = timestamps[1]; - // ChunkMetadata lastChunkMetadata = currentChunkList.get(listIdx[1]).getChunkMetadata(); - // // check if the point is deleted: - // List<TimeRange> lastDeleteIntervalList = lastChunkMetadata.getDeleteIntervalList(); - // boolean isDeletedItself = false; - // if (lastDeleteIntervalList != null) { - // for (TimeRange timeRange : lastDeleteIntervalList) { - // if (timeRange.contains(lastTimestamp)) { - // isDeletedItself = true; - // break; - // } - // } - // } - // if (isDeletedItself) { - // // System.out.println( - // // "====DEBUG====: load the chunk because candidate lastPoint is actually - // // deleted. Version=" - // // + lastChunkMetadata.getVersion() - // // + " " - // // + lastChunkMetadata.getOffsetOfChunkHeader()); - // - // currentChunkList.remove(listIdx[1]); - // List<IPageReader> pageReaderList = - // FileLoaderUtils.loadPageReaderList(lastChunkMetadata, this.timeFilter); - // for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk - // ((PageReader) pageReader) - // .split4CPV( - // startTime, - // endTime, - // interval, - // curStartTime, - // currentChunkList, - // null, - // lastChunkMetadata); - // } - // continue; // next iteration to check currentChunkList - // } else { - // results - // .get(1) - // .updateResultUsingValues( - // Arrays.copyOfRange(timestamps, 1, 2), - // 1, - // Arrays.copyOfRange(values, 1, 2)); // min_time - // results - // .get(3) - // .updateResultUsingValues( - // Arrays.copyOfRange(timestamps, 1, 2), - // 1, - // Arrays.copyOfRange(values, 1, 2)); // first_value - // isFinal[1] = true; - // - // // System.out.println("====DEBUG====: find lastPoint"); - // } - // } - // if (!isFinal[2]) { // bottomPoint - // long bottomTimestamp = timestamps[2]; - // ChunkMetadata bottomChunkMetadata = currentChunkList.get(listIdx[2]).getChunkMetadata(); - // List<Long> mergedVersionList = currentChunkList.get(listIdx[2]).getMergeVersionList(); - // List<Long> mergedOffsetList = currentChunkList.get(listIdx[2]).getMergeOffsetList(); - // // check if the point is deleted: - // List<TimeRange> bottomDeleteIntervalList = bottomChunkMetadata.getDeleteIntervalList(); - // boolean isDeletedItself = false; - // if (bottomDeleteIntervalList != null) { - // for (TimeRange timeRange : bottomDeleteIntervalList) { - // if (timeRange.contains(bottomTimestamp)) { - // isDeletedItself = true; - // break; - // } - // } - // } - // if (isDeletedItself) { - // // System.out.println( - // // "====DEBUG====: load the chunk because candidate bottomPoint is - // actually - // // deleted. Version=" - // // + bottomChunkMetadata.getVersion() - // // + " " - // // + bottomChunkMetadata.getOffsetOfChunkHeader()); - // - // currentChunkList.remove(listIdx[2]); - // List<IPageReader> pageReaderList = - // FileLoaderUtils.loadPageReaderList(bottomChunkMetadata, this.timeFilter); - // for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk - // ((PageReader) pageReader) - // .split4CPV( - // startTime, - // endTime, - // interval, - // curStartTime, - // currentChunkList, - // null, - // bottomChunkMetadata); - // } - // continue; // next iteration to check currentChunkList - // } else { // verify if it is overlapped by other chunks with larger version number and - // not in - // // the deleted time interval - // List<Integer> toMerge = new ArrayList<>(); - // for (int i = 0; i < currentChunkList.size(); i++) { - // ChunkMetadata chunkMetadata = currentChunkList.get(i).getChunkMetadata(); - // MergeReaderPriority version = - // new MergeReaderPriority( - // chunkMetadata.getVersion(), chunkMetadata.getOffsetOfChunkHeader()); - // if (version.compareTo(versions[2]) <= 0) { // including bottomChunkMetadata - // continue; - // } - // if (bottomTimestamp < chunkMetadata.getStartTime() - // || bottomTimestamp > chunkMetadata.getEndTime()) { - // continue; - // } - // boolean isMerged = false; - // for (int k = 0; k < mergedVersionList.size(); k++) { - // // these chunks are MARKED "merged" - not overlapped any more - // if (mergedVersionList.get(k) == chunkMetadata.getVersion() - // && mergedOffsetList.get(k) == chunkMetadata.getOffsetOfChunkHeader()) { - // isMerged = true; - // break; - // } - // } - // if (isMerged) { - // continue; - // } - // toMerge.add(i); - // } - // if (toMerge.isEmpty()) { - // // System.out.println("====DEBUG====: find bottomPoint"); - // - // results - // .get(4) - // .updateResultUsingValues( - // Arrays.copyOfRange(timestamps, 2, 3), - // 1, - // Arrays.copyOfRange(values, 2, 3)); // min_value - // isFinal[2] = true; - // } else { - // // deal with toMerge chunks: delete updated points - // toMerge.add(listIdx[2]); - // List<Long> newMergedVersionList = new ArrayList<>(); - // List<Long> newMergedOffsetList = new ArrayList<>(); - // for (int m : toMerge) { // to MARK these chunks are "merged" - not overlapped any - // more - // ChunkMetadata tmpChunkMetadata = currentChunkList.get(m).getChunkMetadata(); - // newMergedVersionList.add(tmpChunkMetadata.getVersion()); - // newMergedOffsetList.add(tmpChunkMetadata.getOffsetOfChunkHeader()); - // } - // Map<MergeReaderPriority, BatchData> updateBatchDataMap = new HashMap<>(); - // Map<MergeReaderPriority, Statistics> statisticsMap = new HashMap<>(); - // for (int o = 0; o < toMerge.size(); o++) { - // // create empty batchData - // ChunkSuit4CPV chunkSuit4CPV = currentChunkList.get(toMerge.get(o)); - // ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata(); - // MergeReaderPriority mergeReaderPriority = - // new MergeReaderPriority( - // chunkMetadata.getVersion(), chunkMetadata.getOffsetOfChunkHeader()); - // BatchData batch1 = BatchDataFactory.createBatchData(tsDataType, true, false); - // updateBatchDataMap.put(mergeReaderPriority, batch1); - // // create empty statistics - // Statistics statistics = null; - // switch (tsDataType) { - // case INT32: - // statistics = new IntegerStatistics(); - // break; - // case INT64: - // statistics = new LongStatistics(); - // break; - // case FLOAT: - // statistics = new FloatStatistics(); - // break; - // case DOUBLE: - // statistics = new DoubleStatistics(); - // break; - // default: - // break; - // } - // statisticsMap.put(mergeReaderPriority, statistics); - // // prepare mergeReader - // if (chunkSuit4CPV.getBatchData() == null) { - // List<IPageReader> pageReaderList = - // FileLoaderUtils.loadPageReaderList(chunkMetadata, this.timeFilter); - // List<ChunkSuit4CPV> tmpCurrentChunkList = new ArrayList<>(); - // for (IPageReader pageReader : pageReaderList) { // assume only one page in a - // chunk - // ((PageReader) pageReader) - // .split4CPV( - // startTime, - // endTime, - // interval, - // curStartTime, - // tmpCurrentChunkList, - // null, - // chunkMetadata); - // } - // currentChunkList.set(toMerge.get(o), tmpCurrentChunkList.get(0)); - // chunkSuit4CPV = currentChunkList.get(toMerge.get(o)); - // - // // System.out.println( - // // "====DEBUG====: load chunk for update merge. Version=" - // // + chunkMetadata.getVersion() - // // + " " - // // + chunkMetadata.getOffsetOfChunkHeader()); - // } - // mergeReader.addReader( - // chunkSuit4CPV.getBatchData().getBatchDataIterator(), - // new MergeReaderPriority(chunkSuit4CPV.getVersion(), - // chunkSuit4CPV.getOffset())); - // } - // while (mergeReader.hasNextTimeValuePair()) { - // Pair<TimeValuePair, MergeReaderPriority> res = mergeReader.nextElement(); - // TimeValuePair ret = res.left; - // // System.out.println( - // // "====DEBUG====: merge for bottomPoint. (t,v)=" - // // + ret.getTimestamp() - // // + "," - // // + ret.getValue().getValue()); - // updateBatchDataMap - // .get(res.right) - // .putAnObject(ret.getTimestamp(), ret.getValue().getValue()); - // switch (tsDataType) { - // case INT32: - // statisticsMap - // .get(res.right) - // .update(ret.getTimestamp(), (int) ret.getValue().getValue()); - // break; - // case INT64: - // statisticsMap - // .get(res.right) - // .update(ret.getTimestamp(), (long) ret.getValue().getValue()); - // break; - // case FLOAT: - // statisticsMap - // .get(res.right) - // .update(ret.getTimestamp(), (float) ret.getValue().getValue()); - // break; - // case DOUBLE: - // statisticsMap - // .get(res.right) - // .update(ret.getTimestamp(), (double) ret.getValue().getValue()); - // break; - // default: - // throw new UnSupportedDataTypeException(String.valueOf(tsDataType)); - // } - // } - // mergeReader.close(); - // - // for (int o = 0; o < toMerge.size(); o++) { - // ChunkSuit4CPV chunkSuit4CPV = currentChunkList.get(toMerge.get(o)); - // // to MARK these chunks are "merged" - not overlapped any more - // chunkSuit4CPV.getMergeVersionList().addAll(newMergedVersionList); - // chunkSuit4CPV.getMergeOffsetList().addAll(newMergedOffsetList); - // // update BatchData - // MergeReaderPriority mergeReaderPriority = - // new MergeReaderPriority(chunkSuit4CPV.getVersion(), - // chunkSuit4CPV.getOffset()); - // chunkSuit4CPV.setBatchData(updateBatchDataMap.get(mergeReaderPriority)); - // chunkSuit4CPV - // .getChunkMetadata() - // .setStatistics(statisticsMap.get(mergeReaderPriority)); - // } - // // System.out.println( - // // "====DEBUG====: merged chunks are : version=" - // // + newMergedVersionList - // // + " offsets=" - // // + newMergedOffsetList); - // continue; - // } - // } - // } - // - // if (!isFinal[3]) { // topPoint - // long topTimestamp = timestamps[3]; - // ChunkMetadata topChunkMetadata = currentChunkList.get(listIdx[3]).getChunkMetadata(); - // List<Long> mergedVersionList = currentChunkList.get(listIdx[3]).getMergeVersionList(); - // List<Long> mergedOffsetList = currentChunkList.get(listIdx[3]).getMergeOffsetList(); - // // check if the point is deleted: - // List<TimeRange> topDeleteIntervalList = topChunkMetadata.getDeleteIntervalList(); - // boolean isDeletedItself = false; - // if (topDeleteIntervalList != null) { - // for (TimeRange timeRange : topDeleteIntervalList) { - // if (timeRange.contains(topTimestamp)) { - // isDeletedItself = true; - // break; - // } - // } - // } - // if (isDeletedItself) { - // // System.out.println( - // // "====DEBUG====: load the chunk because candidate topPoint is actually - // // deleted. Version=" - // // + topChunkMetadata.getVersion() - // // + " " - // // + topChunkMetadata.getOffsetOfChunkHeader()); - // - // currentChunkList.remove(listIdx[3]); - // List<IPageReader> pageReaderList = - // FileLoaderUtils.loadPageReaderList(topChunkMetadata, this.timeFilter); - // for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk - // ((PageReader) pageReader) - // .split4CPV( - // startTime, - // endTime, - // interval, - // curStartTime, - // currentChunkList, - // null, - // topChunkMetadata); - // } - // continue; // next iteration to check currentChunkList - // } else { // verify if it is overlapped by other chunks with larger version number and - // not in - // // the deleted time interval - // List<Integer> toMerge = new ArrayList<>(); - // for (int i = 0; i < currentChunkList.size(); i++) { - // ChunkMetadata chunkMetadata = currentChunkList.get(i).getChunkMetadata(); - // MergeReaderPriority version = - // new MergeReaderPriority( - // chunkMetadata.getVersion(), chunkMetadata.getOffsetOfChunkHeader()); - // if (version.compareTo(versions[3]) <= 0) { // including topChunkMetadata - // continue; - // } - // if (topTimestamp < chunkMetadata.getStartTime() - // || topTimestamp > chunkMetadata.getEndTime()) { - // continue; - // } - // boolean isMerged = false; - // for (int k = 0; k < mergedVersionList.size(); k++) { - // if (mergedVersionList.get(k) == chunkMetadata.getVersion() - // && mergedOffsetList.get(k) == chunkMetadata.getOffsetOfChunkHeader()) { - // isMerged = true; - // break; - // } - // } - // if (isMerged) { - // continue; - // } - // toMerge.add(i); - // } - // if (toMerge.isEmpty()) { - // results - // .get(5) - // .updateResultUsingValues( - // Arrays.copyOfRange(timestamps, 3, 4), - // 1, - // Arrays.copyOfRange(values, 3, 4)); // max_value - // isFinal[3] = true; - // // System.out.println("====DEBUG====: find topPoint"); - // return results; - // } else { - // // deal with toMerge chunks: delete updated points - // toMerge.add(listIdx[3]); - // List<Long> newMergedVersionList = new ArrayList<>(); - // List<Long> newMergedOffsetList = new ArrayList<>(); - // for (int m : toMerge) { // to MARK these chunks are "merged" - not overlapped any - // more - // ChunkMetadata tmpChunkMetadata = currentChunkList.get(m).getChunkMetadata(); - // newMergedVersionList.add(tmpChunkMetadata.getVersion()); - // newMergedOffsetList.add(tmpChunkMetadata.getOffsetOfChunkHeader()); - // } - // Map<MergeReaderPriority, BatchData> updateBatchDataMap = new HashMap<>(); - // Map<MergeReaderPriority, Statistics> statisticsMap = new HashMap<>(); - // for (int o = 0; o < toMerge.size(); o++) { - // // create empty batchData - // ChunkSuit4CPV chunkSuit4CPV = currentChunkList.get(toMerge.get(o)); - // ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata(); - // MergeReaderPriority mergeReaderPriority = - // new MergeReaderPriority( - // chunkMetadata.getVersion(), chunkMetadata.getOffsetOfChunkHeader()); - // BatchData batch1 = BatchDataFactory.createBatchData(tsDataType, true, false); - // updateBatchDataMap.put(mergeReaderPriority, batch1); - // // create empty statistics - // Statistics statistics = null; - // switch (tsDataType) { - // case INT32: - // statistics = new IntegerStatistics(); - // break; - // case INT64: - // statistics = new LongStatistics(); - // break; - // case FLOAT: - // statistics = new FloatStatistics(); - // break; - // case DOUBLE: - // statistics = new DoubleStatistics(); - // break; - // default: - // break; - // } - // statisticsMap.put(mergeReaderPriority, statistics); - // // prepare mergeReader - // if (chunkSuit4CPV.getBatchData() == null) { - // List<IPageReader> pageReaderList = - // FileLoaderUtils.loadPageReaderList(chunkMetadata, this.timeFilter); - // List<ChunkSuit4CPV> tmpCurrentChunkList = new ArrayList<>(); - // for (IPageReader pageReader : pageReaderList) { // assume only one page in a - // chunk - // ((PageReader) pageReader) - // .split4CPV( - // startTime, - // endTime, - // interval, - // curStartTime, - // tmpCurrentChunkList, - // null, - // chunkMetadata); - // } - // currentChunkList.set(toMerge.get(o), tmpCurrentChunkList.get(0)); - // chunkSuit4CPV = currentChunkList.get(toMerge.get(o)); - // - // // System.out.println( - // // "====DEBUG====: load chunk for update merge. Version=" - // // + chunkMetadata.getVersion() - // // + " " - // // + chunkMetadata.getOffsetOfChunkHeader()); - // } - // mergeReader.addReader( - // chunkSuit4CPV.getBatchData().getBatchDataIterator(), - // new MergeReaderPriority(chunkSuit4CPV.getVersion(), - // chunkSuit4CPV.getOffset())); - // } - // while (mergeReader.hasNextTimeValuePair()) { - // Pair<TimeValuePair, MergeReaderPriority> res = mergeReader.nextElement(); - // TimeValuePair ret = res.left; - // // System.out.println( - // // "====DEBUG====: merge for topPoint. (t,v)=" - // // + ret.getTimestamp() - // // + "," - // // + ret.getValue().getValue()); - // updateBatchDataMap - // .get(res.right) - // .putAnObject(ret.getTimestamp(), ret.getValue().getValue()); - // switch (tsDataType) { - // case INT32: - // statisticsMap - // .get(res.right) - // .update(ret.getTimestamp(), (int) ret.getValue().getValue()); - // break; - // case INT64: - // statisticsMap - // .get(res.right) - // .update(ret.getTimestamp(), (long) ret.getValue().getValue()); - // break; - // case FLOAT: - // statisticsMap - // .get(res.right) - // .update(ret.getTimestamp(), (float) ret.getValue().getValue()); - // break; - // case DOUBLE: - // statisticsMap - // .get(res.right) - // .update(ret.getTimestamp(), (double) ret.getValue().getValue()); - // break; - // default: - // throw new UnSupportedDataTypeException(String.valueOf(tsDataType)); - // } - // } - // mergeReader.close(); - // - // for (int o = 0; o < toMerge.size(); o++) { - // ChunkSuit4CPV chunkSuit4CPV = currentChunkList.get(toMerge.get(o)); - // // to MARK these chunks are "merged" - not overlapped any more - // chunkSuit4CPV.getMergeVersionList().addAll(newMergedVersionList); - // chunkSuit4CPV.getMergeOffsetList().addAll(newMergedOffsetList); - // // update BatchData - // MergeReaderPriority mergeReaderPriority = - // new MergeReaderPriority(chunkSuit4CPV.getVersion(), - // chunkSuit4CPV.getOffset()); - // chunkSuit4CPV.setBatchData(updateBatchDataMap.get(mergeReaderPriority)); - // chunkSuit4CPV - // .getChunkMetadata() - // .setStatistics(statisticsMap.get(mergeReaderPriority)); - // } - // continue; - // } - // } - // } - // } while (true); - // } - @Override public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime) throws IOException { diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java index debae62647..cb4a691325 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java @@ -19,6 +19,7 @@ package org.apache.iotdb.tsfile.read.common; +import java.io.IOException; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics; @@ -30,8 +31,6 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.file.metadata.statistics.StepRegress; import org.apache.iotdb.tsfile.read.reader.page.PageReader; -import java.io.IOException; - public class ChunkSuit4CPV { private ChunkMetadata chunkMetadata; // fixed info, including version, dataType, stepRegress @@ -68,6 +67,11 @@ public class ChunkSuit4CPV { // private BatchData batchData; // deprecated + // TODO ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, + // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. + // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE + // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN DIRECTLY), + // WHICH WILL INTRODUCE BUGS! private PageReader pageReader; // bears plain timeBuffer and valueBuffer // pageReader does not refer to the same deleteInterval as those in chunkMetadata // after chunkMetadata executes insertIntoSortedDeletions @@ -267,10 +271,10 @@ public class ChunkSuit4CPV { long timestamp = pageReader.timeBuffer.getLong(estimatedPos * 8); statistics.setStartTime(timestamp); switch (chunkMetadata.getDataType()) { - // iotdb的int类型的plain编码用的是自制的不支持random access - // case INT32: - // return new MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4), - // pageReader.timeBuffer.getLong(estimatedPos * 8)); + // iotdb的int类型的plain编码用的是自制的不支持random access + // case INT32: + // return new MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4), + // pageReader.timeBuffer.getLong(estimatedPos * 8)); case INT64: long longVal = pageReader.valueBuffer.getLong(pageReader.timeBufferLength + estimatedPos * 8); @@ -328,10 +332,10 @@ public class ChunkSuit4CPV { long timestamp = pageReader.timeBuffer.getLong(estimatedPos * 8); statistics.setEndTime(timestamp); switch (chunkMetadata.getDataType()) { - // iotdb的int类型的plain编码用的是自制的不支持random access - // case INT32: - // return new MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4), - // pageReader.timeBuffer.getLong(estimatedPos * 8)); + // iotdb的int类型的plain编码用的是自制的不支持random access + // case INT32: + // return new MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4), + // pageReader.timeBuffer.getLong(estimatedPos * 8)); case INT64: long longVal = pageReader.valueBuffer.getLong(pageReader.timeBufferLength + estimatedPos * 8);
