This is an automated email from the ASF dual-hosted git repository.
leirui pushed a commit to branch research/M4-visualization
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/research/M4-visualization by
this push:
new 90650bce0b new cpv
90650bce0b is described below
commit 90650bce0b33b2eebfdbf486ac4729d2919dd909
Author: Lei Rui <[email protected]>
AuthorDate: Wed Jun 29 15:06:38 2022 +0800
new cpv
---
.../dataset/groupby/LocalGroupByExecutor4CPV.java | 668 ++++++++++++++++++++-
.../apache/iotdb/db/integration/m4/MyTest4.java | 545 +++++++++++++++++
.../iotdb/tsfile/read/common/ChunkSuit4CPV.java | 10 +
.../iotdb/tsfile/read/reader/page/PageReader.java | 65 +-
4 files changed, 1272 insertions(+), 16 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
index eb819e5360..ab1b77081d 100644
---
a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
+++
b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
import org.apache.iotdb.tsfile.read.common.ChunkSuit4CPV;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.BatchDataIterator;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
import org.apache.iotdb.tsfile.read.reader.page.PageReader;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -52,6 +53,7 @@ import org.apache.iotdb.tsfile.utils.Pair;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
@@ -113,8 +115,8 @@ public class LocalGroupByExecutor4CPV implements
GroupByExecutor {
ascending);
// unpackAllOverlappedFilesToTimeSeriesMetadata
- // TODO: this might be bad to load all chunk metadata at first
try {
+ // TODO: this might be bad to load all chunk metadata at first
futureChunkList.addAll(seriesReader.getAllChunkMetadatas4CPV());
} catch (IOException e) {
throw new QueryProcessException(e.getMessage());
@@ -134,6 +136,669 @@ public class LocalGroupByExecutor4CPV implements
GroupByExecutor {
*/
@Override
public List<AggregateResult> calcResult(
+ long curStartTime, long curEndTime, long startTime, long endTime, long
interval)
+ throws IOException { // Computes the results of one M4 interval: resets results, moves chunks from futureChunkList into currentChunkList, then runs the four calculate*Point helpers.
+ // System.out.println("====DEBUG====: calcResult for [" + curStartTime
+ "," + curEndTime +
+ // ")");
+
+ // reset every cached AggregateResult before computing this interval
+ for (AggregateResult result : results) {
+ result.reset();
+ }
+ // start from an empty currentChunkList (a fresh list is allocated)
+ currentChunkList = new ArrayList<>();
+
+ // System.out.println("====DEBUG====: deal with futureChunkList");
+
+ ListIterator itr = futureChunkList.listIterator();
+ List<ChunkSuit4CPV> tmpFutureChunkList = new ArrayList<>();
+ while (itr.hasNext()) {
+ ChunkSuit4CPV chunkSuit4CPV = (ChunkSuit4CPV) (itr.next());
+ ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata();
+ long chunkMinTime = chunkMetadata.getStartTime();
+ long chunkMaxTime = chunkMetadata.getEndTime();
+ if (chunkMinTime >= curEndTime && chunkMinTime < endTime) {
+ // the chunk falls on the right side of the current M4 interval Ii: leave it in futureChunkList
+ continue;
+ } else if (chunkMaxTime < curStartTime || chunkMinTime >= endTime) {
+ // the chunk falls on the left side of the current M4 interval Ii
+ // or the chunk falls on the right side of the total query range: drop it from futureChunkList
+ itr.remove();
+ } else if (chunkMinTime >= curStartTime && chunkMaxTime < curEndTime) {
+ // the chunk falls completely within the current M4 interval Ii: move it to currentChunkList
+ currentChunkList.add(chunkSuit4CPV);
+ itr.remove();
+ } else {
+ // the chunk partially overlaps in time with the current M4 interval
Ii.
+ // load this chunk, split it on deletes and all w intervals.
+ // add to currentChunkList and futureChunkList.
+ itr.remove();
+ List<IPageReader> pageReaderList =
+
FileLoaderUtils.loadPageReaderList(chunkSuit4CPV.getChunkMetadata(),
this.timeFilter);
+ for (IPageReader pageReader : pageReaderList) {
+ // assume only one page in a chunk
+ // assume all data on disk, no data in memory
+ ((PageReader) pageReader)
+ .split4CPV(
+ startTime,
+ endTime,
+ interval,
+ curStartTime,
+ currentChunkList,
+ tmpFutureChunkList,
+ chunkMetadata);
+ }
+
+ // System.out.println(
+ // "====DEBUG====: load the chunk because overlaps the M4
interval. Version="
+ // + chunkMetadata.getVersion()
+ // + " "
+ // + chunkMetadata.getOffsetOfChunkHeader());
+ }
+ }
+ futureChunkList.addAll(tmpFutureChunkList); // appended only after the loop, because itr is iterating futureChunkList inside it
+ tmpFutureChunkList = null;
+ itr = null;
+
+ // System.out.println("====DEBUG====: deal with currentChunkList");
+
+ if (currentChunkList.size() == 0) { // no chunk was assigned to this interval: return the freshly reset results
+ return results;
+ }
+
+ calculateBottomPoint(currentChunkList, startTime, endTime, interval,
curStartTime);
+ calculateTopPoint(currentChunkList, startTime, endTime, interval,
curStartTime);
+ calculateFirstPoint(currentChunkList, startTime, endTime, interval,
curStartTime);
+ calculateLastPoint(currentChunkList, startTime, endTime, interval,
curStartTime);
+
+ return results;
+ }
+
+ private void calculateBottomPoint(
+ List<ChunkSuit4CPV> currentChunkList,
+ long startTime,
+ long endTime,
+ long interval,
+ long curStartTime)
+ throws IOException { // Finds the bottom point (minimum value) over currentChunkList and writes it to results.get(4).
+ while (true) { // loop 1
+ // sort by bottomValue (statistics min value, ascending) to find the BP (bottom point) candidate set
+ currentChunkList.sort(
+ new Comparator<ChunkSuit4CPV>() { // TODO double check the sort
order logic for different
+ // aggregations
+ public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) {
+ return ((Comparable)
(o1.getChunkMetadata().getStatistics().getMinValue()))
+
.compareTo(o2.getChunkMetadata().getStatistics().getMinValue());
+ }
+ });
+ Object value =
currentChunkList.get(0).getChunkMetadata().getStatistics().getMinValue();
+ List<ChunkSuit4CPV> candidateSet = new ArrayList<>();
+ for (ChunkSuit4CPV chunkSuit4CPV : currentChunkList) {
+ if
(chunkSuit4CPV.getChunkMetadata().getStatistics().getMinValue().equals(value)) {
+ candidateSet.add(chunkSuit4CPV);
+ } else {
+ break;
+ }
+ }
+
+ List<ChunkSuit4CPV> nonLazyLoad =
+ new ArrayList<>(
+ candidateSet); // TODO check, whether nonLazyLoad remove affects
candidateSet
+ nonLazyLoad.sort(
+ new Comparator<ChunkSuit4CPV>() { // TODO double check the sort
order logic for version
+ public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) {
+ return new MergeReaderPriority(
+ o2.getChunkMetadata().getVersion(),
+ o2.getChunkMetadata().getOffsetOfChunkHeader())
+ .compareTo(
+ new MergeReaderPriority(
+ o1.getChunkMetadata().getVersion(),
+ o1.getChunkMetadata().getOffsetOfChunkHeader()));
+ }
+ });
+ while (true) { // loop 2
+ // if every chunk holding a point of the candidate set is lazy
+ // load, then load all of those chunks, apply deleteIntervals, and drop the BP (whether it was removed by a delete or by an update, the point is removed either way)
+ if (nonLazyLoad.size() == 0) {
+ for (ChunkSuit4CPV chunkSuit4CPV : candidateSet) {
+ currentChunkList.remove(chunkSuit4CPV); // TODO check this
+ List<IPageReader> pageReaderList =
+ FileLoaderUtils.loadPageReaderList(
+ chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
+ for (IPageReader pageReader : pageReaderList) { // assume only one
page in a chunk
+ ((PageReader) pageReader)
+ .split4CPV(
+ startTime,
+ endTime,
+ interval,
+ curStartTime,
+ currentChunkList,
+ null,
+ chunkSuit4CPV.getChunkMetadata());
+ }
+ }
+ break; // leave loop 2 and go back to loop 1
+ }
+ // otherwise, take the BP of the highest-version non-lazy-load chunk in the candidate set as the candidate point
+ ChunkSuit4CPV candidate = nonLazyLoad.get(0); // TODO check sort right
+ MergeReaderPriority candidateVersion =
+ new MergeReaderPriority(
+ candidate.getChunkMetadata().getVersion(),
+ candidate.getChunkMetadata().getOffsetOfChunkHeader());
+ long candidateTimestamp =
+ candidate.getChunkMetadata().getStatistics().getBottomTimestamp();
// TODO check
+ Object candidateValue =
+ candidate.getChunkMetadata().getStatistics().getMinValue(); //
TODO check
+
+ // verify this candidate point
+ // first: is it covered by one of its own chunk's delete intervals?
+ boolean isDeletedItself = false;
+ if (candidate.getChunkMetadata().getDeleteIntervalList() != null) {
+ for (TimeRange timeRange :
candidate.getChunkMetadata().getDeleteIntervalList()) {
+ if (timeRange.contains(candidateTimestamp)) {
+ isDeletedItself = true;
+ break;
+ }
+ }
+ }
+ if (isDeletedItself) { // deleted: mark the candidate point's chunk as lazy load (drop it from nonLazyLoad), then return to loop 2
+ nonLazyLoad.remove(candidate);
+ // TODO check this can really remove the element
+ // TODO check whether nonLazyLoad remove affects candidateSet
+ // TODO check nonLazyLoad sorted by version number from high to low
+ continue; // back to loop 2
+
+ } else { // not deleted
+
+ // find all higher-version chunks whose time range overlaps the candidate timestamp
+ List<ChunkSuit4CPV> overlaps = new ArrayList<>();
+ for (ChunkSuit4CPV chunkSuit4CPV : currentChunkList) {
+ ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata();
+ MergeReaderPriority version =
+ new MergeReaderPriority(
+ chunkMetadata.getVersion(),
chunkMetadata.getOffsetOfChunkHeader());
+ if (version.compareTo(candidateVersion) <= 0) { // including
bottomChunkMetadata
+ continue;
+ }
+ if (candidateTimestamp < chunkMetadata.getStartTime()
+ || candidateTimestamp > chunkMetadata.getEndTime()) {
+ continue;
+ }
+ overlaps.add(chunkSuit4CPV);
+ }
+
+ if (overlaps.size() == 0) { // not overlapped: the current candidate point is the result, done
+ results
+ .get(4) // TODO check: minTimestamp, maxTimestamp, firstValue,
lastValue,
+ // minValue[bottomTimestamp], maxValue[topTimestamp]
+ .updateResultUsingValues(
+ new long[] {candidateTimestamp}, 1, new Object[]
{candidateValue});
+ // TODO check updateResult
+ return; // computation finished
+ } else { // overlapped: partial scan all of these overlapping chunks
+ boolean isUpdate = false;
+ for (ChunkSuit4CPV chunkSuit4CPV : overlaps) {
+ // scan this chunk's data
+ if (chunkSuit4CPV.getBatchData() == null) {
+ List<IPageReader> pageReaderList =
+ FileLoaderUtils.loadPageReaderList(
+ chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
+ List<ChunkSuit4CPV> tmpCurrentChunkList = new ArrayList<>();
+ for (IPageReader pageReader : pageReaderList) { // assume only
one page in a chunk
+ isUpdate =
+ ((PageReader)
pageReader).partialScan(candidateTimestamp); // TODO check
+ }
+ } else {
+ // partial scan the already-loaded batchData until a point's timestamp is greater than or equal to candidateTimestamp
+ BatchDataIterator batchDataIterator =
+ chunkSuit4CPV.getBatchData().getBatchDataIterator();
+ while (batchDataIterator.hasNextTimeValuePair()) {
+ long timestamp =
batchDataIterator.nextTimeValuePair().getTimestamp();
+ if (timestamp > candidateTimestamp) {
+ break;
+ }
+ if (timestamp == candidateTimestamp) {
+ isUpdate = true;
+ break;
+ }
+ }
+ chunkSuit4CPV
+ .getBatchData()
+ .resetBatchData(); // This step is necessary, because this
BatchData may be
+ // accessed multiple times!
+ }
+ if (isUpdate) { // stop scanning the overlapping chunks early: an update point was found, which invalidates the candidate
提前结束对overlaps块的scan,因为已经找到一个update点证明candidate失效
+ break;
+ }
+ }
+ if (!isUpdate) { // no such point was found after partial scanning every overlapping chunk, so the current candidate point is the result, done
point就是计算结果,结束
+ results
+ .get(4) // TODO check: minTimestamp, maxTimestamp,
firstValue, lastValue,
+ // minValue[bottomTimestamp], maxValue[topTimestamp]
+ .updateResultUsingValues(
+ new long[] {candidateTimestamp}, 1, new Object[]
{candidateValue});
+ // TODO check updateResult
+ return; // computation finished
+ } else { // such a point was found: mark the candidate point's chunk as lazy
+ // load, add a delete of this point's timestamp to its chunkMetadata's deleteInterval, then go to loop 2
+ if (candidate.getChunkMetadata().getDeleteIntervalList() ==
null) {
+ List<TimeRange> tmp = new ArrayList<>();
+ tmp.add(new TimeRange(candidateTimestamp, candidateTimestamp));
+ candidate.getChunkMetadata().setDeleteIntervalList(tmp);
+ } else {
+ candidate
+ .getChunkMetadata()
+ .getDeleteIntervalList()
+ .add(new TimeRange(candidateTimestamp,
candidateTimestamp)); // TODO check
+ }
+ // the delete branch needs no extra entry, but for an update the delete has to be added manually here
+ nonLazyLoad.remove(candidate);
+ // TODO check this can really remove the element
+ // TODO check whether nonLazyLoad remove affects candidateSet
+ // TODO check nonLazyLoad sorted by version number from high to
low
+ continue; // back to loop 2
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private void calculateTopPoint(
+ List<ChunkSuit4CPV> currentChunkList,
+ long startTime,
+ long endTime,
+ long interval,
+ long curStartTime)
+ throws IOException { // Finds the top point (maximum value) over currentChunkList and writes it to results.get(5).
+ while (true) { // loop 1
+ // sort by topValue (statistics max value, descending) to find the TP (top point) candidate set
+ currentChunkList.sort(
+ new Comparator<ChunkSuit4CPV>() { // TODO double check the sort
order logic for different
+ // aggregations
+ public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) {
+ return ((Comparable)
(o2.getChunkMetadata().getStatistics().getMaxValue()))
+
.compareTo(o1.getChunkMetadata().getStatistics().getMaxValue());
+ }
+ });
+ Object value =
currentChunkList.get(0).getChunkMetadata().getStatistics().getMaxValue();
+ List<ChunkSuit4CPV> candidateSet = new ArrayList<>();
+ for (ChunkSuit4CPV chunkSuit4CPV : currentChunkList) {
+ if (chunkSuit4CPV
+ .getChunkMetadata()
+ .getStatistics()
+ .getMaxValue()
+ .equals(value)) { // TODO CHECK
+ candidateSet.add(chunkSuit4CPV);
+ } else {
+ break;
+ }
+ }
+
+ List<ChunkSuit4CPV> nonLazyLoad =
+ new ArrayList<>(
+ candidateSet); // TODO check, whether nonLazyLoad remove affects
candidateSet
+ nonLazyLoad.sort(
+ new Comparator<ChunkSuit4CPV>() { // TODO double check the sort
order logic for version
+ public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) {
+ return new MergeReaderPriority(
+ o2.getChunkMetadata().getVersion(),
+ o2.getChunkMetadata().getOffsetOfChunkHeader())
+ .compareTo(
+ new MergeReaderPriority(
+ o1.getChunkMetadata().getVersion(),
+ o1.getChunkMetadata().getOffsetOfChunkHeader()));
+ }
+ });
+ while (true) { // loop 2
+ // if every chunk holding a point of the candidate set is lazy
+ // load, then load all of those chunks, apply deleteIntervals, and drop the TP (whether it was removed by a delete or by an update, the point is removed either way)
+ if (nonLazyLoad.size() == 0) {
+ for (ChunkSuit4CPV chunkSuit4CPV : candidateSet) {
+ currentChunkList.remove(chunkSuit4CPV); // TODO check this
+ List<IPageReader> pageReaderList =
+ FileLoaderUtils.loadPageReaderList(
+ chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
+ for (IPageReader pageReader : pageReaderList) { // assume only one
page in a chunk
+ ((PageReader) pageReader)
+ .split4CPV(
+ startTime,
+ endTime,
+ interval,
+ curStartTime,
+ currentChunkList,
+ null,
+ chunkSuit4CPV.getChunkMetadata());
+ }
+ }
+ break; // leave loop 2 and go back to loop 1
+ }
+ // otherwise, take the TP of the highest-version non-lazy-load chunk in the candidate set as the candidate point
+ ChunkSuit4CPV candidate = nonLazyLoad.get(0); // TODO check sort right
+ MergeReaderPriority candidateVersion =
+ new MergeReaderPriority(
+ candidate.getChunkMetadata().getVersion(),
+ candidate.getChunkMetadata().getOffsetOfChunkHeader());
+ long candidateTimestamp =
+ candidate.getChunkMetadata().getStatistics().getTopTimestamp(); //
TODO check
+ Object candidateValue =
+ candidate.getChunkMetadata().getStatistics().getMaxValue(); //
TODO check
+
+ // verify this candidate point
+ // first: is it covered by one of its own chunk's delete intervals?
+ boolean isDeletedItself = false;
+ if (candidate.getChunkMetadata().getDeleteIntervalList() != null) {
+ for (TimeRange timeRange :
candidate.getChunkMetadata().getDeleteIntervalList()) {
+ if (timeRange.contains(candidateTimestamp)) {
+ isDeletedItself = true;
+ break;
+ }
+ }
+ }
+ if (isDeletedItself) { // deleted: mark the candidate point's chunk as lazy load (drop it from nonLazyLoad), then return to loop 2
+ nonLazyLoad.remove(candidate);
+ // TODO check this can really remove the element
+ // TODO check whether nonLazyLoad remove affects candidateSet
+ // TODO check nonLazyLoad sorted by version number from high to low
+ continue; // back to loop 2
+
+ } else { // not deleted
+
+ // find all higher-version chunks whose time range overlaps the candidate timestamp
+ List<ChunkSuit4CPV> overlaps = new ArrayList<>();
+ for (ChunkSuit4CPV chunkSuit4CPV : currentChunkList) {
+ ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata();
+ MergeReaderPriority version =
+ new MergeReaderPriority(
+ chunkMetadata.getVersion(),
chunkMetadata.getOffsetOfChunkHeader());
+ if (version.compareTo(candidateVersion) <= 0) { // including
topChunkMetadata
+ continue;
+ }
+ if (candidateTimestamp < chunkMetadata.getStartTime()
+ || candidateTimestamp > chunkMetadata.getEndTime()) {
+ continue;
+ }
+ overlaps.add(chunkSuit4CPV);
+ }
+
+ if (overlaps.size() == 0) { // not overlapped: the current candidate point is the result, done
+ results
+ .get(5) // TODO check: minTimestamp, maxTimestamp, firstValue,
lastValue,
+ // minValue[bottomTimestamp], maxValue[topTimestamp]
+ .updateResultUsingValues(
+ new long[] {candidateTimestamp}, 1, new Object[]
{candidateValue});
+ // TODO check updateResult
+ return; // computation finished
+ } else { // overlapped: partial scan all of these overlapping chunks
+ boolean isUpdate = false;
+ for (ChunkSuit4CPV chunkSuit4CPV : overlaps) {
+ // scan this chunk's data
+ if (chunkSuit4CPV.getBatchData() == null) {
+ List<IPageReader> pageReaderList =
+ FileLoaderUtils.loadPageReaderList(
+ chunkSuit4CPV.getChunkMetadata(), this.timeFilter);
+ List<ChunkSuit4CPV> tmpCurrentChunkList = new ArrayList<>();
+ for (IPageReader pageReader : pageReaderList) { // assume only
one page in a chunk
+ isUpdate =
+ ((PageReader)
pageReader).partialScan(candidateTimestamp); // TODO check
+ }
+ } else {
+ // partial scan the already-loaded batchData until a point's timestamp is greater than or equal to candidateTimestamp
+ BatchDataIterator batchDataIterator =
+ chunkSuit4CPV.getBatchData().getBatchDataIterator();
+ while (batchDataIterator.hasNextTimeValuePair()) {
+ long timestamp =
batchDataIterator.nextTimeValuePair().getTimestamp();
+ if (timestamp > candidateTimestamp) {
+ break;
+ }
+ if (timestamp == candidateTimestamp) {
+ isUpdate = true;
+ break;
+ }
+ }
+ chunkSuit4CPV
+ .getBatchData()
+ .resetBatchData(); // This step is necessary, because this
BatchData may be
+ // accessed multiple times!
+ }
+ if (isUpdate) { // stop scanning the overlapping chunks early: an update point was found, which invalidates the candidate
提前结束对overlaps块的scan,因为已经找到一个update点证明candidate失效
+ break;
+ }
+ }
+ if (!isUpdate) { // no such point was found after partial scanning every overlapping chunk, so the current candidate point is the result, done
point就是计算结果,结束
+ results
+ .get(5) // TODO check: minTimestamp, maxTimestamp,
firstValue, lastValue,
+ // minValue[bottomTimestamp], maxValue[topTimestamp]
+ .updateResultUsingValues(
+ new long[] {candidateTimestamp}, 1, new Object[]
{candidateValue});
+ // TODO check updateResult
+ return; // computation finished
+ } else { // such a point was found: mark the candidate point's chunk as lazy
+ // load, add a delete of this point's timestamp to its chunkMetadata's deleteInterval, then go to loop 2
+ if (candidate.getChunkMetadata().getDeleteIntervalList() ==
null) {
+ List<TimeRange> tmp = new ArrayList<>();
+ tmp.add(new TimeRange(candidateTimestamp, candidateTimestamp));
+ candidate.getChunkMetadata().setDeleteIntervalList(tmp);
+ } else {
+ candidate
+ .getChunkMetadata()
+ .getDeleteIntervalList()
+ .add(new TimeRange(candidateTimestamp,
candidateTimestamp)); // TODO check
+ }
+ // the delete branch needs no extra entry, but for an update the delete has to be added manually here
+ nonLazyLoad.remove(candidate);
+ // TODO check this can really remove the element
+ // TODO check whether nonLazyLoad remove affects candidateSet
+ // TODO check nonLazyLoad sorted by version number from high to
low
+ continue; // back to loop 2
+ }
+ }
+ }
+ }
+ }
+ }
+
+ private void calculateFirstPoint(
+ List<ChunkSuit4CPV> currentChunkList,
+ long startTime,
+ long endTime,
+ long interval,
+ long curStartTime)
+ throws IOException { // Finds the first point (earliest start time) over currentChunkList and writes it to results.get(0) and results.get(2).
+ while (true) { // loop 1
+ // sort by startTime (ascending), ties broken by version, to find the suspected FP (first point) candidate
+ currentChunkList.sort(
+ new Comparator<ChunkSuit4CPV>() { // TODO double check the sort
order logic for different
+ // aggregations
+ public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) {
+ int res =
+ ((Comparable) (o1.getChunkMetadata().getStartTime()))
+ .compareTo(o2.getChunkMetadata().getStartTime());
+ if (res != 0) {
+ return res;
+ } else {
+ return new MergeReaderPriority(
+ o2.getChunkMetadata().getVersion(),
+ o2.getChunkMetadata().getOffsetOfChunkHeader())
+ .compareTo(
+ new MergeReaderPriority(
+ o1.getChunkMetadata().getVersion(),
+ o1.getChunkMetadata().getOffsetOfChunkHeader()));
+ }
+ }
+ });
+
+ // check whether the suspected candidate's chunk is lazy load
+ ChunkSuit4CPV susp_candidate = currentChunkList.get(0);
+ if (susp_candidate.isLazyLoad()) { // if it is lazy
+ // load: load it now, apply deletes, update batchData and statistics, clear the lazyLoad flag, then return to loop 1
+ currentChunkList.remove(susp_candidate); // TODO check this
+ List<IPageReader> pageReaderList =
+
FileLoaderUtils.loadPageReaderList(susp_candidate.getChunkMetadata(),
this.timeFilter);
+ for (IPageReader pageReader : pageReaderList) { // assume only one
page in a chunk
+ ((PageReader) pageReader)
+ .split4CPV(
+ startTime,
+ endTime,
+ interval,
+ curStartTime,
+ currentChunkList,
+ null,
+ susp_candidate.getChunkMetadata()); // a newly added ChunkSuit4CPV defaults to isLazyLoad=false
新增的ChunkSuit4CPV默认isLazyLoad=false
+ }
+ continue; // back to loop 1
+ } else { // if it is not lazy load, the suspected candidate is the real candidate.
+ // so verify whether this point is covered by deletes of higher priority (the priority handling was already done in
于是verification判断该点是否被更高优先级(更高优先级这一点在QueryUtils.modifyChunkMetaData(chunkMetadataList,
+ // QueryUtils.modifyChunkMetaData(chunkMetadataList, pathModifications))
+ long candidateTimestamp =
susp_candidate.getChunkMetadata().getStartTime(); // TODO check
+ Object candidateValue =
+ susp_candidate.getChunkMetadata().getStatistics().getFirstValue();
// TODO check
+
+ boolean isDeletedItself = false;
+ long deleteEndTime = -1;
+ if (susp_candidate.getChunkMetadata().getDeleteIntervalList() != null)
{
+ for (TimeRange timeRange :
susp_candidate.getChunkMetadata().getDeleteIntervalList()) {
+ if (timeRange.contains(candidateTimestamp)) {
+ isDeletedItself = true;
+ deleteEndTime =
+ Math.max(
+ deleteEndTime,
+ timeRange
+ .getMax()); // deleteEndTime cannot exceed chunkEndTime, otherwise the whole chunk would already have been removed in the modifyChunkMetaData step
deleteEndTime不会超过chunkEndTime,因为否则的话这个chunk就会modifyChunkMetaData步骤里被处理掉整个删掉
+ // TODO check
+ }
+ }
+ }
+ // if deleted, mark the point's chunk as lazy load, update chunkStartTime without loading data, then return to loop 1
+ if (isDeletedItself) {
+ susp_candidate.setLazyLoad(true);
+ susp_candidate
+ .getChunkMetadata()
+ .getStatistics()
+ .setStartTime(deleteEndTime); // TODO check
+ continue; // back to loop 1
+ } else {
+ // otherwise this point is the result, done
+ // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue,
+ // minValue[bottomTimestamp], maxValue[topTimestamp]
+ results
+ .get(0)
+ .updateResultUsingValues(
+ new long[] {candidateTimestamp}, 1, new Object[]
{candidateValue});
+ results
+ .get(2)
+ .updateResultUsingValues(
+ new long[] {candidateTimestamp}, 1, new Object[]
{candidateValue});
+ return;
+ }
+ }
+ }
+ }
+
+ private void calculateLastPoint(
+ List<ChunkSuit4CPV> currentChunkList,
+ long startTime,
+ long endTime,
+ long interval,
+ long curStartTime)
+ throws IOException { // Finds the last point (latest end time) over currentChunkList and writes it to results.get(1) and results.get(3).
+ while (true) { // loop 1
+ // sort by endTime (descending), ties broken by version, to find the suspected LP (last point) candidate
+ currentChunkList.sort(
+ new Comparator<ChunkSuit4CPV>() { // TODO double check the sort
order logic for different
+ // aggregations
+ public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) {
+ int res =
+ ((Comparable) (o2.getChunkMetadata().getEndTime()))
+ .compareTo(o1.getChunkMetadata().getEndTime());
+ if (res != 0) {
+ return res;
+ } else {
+ return new MergeReaderPriority(
+ o2.getChunkMetadata().getVersion(),
+ o2.getChunkMetadata().getOffsetOfChunkHeader())
+ .compareTo(
+ new MergeReaderPriority(
+ o1.getChunkMetadata().getVersion(),
+ o1.getChunkMetadata().getOffsetOfChunkHeader()));
+ }
+ }
+ });
+
+ // check whether the suspected candidate's chunk is lazy load
+ ChunkSuit4CPV susp_candidate = currentChunkList.get(0);
+ if (susp_candidate.isLazyLoad()) { // if it is lazy
+ // load: load it now, apply deletes, update batchData and statistics, clear the lazyLoad flag, then return to loop 1
+ currentChunkList.remove(susp_candidate); // TODO check this
+ List<IPageReader> pageReaderList =
+
FileLoaderUtils.loadPageReaderList(susp_candidate.getChunkMetadata(),
this.timeFilter);
+ for (IPageReader pageReader : pageReaderList) { // assume only one
page in a chunk
+ ((PageReader) pageReader)
+ .split4CPV(
+ startTime,
+ endTime,
+ interval,
+ curStartTime,
+ currentChunkList,
+ null,
+ susp_candidate.getChunkMetadata()); // a newly added ChunkSuit4CPV defaults to isLazyLoad=false
新增的ChunkSuit4CPV默认isLazyLoad=false
+ }
+ continue; // back to loop 1
+ } else { // if it is not lazy load, the suspected candidate is the real candidate.
+ // so verify whether this point is covered by deletes of higher priority (the priority handling was already done in
于是verification判断该点是否被更高优先级(更高优先级这一点在QueryUtils.modifyChunkMetaData(chunkMetadataList,
+ // QueryUtils.modifyChunkMetaData(chunkMetadataList, pathModifications))
+ long candidateTimestamp =
susp_candidate.getChunkMetadata().getEndTime(); // TODO check
+ Object candidateValue =
+ susp_candidate.getChunkMetadata().getStatistics().getLastValue();
// TODO check
+
+ boolean isDeletedItself = false;
+ long deleteStartTime = Long.MAX_VALUE; // TODO check
+ if (susp_candidate.getChunkMetadata().getDeleteIntervalList() != null)
{
+ for (TimeRange timeRange :
susp_candidate.getChunkMetadata().getDeleteIntervalList()) {
+ if (timeRange.contains(candidateTimestamp)) {
+ isDeletedItself = true;
+ deleteStartTime =
+ Math.min(
+ deleteStartTime,
+ timeRange
+ .getMin()); // deleteStartTime cannot be smaller than chunkStartTime, otherwise the whole chunk would already have been removed in the modifyChunkMetaData step
deleteStartTime不会小于chunkStartTime,因为否则的话这个chunk就会modifyChunkMetaData步骤里被处理掉整个删掉
+ // TODO check
+ }
+ }
+ }
+ // if deleted, mark the point's chunk as lazy load, update chunkEndTime without loading data, then return to loop 1
+ if (isDeletedItself) {
+ susp_candidate.setLazyLoad(true);
+ susp_candidate
+ .getChunkMetadata()
+ .getStatistics()
+ .setEndTime(deleteStartTime); // TODO check
+ continue; // back to loop 1
+ } else {
+ // otherwise this point is the result, done
+ // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue,
+ // minValue[bottomTimestamp], maxValue[topTimestamp]
+ results
+ .get(1)
+ .updateResultUsingValues(
+ new long[] {candidateTimestamp}, 1, new Object[]
{candidateValue});
+ results
+ .get(3)
+ .updateResultUsingValues(
+ new long[] {candidateTimestamp}, 1, new Object[]
{candidateValue});
+ return;
+ }
+ }
+ }
+ }
+
+ /**
+ * @param curStartTime closed
+ * @param curEndTime open
+ * @param startTime closed
+ * @param endTime open
+ */
+ public List<AggregateResult> calcResult_deprecated(
long curStartTime, long curEndTime, long startTime, long endTime, long
interval)
throws IOException, QueryProcessException {
// System.out.println("====DEBUG====: calcResult for [" + curStartTime
+ "," + curEndTime +
@@ -218,6 +883,7 @@ public class LocalGroupByExecutor4CPV implements
GroupByExecutor {
// find candidate points
// System.out.println("====DEBUG====: find candidate points");
+ // TODO: may change the loop of generating candidate points? sort first??
for (int j = 0; j < currentChunkList.size(); j++) {
ChunkMetadata chunkMetadata =
currentChunkList.get(j).getChunkMetadata();
diff --git
a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest4.java
b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest4.java
new file mode 100644
index 0000000000..bafb22478b
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest4.java
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.integration.m4;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.jdbc.Config;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Locale;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Integration tests for M4-style group-by queries with the CPV executor enabled.
+ *
+ * <p>Every test loads a small, hand-crafted data set in which several flushed chunks overlap in
+ * time and some points are deleted afterwards, then runs one group-by query and compares the
+ * single returned row with the expected string. A row is rendered as
+ * {@code Time,min_time,max_time,first_value,last_value,min_value,max_value}; the expected strings
+ * show the last two columns in the form {@code value[timestamp]}.
+ */
+public class MyTest4 {
+
+  private static final String TIMESTAMP_STR = "Time";
+
+  private static final String[] CREATION_SQLS =
+      new String[] {
+        "SET STORAGE GROUP TO root.vehicle.d0",
+        "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE",
+      };
+
+  private static final String D0S0 = "root.vehicle.d0.s0";
+
+  private static final String INSERT_TEMPLATE =
+      "INSERT INTO root.vehicle.d0(timestamp,s0)" + " VALUES(%d,%d)";
+
+  /** Select list of the query under test; the order matches {@link #AGGREGATIONS}. */
+  private static final String SELECT_CLAUSE =
+      "SELECT min_time(s0), max_time(s0), first_value(s0), "
+          + "last_value(s0), min_value(s0), max_value(s0)";
+
+  /** Aggregation names, in the order their columns are appended to a rendered row. */
+  private static final String[] AGGREGATIONS =
+      new String[] {
+        "min_time", "max_time", "first_value", "last_value", "min_value", "max_value",
+      };
+
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private static boolean originalEnableCPV;
+  private static CompactionStrategy originalCompactionStrategy;
+
+  @Before
+  public void setUp() throws Exception {
+    // Remember the settings that are overridden here so that tearDown can restore them.
+    originalCompactionStrategy = config.getCompactionStrategy();
+    config.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+
+    originalEnableCPV = config.isEnableCPV();
+    config.setEnableCPV(true); // CPV
+
+    EnvironmentUtils.envSetUp();
+    Class.forName(Config.JDBC_DRIVER_NAME);
+    config.setTimestampPrecision("ms");
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvironmentUtils.cleanEnv();
+    config.setCompactionStrategy(originalCompactionStrategy);
+    config.setEnableCPV(originalEnableCPV);
+  }
+
+  @Test
+  public void test1() throws Exception {
+    prepareData1();
+    assertGroupByResult("([0,25),25ms)", new String[] {"0,1,14,7,6,2[6],8[2]"});
+  }
+
+  private static void prepareData1() throws Exception {
+    // data:
+    // https://user-images.githubusercontent.com/33376433/176371221-1ecd5cdf-155b-4a00-a8b9-c793464cf289.png
+    loadData(
+        new int[][][] {
+          {{10000, 7}},
+          {{1, 7}, {2, 8}, {3, 6}, {4, 7}},
+          {{3, 5}, {5, 6}, {6, 4}, {8, 6}},
+          {{4, 2}, {5, 5}, {8, 0}, {9, 2}},
+          {{7, 7}, {11, 8}, {13, 3}, {14, 6}},
+          {{6, 2}, {10, 0}, {11, 7}, {12, 2}},
+        },
+        "delete from root.vehicle.d0.s0 where time>=8 and time<=10");
+  }
+
+  @Test
+  public void test2() throws Exception {
+    prepareData2();
+    assertGroupByResult("([0,13),13ms)", new String[] {"0,1,12,7,2,2[6],8[10]"});
+  }
+
+  private static void prepareData2() throws Exception {
+    // data:
+    // https://user-images.githubusercontent.com/33376433/176371428-b7b04db1-827b-4324-82b9-7e409f1a5b2e.png
+    loadData(
+        new int[][][] {
+          {{10000, 7}},
+          {{1, 7}, {2, 8}, {3, 6}, {4, 7}},
+          {{3, 5}, {5, 6}, {6, 4}, {8, 6}},
+          {{4, 2}, {5, 5}, {8, 0}, {9, 2}},
+          {{6, 2}, {10, 0}, {11, 7}, {12, 2}},
+          {{7, 7}, {10, 8}, {13, 3}, {14, 6}},
+        },
+        "delete from root.vehicle.d0.s0 where time>=8 and time<=8");
+  }
+
+  @Test
+  public void test3() throws Exception {
+    prepareData3();
+    assertGroupByResult("([0,25),25ms)", new String[] {"0,3,14,6,6,0[10],7[11]"});
+  }
+
+  private static void prepareData3() throws Exception {
+    // data:
+    // https://user-images.githubusercontent.com/33376433/176371648-b101be5d-d3cc-4673-9af8-50574d22a864.png
+    loadData(
+        new int[][][] {
+          {{10000, 7}},
+          {{1, 7}, {2, 8}, {3, 6}, {4, 7}},
+          {{1, 5}, {5, 6}, {6, 4}, {8, 6}},
+          {{4, 2}, {5, 5}, {8, 0}, {9, 2}},
+          {{7, 7}, {11, 8}, {13, 3}, {14, 6}},
+          {{6, 2}, {10, 0}, {11, 7}, {12, 2}},
+        },
+        "delete from root.vehicle.d0.s0 where time>=1 and time<=2");
+  }
+
+  @Test
+  public void test4() throws Exception {
+    prepareData4();
+    assertGroupByResult("([0,25),25ms)", new String[] {"0,4,14,2,6,0[10],7[11]"});
+  }
+
+  private static void prepareData4() throws Exception {
+    // data:
+    // https://user-images.githubusercontent.com/33376433/176371811-62d8c175-fb8c-4a5e-84ab-1088ba6779d7.png
+    loadData(
+        new int[][][] {
+          {{10000, 7}},
+          {{1, 7}, {2, 8}, {3, 6}, {4, 7}},
+          {{1, 5}, {5, 6}, {6, 4}, {8, 6}},
+          {{4, 2}, {5, 5}, {8, 0}, {9, 2}},
+          {{7, 7}, {11, 8}, {13, 3}, {14, 6}},
+          {{6, 2}, {10, 0}, {11, 7}, {12, 2}},
+        },
+        "delete from root.vehicle.d0.s0 where time>=1 and time<=2",
+        "delete from root.vehicle.d0.s0 where time>=2 and time<=3");
+  }
+
+  @Test
+  public void test5() throws Exception {
+    prepareData5();
+    assertGroupByResult("([0,25),25ms)", new String[] {"0,1,13,5,3,0[10],8[2]"});
+  }
+
+  private static void prepareData5() throws Exception {
+    // data:
+    // https://user-images.githubusercontent.com/33376433/176371991-207e306a-5a0c-443b-9ada-527309f3c42a.png
+    loadData(
+        new int[][][] {
+          {{10000, 7}},
+          {{1, 7}, {2, 8}, {3, 6}, {4, 7}},
+          {{1, 5}, {5, 6}, {6, 4}, {8, 6}},
+          {{4, 2}, {5, 5}, {8, 0}, {9, 2}},
+          {{7, 7}, {11, 8}, {13, 3}, {14, 6}},
+          {{6, 2}, {10, 0}, {11, 7}, {14, 2}},
+        },
+        "delete from root.vehicle.d0.s0 where time>=14 and time<=14");
+  }
+
+  /**
+   * Creates the schema, writes the given points and then applies the given deletions.
+   *
+   * <p>Failures are propagated instead of being printed and ignored, so a broken setup fails the
+   * test at the statement that caused it.
+   *
+   * @param chunks groups of {@code {timestamp, value}} pairs; a FLUSH is issued after each group
+   * @param deletions delete statements executed, in order, after all groups have been flushed
+   * @throws Exception if the connection cannot be opened or any statement fails
+   */
+  private static void loadData(int[][][] chunks, String... deletions) throws Exception {
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      for (String sql : CREATION_SQLS) {
+        statement.execute(sql);
+      }
+
+      for (int[][] chunk : chunks) {
+        for (int[] point : chunk) {
+          statement.execute(
+              String.format(Locale.ENGLISH, INSERT_TEMPLATE, point[0], point[1]));
+        }
+        statement.execute("FLUSH");
+      }
+
+      for (String deletion : deletions) {
+        statement.execute(deletion);
+      }
+    }
+  }
+
+  /**
+   * Runs the M4 aggregation query with the given group-by clause and compares every returned row
+   * with the expected rows, including the number of rows.
+   *
+   * @param groupByClause text placed after {@code group by}, e.g. {@code ([0,25),25ms)}
+   * @param expected expected rows in result order
+   * @throws Exception if the connection cannot be opened or the query fails
+   */
+  private static void assertGroupByResult(String groupByClause, String[] expected)
+      throws Exception {
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      boolean hasResultSet =
+          statement.execute(SELECT_CLAUSE + " FROM root.vehicle.d0 group by " + groupByClause);
+
+      Assert.assertTrue(hasResultSet);
+      try (ResultSet resultSet = statement.getResultSet()) {
+        int row = 0;
+        while (resultSet.next()) {
+          // append() renders a null column as "null", exactly like string concatenation does
+          StringBuilder ans = new StringBuilder().append(resultSet.getString(TIMESTAMP_STR));
+          for (String aggregation : AGGREGATIONS) {
+            ans.append(",").append(resultSet.getString(aggregation + "(" + D0S0 + ")"));
+          }
+          System.out.println(ans);
+          if (row >= expected.length) {
+            fail("unexpected extra result row: " + ans);
+          }
+          Assert.assertEquals(expected[row++], ans.toString());
+        }
+        // Without this check a query that returns no rows at all would pass.
+        Assert.assertEquals("number of result rows", expected.length, row);
+      }
+    }
+  }
+}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java
index 9a66119996..ba20b96613 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java
@@ -32,10 +32,12 @@ public class ChunkSuit4CPV {
private BatchData batchData;
private List<Long> mergeVersionList = new ArrayList<>();
private List<Long> mergeOffsetList = new ArrayList<>();
+ private boolean isLazyLoad = false;
public ChunkSuit4CPV(ChunkMetadata chunkMetadata) { // metadata-only suit: no batch data attached
this.chunkMetadata = chunkMetadata;
this.batchData = null;
+ this.isLazyLoad = false; // same value as the field initializer, restated explicitly
}
public ChunkSuit4CPV(ChunkMetadata chunkMetadata, BatchData batchData) {
@@ -43,6 +45,14 @@ public class ChunkSuit4CPV {
this.batchData = batchData;
}
+ public void setLazyLoad(boolean lazyLoad) { // stores the lazy-load flag of this chunk suit
+ isLazyLoad = lazyLoad; // NOTE(review): the group-by executor passes true after finding a candidate point inside a deleted interval - confirm intended meaning
+ }
+
+ public boolean isLazyLoad() { // returns the flag last stored by setLazyLoad; false until it is set
+ return isLazyLoad;
+ }
+
public ChunkMetadata getChunkMetadata() {
return chunkMetadata;
}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index 0bb64cc4b6..8315b39a79 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -175,12 +175,13 @@ public class PageReader implements IPageReader {
statistics);
chunkMetadata1.setVersion(chunkMetadata.getVersion()); // don't miss
this
- // important, used later for candidate point verification
- // (1) candidate point itself whether is in the deleted interval
- // (2) candidate point whether is overlapped by a chunk with a larger
version number and
- // the chunk does not have a deleted interval overlapping this
candidate point
-
chunkMetadata1.setDeleteIntervalList(chunkMetadata.getDeleteIntervalList());
- // not use current Ii to modify deletedIntervalList any more
+ // // important, used later for candidate point verification
+ // // (1) candidate point itself whether is in the deleted
interval
+ // // (2) candidate point whether is overlapped by a chunk with
a larger version
+ // number and
+ // // the chunk does not have a deleted interval overlapping
this candidate point
+ //
chunkMetadata1.setDeleteIntervalList(chunkMetadata.getDeleteIntervalList());
+ // // not use current Ii to modify deletedIntervalList any more
splitChunkMetadataMap.put(idx, chunkMetadata1);
}
@@ -229,17 +230,51 @@ public class PageReader implements IPageReader {
}
int curIdx = (int) Math.floor((curStartTime - startTime) * 1.0 / interval);
int num = (int) Math.floor((endTime - startTime) * 1.0 / interval);
- for (int i = 0; i < num; i++) {
- if (splitBatchDataMap.containsKey(i) && i == curIdx &&
!splitBatchDataMap.get(i).isEmpty()) {
- currentChunkList.add(
- new ChunkSuit4CPV(splitChunkMetadataMap.get(i),
splitBatchDataMap.get(i).flip()));
- } else if (splitBatchDataMap.containsKey(i)
- && i != curIdx
- && !splitBatchDataMap.get(i).isEmpty()) {
- futureChunkList.add(
- new ChunkSuit4CPV(splitChunkMetadataMap.get(i),
splitBatchDataMap.get(i).flip()));
+
+ // for (int i = 0; i < num;
+ // i++) { // TODO: [M4] this loop can be polished, no need to loop
0~num, just loop the
+ // keySet of splitBatchDataMap
+ // if (splitBatchDataMap.containsKey(i) && i == curIdx &&
+ // !splitBatchDataMap.get(i).isEmpty()) {
+ // currentChunkList.add(
+ // new ChunkSuit4CPV(splitChunkMetadataMap.get(i),
splitBatchDataMap.get(i).flip()));
+ // } else if (splitBatchDataMap.containsKey(i)
+ // && i != curIdx
+ // && !splitBatchDataMap.get(i).isEmpty()) {
+ // futureChunkList.add(
+ // new ChunkSuit4CPV(splitChunkMetadataMap.get(i),
splitBatchDataMap.get(i).flip()));
+ // }
+ // }
+
+ for (Integer i : splitBatchDataMap.keySet()) {
+ if (!splitBatchDataMap.get(i).isEmpty()) {
+ if (i == curIdx) {
+ currentChunkList.add(
+ new ChunkSuit4CPV(splitChunkMetadataMap.get(i),
splitBatchDataMap.get(i).flip()));
+ } else {
+ futureChunkList.add(
+ new ChunkSuit4CPV(splitChunkMetadataMap.get(i),
splitBatchDataMap.get(i).flip()));
+ }
+ }
+ }
+ }
+
+ /**
+ * Decodes timestamps one by one until the candidate is found or passed. Timestamps inside a
+ * chunk are in ascending order, so the scan can stop as soon as a decoded timestamp is greater
+ * than or equal to candidateTimestamp.
+ *
+ * <p>NOTE(review): each decoded value is read from timeBuffer through timeDecoder; presumably
+ * this consumes the buffer, so a later call would continue after the last value read - confirm.
+ *
+ * @param candidateTimestamp the timestamp to look for
+ * @return true if the point whose time equals candidateTimestamp exists, false if not
+ * @throws IOException declared by the signature; presumably raised while decoding - confirm
+ */
+ public boolean partialScan(long candidateTimestamp) throws IOException {
+ while (timeDecoder.hasNext(timeBuffer)) {
+ long timestamp = timeDecoder.readLong(timeBuffer);
+ if (timestamp > candidateTimestamp) {
+ return false;
+ }
+ if (timestamp == candidateTimestamp) {
+ return true;
}
}
+ return false;
}
/** @return the returned BatchData may be empty, but never be null */