This is an automated email from the ASF dual-hosted git repository.
leirui pushed a commit to branch research/M4-visualization
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/research/M4-visualization by this push:
new b2819f1bd8 speed up by making futureChunkList sorted and maintaining splitChunkList
b2819f1bd8 is described below
commit b2819f1bd878cf7560b37d3bd6beed059489a58b
Author: Lei Rui <[email protected]>
AuthorDate: Fri Jul 1 13:32:44 2022 +0800
speed up by making futureChunkList sorted and maintaining splitChunkList
---
.../dataset/groupby/LocalGroupByExecutor4CPV.java | 1468 ++++++++++----------
.../apache/iotdb/db/integration/m4/MyTest1.java | 1 +
.../apache/iotdb/db/integration/m4/MyTest2.java | 1 +
.../apache/iotdb/db/integration/m4/MyTest3.java | 1 +
.../iotdb/tsfile/read/reader/page/PageReader.java | 60 +-
5 files changed, 771 insertions(+), 760 deletions(-)
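In outline, the speed-up has two parts: futureChunkList is now kept sorted by chunk start time, so the per-interval scan can break at the first chunk that starts at or after the current interval's end; and the pieces produced by splitting a partially overlapping chunk are parked in splitChunkList, a map keyed by the index of the M4 interval they belong to, so they never have to be re-inserted into the sorted list. A minimal sketch of the bucketing idea (a plain String stands in for ChunkSuit4CPV; an illustration under that assumption, not code from the patch):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class SplitBucketSketch {
      // key = M4 interval index; value = chunks split off for that interval
      static final Map<Integer, List<String>> splitChunkList = new HashMap<>();

      // same index formula as the patch: which interval does time t fall into?
      static int intervalIndex(long t, long startTime, long interval) {
        return (int) Math.floor((t - startTime) * 1.0 / interval);
      }

      // park a split chunk under its interval; it is fetched in O(1) when that
      // interval becomes current, and the sorted futureChunkList stays intact
      static void park(String chunk, long chunkStartTime, long startTime, long interval) {
        int i = intervalIndex(chunkStartTime, startTime, interval);
        splitChunkList.computeIfAbsent(i, k -> new ArrayList<>()).add(chunk);
      }
    }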
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
index d2aa192be6..bb9e837e0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java
@@ -19,6 +19,14 @@
package org.apache.iotdb.db.query.dataset.groupby;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -51,16 +59,6 @@ import org.apache.iotdb.tsfile.read.reader.page.PageReader;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Set;
-
/**
 * Sql format: SELECT min_time(s0), max_time(s0), first_value(s0), last_value(s0), min_value(s0),
 * max_value(s0) FROM root.xx group by ([tqs,tqe),IntervalLength). Requirements: (1) Don't change the
@@ -77,6 +75,9 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
private List<ChunkSuit4CPV> currentChunkList;
private final List<ChunkSuit4CPV> futureChunkList = new ArrayList<>();
+  // this is designed to keep the chunks split off from futureChunkList without destroying the sorted order of futureChunkList
+ private Map<Integer, List<ChunkSuit4CPV>> splitChunkList = new HashMap<>();
+
private Filter timeFilter;
private TSDataType tsDataType;
@@ -119,6 +120,14 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
try {
// TODO: this might be bad to load all chunk metadata at first
futureChunkList.addAll(seriesReader.getAllChunkMetadatas4CPV());
+ // order futureChunkList by chunk startTime
+ futureChunkList.sort(
+ new Comparator<ChunkSuit4CPV>() {
+ public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) {
+ return ((Comparable) (o1.getChunkMetadata().getStartTime()))
+ .compareTo(o2.getChunkMetadata().getStartTime());
+ }
+ });
} catch (IOException e) {
throw new QueryProcessException(e.getMessage());
}
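For reference, the same ordering can be written without the raw Comparable cast; a sketch, assuming the branch builds with Java 8+ (java.util.Comparator is already imported above):

    // equivalent sort, avoiding the unchecked (Comparable) cast
    futureChunkList.sort(
        Comparator.comparingLong(
            (ChunkSuit4CPV c) -> c.getChunkMetadata().getStartTime()));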
@@ -129,38 +138,30 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
results.add(aggrResult);
}
- /**
- * @param curStartTime closed
- * @param curEndTime open
- * @param startTime closed
- * @param endTime open
- */
- @Override
-  public List<AggregateResult> calcResult(
-      long curStartTime, long curEndTime, long startTime, long endTime, long interval)
-      throws IOException {
-    // System.out.println("====DEBUG====: calcResult for [" + curStartTime + "," + curEndTime +
-    //     ")");
-
- // clear result cache
- for (AggregateResult result : results) {
- result.reset();
- }
+  private void getCurrentChunkListFromFutureChunkList(long curStartTime, long curEndTime,
+      long startTime, long endTime, long interval) throws IOException {
// empty currentChunkList
currentChunkList = new ArrayList<>();
- // System.out.println("====DEBUG====: deal with futureChunkList");
+ // get related chunks from splitChunkList
+ int curIdx = (int) Math.floor((curStartTime - startTime) * 1.0 / interval);
+ if (splitChunkList.get(curIdx) != null) {
+ currentChunkList.addAll(splitChunkList.get(curIdx));
+ }
+ // iterate futureChunkList
ListIterator itr = futureChunkList.listIterator();
- List<ChunkSuit4CPV> tmpFutureChunkList = new ArrayList<>();
+// List<ChunkSuit4CPV> tmpFutureChunkList = new ArrayList<>();
while (itr.hasNext()) {
ChunkSuit4CPV chunkSuit4CPV = (ChunkSuit4CPV) (itr.next());
ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata();
long chunkMinTime = chunkMetadata.getStartTime();
long chunkMaxTime = chunkMetadata.getEndTime();
if (chunkMinTime >= curEndTime && chunkMinTime < endTime) {
- // the chunk falls on the right side of the current M4 interval Ii
- continue;
+ // the chunk falls on the right side of the current M4 interval Ii,
+        // and since futureChunkList is ordered by the startTime of chunkMetadata,
+ // the loop can be terminated early.
+ break;
} else if (chunkMaxTime < curStartTime || chunkMinTime >= endTime) {
// the chunk falls on the left side of the current M4 interval Ii
// or the chunk falls on the right side of the total query range
@@ -186,22 +187,29 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
interval,
curStartTime,
currentChunkList,
- tmpFutureChunkList,
+ splitChunkList,
chunkMetadata);
}
-
- // System.out.println(
-      //     "====DEBUG====: load the chunk because overlaps the M4 interval. Version="
- // + chunkMetadata.getVersion()
- // + " "
- // + chunkMetadata.getOffsetOfChunkHeader());
}
}
- futureChunkList.addAll(tmpFutureChunkList);
- tmpFutureChunkList = null;
- itr = null;
+ }
+
+ /**
+ * @param curStartTime closed
+ * @param curEndTime open
+ * @param startTime closed
+ * @param endTime open
+ */
+ @Override
+ public List<AggregateResult> calcResult(
+      long curStartTime, long curEndTime, long startTime, long endTime, long interval)
+ throws IOException {
+ // clear result cache
+ for (AggregateResult result : results) {
+ result.reset();
+ }
- // System.out.println("====DEBUG====: deal with currentChunkList");
+    getCurrentChunkListFromFutureChunkList(curStartTime, curEndTime, startTime, endTime, interval);
if (currentChunkList.size() == 0) {
return results;
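The splitChunkList lookup key computed in getCurrentChunkListFromFutureChunkList is just the zero-based position of the current window inside the total query range [startTime, endTime); a worked example of the index formula:

    // with startTime = 0 and interval = 100, the window [200,300) is index 2
    long startTime = 0, interval = 100, curStartTime = 200;
    int curIdx = (int) Math.floor((curStartTime - startTime) * 1.0 / interval);
    // curIdx == 2, so splitChunkList.get(2) yields exactly the chunks that
    // split4CPV parked for this window in earlier calls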
@@ -215,7 +223,9 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
return results;
}
-  /** Apply the deletes to the BatchData; assign the updated BatchData and statistics to chunkSuit4CPV */
+  /**
+   * Apply the deletes to the BatchData; assign the updated BatchData and statistics to chunkSuit4CPV
+   */
  private void updateBatchData(ChunkSuit4CPV chunkSuit4CPV, TSDataType dataType) {
if (chunkSuit4CPV.getBatchData() != null) {
      BatchData batchData1 = BatchDataFactory.createBatchData(dataType, true, false);
@@ -312,8 +322,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
        new Comparator<ChunkSuit4CPV>() { // TODO double check the sort order logic for version
public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) {
return new MergeReaderPriority(
- o2.getChunkMetadata().getVersion(),
- o2.getChunkMetadata().getOffsetOfChunkHeader())
+ o2.getChunkMetadata().getVersion(),
+ o2.getChunkMetadata().getOffsetOfChunkHeader())
.compareTo(
new MergeReaderPriority(
o1.getChunkMetadata().getVersion(),
@@ -400,7 +410,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
              .get(4) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue,
              // minValue[bottomTimestamp], maxValue[topTimestamp]
              .updateResultUsingValues(
-                  new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+                  new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
          // TODO check updateResult
          return; // computation finished
        } else { // overlapped: partially scan all these overlapping chunks
@@ -444,7 +454,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
                  .get(4) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue,
                  // minValue[bottomTimestamp], maxValue[topTimestamp]
                  .updateResultUsingValues(
-                      new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+                      new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
              // TODO check updateResult
              return; // computation finished
            } else { // such a point exists, so mark the chunk containing the candidate point as lazy
@@ -510,8 +520,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
        new Comparator<ChunkSuit4CPV>() { // TODO double check the sort order logic for version
public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) {
return new MergeReaderPriority(
- o2.getChunkMetadata().getVersion(),
- o2.getChunkMetadata().getOffsetOfChunkHeader())
+ o2.getChunkMetadata().getVersion(),
+ o2.getChunkMetadata().getOffsetOfChunkHeader())
.compareTo(
new MergeReaderPriority(
o1.getChunkMetadata().getVersion(),
@@ -598,7 +608,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
              .get(5) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue,
              // minValue[bottomTimestamp], maxValue[topTimestamp]
              .updateResultUsingValues(
-                  new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+                  new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
          // TODO check updateResult
          return; // computation finished
        } else { // overlapped: partially scan all these overlapping chunks
@@ -642,7 +652,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
                  .get(5) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue,
                  // minValue[bottomTimestamp], maxValue[topTimestamp]
                  .updateResultUsingValues(
-                      new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+                      new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
              // TODO check updateResult
              return; // computation finished
            } else { // such a point exists, so mark the chunk containing the candidate point as lazy
@@ -690,8 +700,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
return res;
} else {
return new MergeReaderPriority(
- o2.getChunkMetadata().getVersion(),
- o2.getChunkMetadata().getOffsetOfChunkHeader())
+ o2.getChunkMetadata().getVersion(),
+ o2.getChunkMetadata().getOffsetOfChunkHeader())
.compareTo(
new MergeReaderPriority(
o1.getChunkMetadata().getVersion(),
@@ -756,11 +766,11 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
results
.get(0)
.updateResultUsingValues(
-                  new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+                  new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
      results
          .get(2)
          .updateResultUsingValues(
-                  new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+                  new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
return;
}
}
@@ -787,8 +797,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
return res;
} else {
return new MergeReaderPriority(
- o2.getChunkMetadata().getVersion(),
- o2.getChunkMetadata().getOffsetOfChunkHeader())
+ o2.getChunkMetadata().getVersion(),
+ o2.getChunkMetadata().getOffsetOfChunkHeader())
.compareTo(
new MergeReaderPriority(
o1.getChunkMetadata().getVersion(),
@@ -853,687 +863,687 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor {
results
.get(1)
.updateResultUsingValues(
-                  new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+                  new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
      results
          .get(3)
          .updateResultUsingValues(
-                  new long[] {candidateTimestamp}, 1, new Object[] {candidateValue});
+                  new long[]{candidateTimestamp}, 1, new Object[]{candidateValue});
return;
}
}
}
}
- /**
- * @param curStartTime closed
- * @param curEndTime open
- * @param startTime closed
- * @param endTime open
- */
- public List<AggregateResult> calcResult_deprecated(
-      long curStartTime, long curEndTime, long startTime, long endTime, long interval)
- throws IOException, QueryProcessException {
- // System.out.println("====DEBUG====: calcResult for [" + curStartTime
+ "," + curEndTime +
- // ")");
-
- // clear result cache
- for (AggregateResult result : results) {
- result.reset();
- }
- // empty currentChunkList
- currentChunkList = new ArrayList<>();
-
- // System.out.println("====DEBUG====: deal with futureChunkList");
-
- ListIterator itr = futureChunkList.listIterator();
- List<ChunkSuit4CPV> tmpFutureChunkList = new ArrayList<>();
- while (itr.hasNext()) {
- ChunkSuit4CPV chunkSuit4CPV = (ChunkSuit4CPV) (itr.next());
- ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata();
- long chunkMinTime = chunkMetadata.getStartTime();
- long chunkMaxTime = chunkMetadata.getEndTime();
- if (chunkMinTime >= curEndTime && chunkMinTime < endTime) {
- // the chunk falls on the right side of the current M4 interval Ii
- continue;
- } else if (chunkMaxTime < curStartTime || chunkMinTime >= endTime) {
- // the chunk falls on the left side of the current M4 interval Ii
- // or the chunk falls on the right side of the total query range
- itr.remove();
- } else if (chunkMinTime >= curStartTime && chunkMaxTime < curEndTime) {
- // the chunk falls completely within the current M4 interval Ii
- currentChunkList.add(chunkSuit4CPV);
- itr.remove();
- } else {
- // the chunk partially overlaps in time with the current M4 interval
Ii.
- // load this chunk, split it on deletes and all w intervals.
- // add to currentChunkList and futureChunkList.
- itr.remove();
- List<IPageReader> pageReaderList =
-
FileLoaderUtils.loadPageReaderList(chunkSuit4CPV.getChunkMetadata(),
this.timeFilter);
- for (IPageReader pageReader : pageReaderList) {
- // assume only one page in a chunk
- // assume all data on disk, no data in memory
- ((PageReader) pageReader)
- .split4CPV(
- startTime,
- endTime,
- interval,
- curStartTime,
- currentChunkList,
- tmpFutureChunkList,
- chunkMetadata);
- }
-
- // System.out.println(
- // "====DEBUG====: load the chunk because overlaps the M4
interval. Version="
- // + chunkMetadata.getVersion()
- // + " "
- // + chunkMetadata.getOffsetOfChunkHeader());
- }
- }
- futureChunkList.addAll(tmpFutureChunkList);
- tmpFutureChunkList = null;
- itr = null;
-
- // System.out.println("====DEBUG====: deal with currentChunkList");
-
- if (currentChunkList.size() == 0) {
- return results;
- }
-
- boolean[] isFinal = new boolean[4]; // default false
- do {
- long[] timestamps = new long[4]; // firstTime, lastTime, bottomTime,
topTime
- Object[] values = new Object[4]; // firstValue, lastValue, bottomValue,
topValue
- PriorityMergeReader.MergeReaderPriority[] versions =
- new PriorityMergeReader.MergeReaderPriority[4];
- int[] listIdx = new int[4];
- timestamps[0] = -1;
- timestamps[1] = -1;
- values[2] = null;
- values[3] = null;
-
- // find candidate points
- // System.out.println("====DEBUG====: find candidate points");
- // TODO: may change the loop of generating candidate points? sort first??
-
- for (int j = 0; j < currentChunkList.size(); j++) {
- ChunkMetadata chunkMetadata =
currentChunkList.get(j).getChunkMetadata();
- Statistics statistics = chunkMetadata.getStatistics();
- MergeReaderPriority version =
- new MergeReaderPriority(
- chunkMetadata.getVersion(),
chunkMetadata.getOffsetOfChunkHeader());
- // update firstPoint
- if (!isFinal[0]) {
- if (timestamps[0] == -1
- || (statistics.getStartTime() < timestamps[0])
- || (statistics.getStartTime() == timestamps[0]
- && version.compareTo(versions[0]) > 0)) {
- timestamps[0] = statistics.getStartTime();
- values[0] = statistics.getFirstValue();
- versions[0] = version;
- listIdx[0] = j;
- }
- }
- // update lastPoint
- if (!isFinal[1]) {
- if (timestamps[1] == -1
- || (statistics.getEndTime() > timestamps[1])
- || (statistics.getEndTime() == timestamps[1] &&
version.compareTo(versions[1]) > 0)) {
- timestamps[1] = statistics.getEndTime();
- values[1] = statistics.getLastValue();
- versions[1] = version;
- listIdx[1] = j;
- }
- }
- // update bottomPoint
- if (!isFinal[2]) {
- if (values[2] == null
- || (((Comparable)
(values[2])).compareTo(statistics.getMinValue()) > 0)) {
- timestamps[2] = statistics.getBottomTimestamp();
- values[2] = statistics.getMinValue();
- versions[2] = version;
- listIdx[2] = j;
- }
- }
- // update topPoint
- if (!isFinal[3]) {
- if (values[3] == null
- || (((Comparable)
(values[3])).compareTo(statistics.getMaxValue()) < 0)) {
- timestamps[3] = statistics.getTopTimestamp();
- values[3] = statistics.getMaxValue();
- versions[3] = version;
- listIdx[3] = j;
- }
- }
- }
-
- // System.out.println("====DEBUG====: verify candidate points");
-
- // verify candidate points.
- // firstPoint and lastPoint are valid for sure.
- // default results sequence: min_time(%s), max_time(%s),
first_value(%s), last_value(%s),
- // min_value(%s), max_value(%s)
- if (!isFinal[0]) { // firstPoint
- long firstTimestamp = timestamps[0];
- ChunkMetadata firstChunkMetadata =
currentChunkList.get(listIdx[0]).getChunkMetadata();
- // check if the point is deleted:
- List<TimeRange> firstDeleteIntervalList =
firstChunkMetadata.getDeleteIntervalList();
- boolean isDeletedItself = false;
- if (firstDeleteIntervalList != null) {
- for (TimeRange timeRange : firstDeleteIntervalList) {
- if (timeRange.contains(firstTimestamp)) {
- isDeletedItself = true;
- break;
- }
- }
- }
- if (isDeletedItself) {
- // System.out.println(
- // "====DEBUG====: load the chunk because candidate
firstPoint is actually
- // deleted. Version="
- // + firstChunkMetadata.getVersion()
- // + " "
- // + firstChunkMetadata.getOffsetOfChunkHeader());
-
- currentChunkList.remove(listIdx[0]);
- List<IPageReader> pageReaderList =
- FileLoaderUtils.loadPageReaderList(firstChunkMetadata,
this.timeFilter);
- for (IPageReader pageReader : pageReaderList) { // assume only one
page in a chunk
- ((PageReader) pageReader)
- .split4CPV(
- startTime,
- endTime,
- interval,
- curStartTime,
- currentChunkList,
- null,
- firstChunkMetadata);
- }
- continue; // next iteration to check currentChunkList
- } else {
- results
- .get(0)
- .updateResultUsingValues(
- Arrays.copyOfRange(timestamps, 0, 1),
- 1,
- Arrays.copyOfRange(values, 0, 1)); // min_time
- results
- .get(2)
- .updateResultUsingValues(
- Arrays.copyOfRange(timestamps, 0, 1),
- 1,
- Arrays.copyOfRange(values, 0, 1)); // first_value
- isFinal[0] = true;
- // System.out.println("====DEBUG====: find firstPoint");
- }
- }
- if (!isFinal[1]) { // lastPoint
- long lastTimestamp = timestamps[1];
- ChunkMetadata lastChunkMetadata =
currentChunkList.get(listIdx[1]).getChunkMetadata();
- // check if the point is deleted:
- List<TimeRange> lastDeleteIntervalList =
lastChunkMetadata.getDeleteIntervalList();
- boolean isDeletedItself = false;
- if (lastDeleteIntervalList != null) {
- for (TimeRange timeRange : lastDeleteIntervalList) {
- if (timeRange.contains(lastTimestamp)) {
- isDeletedItself = true;
- break;
- }
- }
- }
- if (isDeletedItself) {
- // System.out.println(
- // "====DEBUG====: load the chunk because candidate
lastPoint is actually
- // deleted. Version="
- // + lastChunkMetadata.getVersion()
- // + " "
- // + lastChunkMetadata.getOffsetOfChunkHeader());
-
- currentChunkList.remove(listIdx[1]);
- List<IPageReader> pageReaderList =
- FileLoaderUtils.loadPageReaderList(lastChunkMetadata,
this.timeFilter);
- for (IPageReader pageReader : pageReaderList) { // assume only one
page in a chunk
- ((PageReader) pageReader)
- .split4CPV(
- startTime,
- endTime,
- interval,
- curStartTime,
- currentChunkList,
- null,
- lastChunkMetadata);
- }
- continue; // next iteration to check currentChunkList
- } else {
- results
- .get(1)
- .updateResultUsingValues(
- Arrays.copyOfRange(timestamps, 1, 2),
- 1,
- Arrays.copyOfRange(values, 1, 2)); // min_time
- results
- .get(3)
- .updateResultUsingValues(
- Arrays.copyOfRange(timestamps, 1, 2),
- 1,
- Arrays.copyOfRange(values, 1, 2)); // first_value
- isFinal[1] = true;
-
- // System.out.println("====DEBUG====: find lastPoint");
- }
- }
- if (!isFinal[2]) { // bottomPoint
- long bottomTimestamp = timestamps[2];
- ChunkMetadata bottomChunkMetadata =
currentChunkList.get(listIdx[2]).getChunkMetadata();
- List<Long> mergedVersionList =
currentChunkList.get(listIdx[2]).getMergeVersionList();
- List<Long> mergedOffsetList =
currentChunkList.get(listIdx[2]).getMergeOffsetList();
- // check if the point is deleted:
- List<TimeRange> bottomDeleteIntervalList =
bottomChunkMetadata.getDeleteIntervalList();
- boolean isDeletedItself = false;
- if (bottomDeleteIntervalList != null) {
- for (TimeRange timeRange : bottomDeleteIntervalList) {
- if (timeRange.contains(bottomTimestamp)) {
- isDeletedItself = true;
- break;
- }
- }
- }
- if (isDeletedItself) {
- // System.out.println(
- // "====DEBUG====: load the chunk because candidate
bottomPoint is actually
- // deleted. Version="
- // + bottomChunkMetadata.getVersion()
- // + " "
- // + bottomChunkMetadata.getOffsetOfChunkHeader());
-
- currentChunkList.remove(listIdx[2]);
- List<IPageReader> pageReaderList =
- FileLoaderUtils.loadPageReaderList(bottomChunkMetadata,
this.timeFilter);
- for (IPageReader pageReader : pageReaderList) { // assume only one
page in a chunk
- ((PageReader) pageReader)
- .split4CPV(
- startTime,
- endTime,
- interval,
- curStartTime,
- currentChunkList,
- null,
- bottomChunkMetadata);
- }
- continue; // next iteration to check currentChunkList
- } else { // verify if it is overlapped by other chunks with larger
version number and not in
- // the deleted time interval
- List<Integer> toMerge = new ArrayList<>();
- for (int i = 0; i < currentChunkList.size(); i++) {
- ChunkMetadata chunkMetadata =
currentChunkList.get(i).getChunkMetadata();
- MergeReaderPriority version =
- new MergeReaderPriority(
- chunkMetadata.getVersion(),
chunkMetadata.getOffsetOfChunkHeader());
- if (version.compareTo(versions[2]) <= 0) { // including
bottomChunkMetadata
- continue;
- }
- if (bottomTimestamp < chunkMetadata.getStartTime()
- || bottomTimestamp > chunkMetadata.getEndTime()) {
- continue;
- }
- boolean isMerged = false;
- for (int k = 0; k < mergedVersionList.size(); k++) {
- // these chunks are MARKED "merged" - not overlapped any more
- if (mergedVersionList.get(k) == chunkMetadata.getVersion()
- && mergedOffsetList.get(k) ==
chunkMetadata.getOffsetOfChunkHeader()) {
- isMerged = true;
- break;
- }
- }
- if (isMerged) {
- continue;
- }
- toMerge.add(i);
- }
- if (toMerge.isEmpty()) {
- // System.out.println("====DEBUG====: find
bottomPoint");
-
- results
- .get(4)
- .updateResultUsingValues(
- Arrays.copyOfRange(timestamps, 2, 3),
- 1,
- Arrays.copyOfRange(values, 2, 3)); // min_value
- isFinal[2] = true;
- } else {
- // deal with toMerge chunks: delete updated points
- toMerge.add(listIdx[2]);
- List<Long> newMergedVersionList = new ArrayList<>();
- List<Long> newMergedOffsetList = new ArrayList<>();
- for (int m : toMerge) { // to MARK these chunks are "merged" - not
overlapped any more
- ChunkMetadata tmpChunkMetadata =
currentChunkList.get(m).getChunkMetadata();
- newMergedVersionList.add(tmpChunkMetadata.getVersion());
-
newMergedOffsetList.add(tmpChunkMetadata.getOffsetOfChunkHeader());
- }
- Map<MergeReaderPriority, BatchData> updateBatchDataMap = new
HashMap<>();
- Map<MergeReaderPriority, Statistics> statisticsMap = new
HashMap<>();
- for (int o = 0; o < toMerge.size(); o++) {
- // create empty batchData
- ChunkSuit4CPV chunkSuit4CPV =
currentChunkList.get(toMerge.get(o));
- ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata();
- MergeReaderPriority mergeReaderPriority =
- new MergeReaderPriority(
- chunkMetadata.getVersion(),
chunkMetadata.getOffsetOfChunkHeader());
- BatchData batch1 = BatchDataFactory.createBatchData(tsDataType,
true, false);
- updateBatchDataMap.put(mergeReaderPriority, batch1);
- // create empty statistics
- Statistics statistics = null;
- switch (tsDataType) {
- case INT32:
- statistics = new IntegerStatistics();
- break;
- case INT64:
- statistics = new LongStatistics();
- break;
- case FLOAT:
- statistics = new FloatStatistics();
- break;
- case DOUBLE:
- statistics = new DoubleStatistics();
- break;
- default:
- break;
- }
- statisticsMap.put(mergeReaderPriority, statistics);
- // prepare mergeReader
- if (chunkSuit4CPV.getBatchData() == null) {
- List<IPageReader> pageReaderList =
- FileLoaderUtils.loadPageReaderList(chunkMetadata,
this.timeFilter);
- List<ChunkSuit4CPV> tmpCurrentChunkList = new ArrayList<>();
- for (IPageReader pageReader : pageReaderList) { // assume only
one page in a chunk
- ((PageReader) pageReader)
- .split4CPV(
- startTime,
- endTime,
- interval,
- curStartTime,
- tmpCurrentChunkList,
- null,
- chunkMetadata);
- }
- currentChunkList.set(toMerge.get(o),
tmpCurrentChunkList.get(0));
- chunkSuit4CPV = currentChunkList.get(toMerge.get(o));
-
- // System.out.println(
- // "====DEBUG====: load chunk for update
merge. Version="
- // + chunkMetadata.getVersion()
- // + " "
- // +
chunkMetadata.getOffsetOfChunkHeader());
- }
- mergeReader.addReader(
- chunkSuit4CPV.getBatchData().getBatchDataIterator(),
- new MergeReaderPriority(chunkSuit4CPV.getVersion(),
chunkSuit4CPV.getOffset()));
- }
- while (mergeReader.hasNextTimeValuePair()) {
- Pair<TimeValuePair, MergeReaderPriority> res =
mergeReader.nextElement();
- TimeValuePair ret = res.left;
- // System.out.println(
- // "====DEBUG====: merge for bottomPoint.
(t,v)="
- // + ret.getTimestamp()
- // + ","
- // + ret.getValue().getValue());
- updateBatchDataMap
- .get(res.right)
- .putAnObject(ret.getTimestamp(), ret.getValue().getValue());
- switch (tsDataType) {
- case INT32:
- statisticsMap
- .get(res.right)
- .update(ret.getTimestamp(), (int)
ret.getValue().getValue());
- break;
- case INT64:
- statisticsMap
- .get(res.right)
- .update(ret.getTimestamp(), (long)
ret.getValue().getValue());
- break;
- case FLOAT:
- statisticsMap
- .get(res.right)
- .update(ret.getTimestamp(), (float)
ret.getValue().getValue());
- break;
- case DOUBLE:
- statisticsMap
- .get(res.right)
- .update(ret.getTimestamp(), (double)
ret.getValue().getValue());
- break;
- default:
- throw new
UnSupportedDataTypeException(String.valueOf(tsDataType));
- }
- }
- mergeReader.close();
-
- for (int o = 0; o < toMerge.size(); o++) {
- ChunkSuit4CPV chunkSuit4CPV =
currentChunkList.get(toMerge.get(o));
- // to MARK these chunks are "merged" - not overlapped any more
- chunkSuit4CPV.getMergeVersionList().addAll(newMergedVersionList);
- chunkSuit4CPV.getMergeOffsetList().addAll(newMergedOffsetList);
- // update BatchData
- MergeReaderPriority mergeReaderPriority =
- new MergeReaderPriority(chunkSuit4CPV.getVersion(),
chunkSuit4CPV.getOffset());
-
chunkSuit4CPV.setBatchData(updateBatchDataMap.get(mergeReaderPriority));
- chunkSuit4CPV
- .getChunkMetadata()
- .setStatistics(statisticsMap.get(mergeReaderPriority));
- }
- // System.out.println(
- // "====DEBUG====: merged chunks are : version="
- // + newMergedVersionList
- // + " offsets="
- // + newMergedOffsetList);
- continue;
- }
- }
- }
-
- if (!isFinal[3]) { // topPoint
- long topTimestamp = timestamps[3];
- ChunkMetadata topChunkMetadata =
currentChunkList.get(listIdx[3]).getChunkMetadata();
- List<Long> mergedVersionList =
currentChunkList.get(listIdx[3]).getMergeVersionList();
- List<Long> mergedOffsetList =
currentChunkList.get(listIdx[3]).getMergeOffsetList();
- // check if the point is deleted:
- List<TimeRange> topDeleteIntervalList =
topChunkMetadata.getDeleteIntervalList();
- boolean isDeletedItself = false;
- if (topDeleteIntervalList != null) {
- for (TimeRange timeRange : topDeleteIntervalList) {
- if (timeRange.contains(topTimestamp)) {
- isDeletedItself = true;
- break;
- }
- }
- }
- if (isDeletedItself) {
- // System.out.println(
- // "====DEBUG====: load the chunk because candidate
topPoint is actually
- // deleted. Version="
- // + topChunkMetadata.getVersion()
- // + " "
- // + topChunkMetadata.getOffsetOfChunkHeader());
-
- currentChunkList.remove(listIdx[3]);
- List<IPageReader> pageReaderList =
- FileLoaderUtils.loadPageReaderList(topChunkMetadata,
this.timeFilter);
- for (IPageReader pageReader : pageReaderList) { // assume only one
page in a chunk
- ((PageReader) pageReader)
- .split4CPV(
- startTime,
- endTime,
- interval,
- curStartTime,
- currentChunkList,
- null,
- topChunkMetadata);
- }
- continue; // next iteration to check currentChunkList
- } else { // verify if it is overlapped by other chunks with larger
version number and not in
- // the deleted time interval
- List<Integer> toMerge = new ArrayList<>();
- for (int i = 0; i < currentChunkList.size(); i++) {
- ChunkMetadata chunkMetadata =
currentChunkList.get(i).getChunkMetadata();
- MergeReaderPriority version =
- new MergeReaderPriority(
- chunkMetadata.getVersion(),
chunkMetadata.getOffsetOfChunkHeader());
- if (version.compareTo(versions[3]) <= 0) { // including
topChunkMetadata
- continue;
- }
- if (topTimestamp < chunkMetadata.getStartTime()
- || topTimestamp > chunkMetadata.getEndTime()) {
- continue;
- }
- boolean isMerged = false;
- for (int k = 0; k < mergedVersionList.size(); k++) {
- if (mergedVersionList.get(k) == chunkMetadata.getVersion()
- && mergedOffsetList.get(k) ==
chunkMetadata.getOffsetOfChunkHeader()) {
- isMerged = true;
- break;
- }
- }
- if (isMerged) {
- continue;
- }
- toMerge.add(i);
- }
- if (toMerge.isEmpty()) {
- results
- .get(5)
- .updateResultUsingValues(
- Arrays.copyOfRange(timestamps, 3, 4),
- 1,
- Arrays.copyOfRange(values, 3, 4)); // max_value
- isFinal[3] = true;
- // System.out.println("====DEBUG====: find topPoint");
- return results;
- } else {
- // deal with toMerge chunks: delete updated points
- toMerge.add(listIdx[3]);
- List<Long> newMergedVersionList = new ArrayList<>();
- List<Long> newMergedOffsetList = new ArrayList<>();
- for (int m : toMerge) { // to MARK these chunks are "merged" - not
overlapped any more
- ChunkMetadata tmpChunkMetadata =
currentChunkList.get(m).getChunkMetadata();
- newMergedVersionList.add(tmpChunkMetadata.getVersion());
-
newMergedOffsetList.add(tmpChunkMetadata.getOffsetOfChunkHeader());
- }
- Map<MergeReaderPriority, BatchData> updateBatchDataMap = new
HashMap<>();
- Map<MergeReaderPriority, Statistics> statisticsMap = new
HashMap<>();
- for (int o = 0; o < toMerge.size(); o++) {
- // create empty batchData
- ChunkSuit4CPV chunkSuit4CPV =
currentChunkList.get(toMerge.get(o));
- ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata();
- MergeReaderPriority mergeReaderPriority =
- new MergeReaderPriority(
- chunkMetadata.getVersion(),
chunkMetadata.getOffsetOfChunkHeader());
- BatchData batch1 = BatchDataFactory.createBatchData(tsDataType,
true, false);
- updateBatchDataMap.put(mergeReaderPriority, batch1);
- // create empty statistics
- Statistics statistics = null;
- switch (tsDataType) {
- case INT32:
- statistics = new IntegerStatistics();
- break;
- case INT64:
- statistics = new LongStatistics();
- break;
- case FLOAT:
- statistics = new FloatStatistics();
- break;
- case DOUBLE:
- statistics = new DoubleStatistics();
- break;
- default:
- break;
- }
- statisticsMap.put(mergeReaderPriority, statistics);
- // prepare mergeReader
- if (chunkSuit4CPV.getBatchData() == null) {
- List<IPageReader> pageReaderList =
- FileLoaderUtils.loadPageReaderList(chunkMetadata,
this.timeFilter);
- List<ChunkSuit4CPV> tmpCurrentChunkList = new ArrayList<>();
- for (IPageReader pageReader : pageReaderList) { // assume only
one page in a chunk
- ((PageReader) pageReader)
- .split4CPV(
- startTime,
- endTime,
- interval,
- curStartTime,
- tmpCurrentChunkList,
- null,
- chunkMetadata);
- }
- currentChunkList.set(toMerge.get(o),
tmpCurrentChunkList.get(0));
- chunkSuit4CPV = currentChunkList.get(toMerge.get(o));
-
- // System.out.println(
- // "====DEBUG====: load chunk for update
merge. Version="
- // + chunkMetadata.getVersion()
- // + " "
- // +
chunkMetadata.getOffsetOfChunkHeader());
- }
- mergeReader.addReader(
- chunkSuit4CPV.getBatchData().getBatchDataIterator(),
- new MergeReaderPriority(chunkSuit4CPV.getVersion(),
chunkSuit4CPV.getOffset()));
- }
- while (mergeReader.hasNextTimeValuePair()) {
- Pair<TimeValuePair, MergeReaderPriority> res =
mergeReader.nextElement();
- TimeValuePair ret = res.left;
- // System.out.println(
- // "====DEBUG====: merge for topPoint. (t,v)="
- // + ret.getTimestamp()
- // + ","
- // + ret.getValue().getValue());
- updateBatchDataMap
- .get(res.right)
- .putAnObject(ret.getTimestamp(), ret.getValue().getValue());
- switch (tsDataType) {
- case INT32:
- statisticsMap
- .get(res.right)
- .update(ret.getTimestamp(), (int)
ret.getValue().getValue());
- break;
- case INT64:
- statisticsMap
- .get(res.right)
- .update(ret.getTimestamp(), (long)
ret.getValue().getValue());
- break;
- case FLOAT:
- statisticsMap
- .get(res.right)
- .update(ret.getTimestamp(), (float)
ret.getValue().getValue());
- break;
- case DOUBLE:
- statisticsMap
- .get(res.right)
- .update(ret.getTimestamp(), (double)
ret.getValue().getValue());
- break;
- default:
- throw new
UnSupportedDataTypeException(String.valueOf(tsDataType));
- }
- }
- mergeReader.close();
-
- for (int o = 0; o < toMerge.size(); o++) {
- ChunkSuit4CPV chunkSuit4CPV =
currentChunkList.get(toMerge.get(o));
- // to MARK these chunks are "merged" - not overlapped any more
- chunkSuit4CPV.getMergeVersionList().addAll(newMergedVersionList);
- chunkSuit4CPV.getMergeOffsetList().addAll(newMergedOffsetList);
- // update BatchData
- MergeReaderPriority mergeReaderPriority =
- new MergeReaderPriority(chunkSuit4CPV.getVersion(),
chunkSuit4CPV.getOffset());
-
chunkSuit4CPV.setBatchData(updateBatchDataMap.get(mergeReaderPriority));
- chunkSuit4CPV
- .getChunkMetadata()
- .setStatistics(statisticsMap.get(mergeReaderPriority));
- }
- continue;
- }
- }
- }
- } while (true);
- }
+// /**
+// * @param curStartTime closed
+// * @param curEndTime open
+// * @param startTime closed
+// * @param endTime open
+// */
+// public List<AggregateResult> calcResult_deprecated(
+//      long curStartTime, long curEndTime, long startTime, long endTime, long interval)
+// throws IOException, QueryProcessException {
+// // System.out.println("====DEBUG====: calcResult for [" +
curStartTime + "," + curEndTime +
+// // ")");
+//
+// // clear result cache
+// for (AggregateResult result : results) {
+// result.reset();
+// }
+// // empty currentChunkList
+// currentChunkList = new ArrayList<>();
+//
+// // System.out.println("====DEBUG====: deal with futureChunkList");
+//
+// ListIterator itr = futureChunkList.listIterator();
+// List<ChunkSuit4CPV> tmpFutureChunkList = new ArrayList<>();
+// while (itr.hasNext()) {
+// ChunkSuit4CPV chunkSuit4CPV = (ChunkSuit4CPV) (itr.next());
+// ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata();
+// long chunkMinTime = chunkMetadata.getStartTime();
+// long chunkMaxTime = chunkMetadata.getEndTime();
+// if (chunkMinTime >= curEndTime && chunkMinTime < endTime) {
+// // the chunk falls on the right side of the current M4 interval Ii
+// continue;
+// } else if (chunkMaxTime < curStartTime || chunkMinTime >= endTime) {
+// // the chunk falls on the left side of the current M4 interval Ii
+// // or the chunk falls on the right side of the total query range
+// itr.remove();
+// } else if (chunkMinTime >= curStartTime && chunkMaxTime < curEndTime) {
+// // the chunk falls completely within the current M4 interval Ii
+// currentChunkList.add(chunkSuit4CPV);
+// itr.remove();
+// } else {
+// // the chunk partially overlaps in time with the current M4 interval
Ii.
+// // load this chunk, split it on deletes and all w intervals.
+// // add to currentChunkList and futureChunkList.
+// itr.remove();
+// List<IPageReader> pageReaderList =
+//
FileLoaderUtils.loadPageReaderList(chunkSuit4CPV.getChunkMetadata(),
this.timeFilter);
+// for (IPageReader pageReader : pageReaderList) {
+// // assume only one page in a chunk
+// // assume all data on disk, no data in memory
+// ((PageReader) pageReader)
+// .split4CPV(
+// startTime,
+// endTime,
+// interval,
+// curStartTime,
+// currentChunkList,
+// tmpFutureChunkList,
+// chunkMetadata);
+// }
+//
+// // System.out.println(
+// // "====DEBUG====: load the chunk because overlaps the M4
interval. Version="
+// // + chunkMetadata.getVersion()
+// // + " "
+// // + chunkMetadata.getOffsetOfChunkHeader());
+// }
+// }
+// futureChunkList.addAll(tmpFutureChunkList);
+// tmpFutureChunkList = null;
+// itr = null;
+//
+// // System.out.println("====DEBUG====: deal with currentChunkList");
+//
+// if (currentChunkList.size() == 0) {
+// return results;
+// }
+//
+// boolean[] isFinal = new boolean[4]; // default false
+// do {
+// long[] timestamps = new long[4]; // firstTime, lastTime, bottomTime,
topTime
+// Object[] values = new Object[4]; // firstValue, lastValue,
bottomValue, topValue
+// PriorityMergeReader.MergeReaderPriority[] versions =
+// new PriorityMergeReader.MergeReaderPriority[4];
+// int[] listIdx = new int[4];
+// timestamps[0] = -1;
+// timestamps[1] = -1;
+// values[2] = null;
+// values[3] = null;
+//
+// // find candidate points
+// // System.out.println("====DEBUG====: find candidate points");
+// // TODO: may change the loop of generating candidate points? sort
first??
+//
+// for (int j = 0; j < currentChunkList.size(); j++) {
+// ChunkMetadata chunkMetadata =
currentChunkList.get(j).getChunkMetadata();
+// Statistics statistics = chunkMetadata.getStatistics();
+// MergeReaderPriority version =
+// new MergeReaderPriority(
+// chunkMetadata.getVersion(),
chunkMetadata.getOffsetOfChunkHeader());
+// // update firstPoint
+// if (!isFinal[0]) {
+// if (timestamps[0] == -1
+// || (statistics.getStartTime() < timestamps[0])
+// || (statistics.getStartTime() == timestamps[0]
+// && version.compareTo(versions[0]) > 0)) {
+// timestamps[0] = statistics.getStartTime();
+// values[0] = statistics.getFirstValue();
+// versions[0] = version;
+// listIdx[0] = j;
+// }
+// }
+// // update lastPoint
+// if (!isFinal[1]) {
+// if (timestamps[1] == -1
+// || (statistics.getEndTime() > timestamps[1])
+// || (statistics.getEndTime() == timestamps[1] &&
version.compareTo(versions[1]) > 0)) {
+// timestamps[1] = statistics.getEndTime();
+// values[1] = statistics.getLastValue();
+// versions[1] = version;
+// listIdx[1] = j;
+// }
+// }
+// // update bottomPoint
+// if (!isFinal[2]) {
+// if (values[2] == null
+// || (((Comparable)
(values[2])).compareTo(statistics.getMinValue()) > 0)) {
+// timestamps[2] = statistics.getBottomTimestamp();
+// values[2] = statistics.getMinValue();
+// versions[2] = version;
+// listIdx[2] = j;
+// }
+// }
+// // update topPoint
+// if (!isFinal[3]) {
+// if (values[3] == null
+// || (((Comparable)
(values[3])).compareTo(statistics.getMaxValue()) < 0)) {
+// timestamps[3] = statistics.getTopTimestamp();
+// values[3] = statistics.getMaxValue();
+// versions[3] = version;
+// listIdx[3] = j;
+// }
+// }
+// }
+//
+// // System.out.println("====DEBUG====: verify candidate points");
+//
+// // verify candidate points.
+// // firstPoint and lastPoint are valid for sure.
+// // default results sequence: min_time(%s), max_time(%s),
first_value(%s), last_value(%s),
+// // min_value(%s), max_value(%s)
+// if (!isFinal[0]) { // firstPoint
+// long firstTimestamp = timestamps[0];
+// ChunkMetadata firstChunkMetadata =
currentChunkList.get(listIdx[0]).getChunkMetadata();
+// // check if the point is deleted:
+// List<TimeRange> firstDeleteIntervalList =
firstChunkMetadata.getDeleteIntervalList();
+// boolean isDeletedItself = false;
+// if (firstDeleteIntervalList != null) {
+// for (TimeRange timeRange : firstDeleteIntervalList) {
+// if (timeRange.contains(firstTimestamp)) {
+// isDeletedItself = true;
+// break;
+// }
+// }
+// }
+// if (isDeletedItself) {
+// // System.out.println(
+// // "====DEBUG====: load the chunk because candidate
firstPoint is actually
+// // deleted. Version="
+// // + firstChunkMetadata.getVersion()
+// // + " "
+// // + firstChunkMetadata.getOffsetOfChunkHeader());
+//
+// currentChunkList.remove(listIdx[0]);
+// List<IPageReader> pageReaderList =
+// FileLoaderUtils.loadPageReaderList(firstChunkMetadata,
this.timeFilter);
+// for (IPageReader pageReader : pageReaderList) { // assume only one
page in a chunk
+// ((PageReader) pageReader)
+// .split4CPV(
+// startTime,
+// endTime,
+// interval,
+// curStartTime,
+// currentChunkList,
+// null,
+// firstChunkMetadata);
+// }
+// continue; // next iteration to check currentChunkList
+// } else {
+// results
+// .get(0)
+// .updateResultUsingValues(
+// Arrays.copyOfRange(timestamps, 0, 1),
+// 1,
+// Arrays.copyOfRange(values, 0, 1)); // min_time
+// results
+// .get(2)
+// .updateResultUsingValues(
+// Arrays.copyOfRange(timestamps, 0, 1),
+// 1,
+// Arrays.copyOfRange(values, 0, 1)); // first_value
+// isFinal[0] = true;
+// // System.out.println("====DEBUG====: find firstPoint");
+// }
+// }
+// if (!isFinal[1]) { // lastPoint
+// long lastTimestamp = timestamps[1];
+// ChunkMetadata lastChunkMetadata =
currentChunkList.get(listIdx[1]).getChunkMetadata();
+// // check if the point is deleted:
+// List<TimeRange> lastDeleteIntervalList =
lastChunkMetadata.getDeleteIntervalList();
+// boolean isDeletedItself = false;
+// if (lastDeleteIntervalList != null) {
+// for (TimeRange timeRange : lastDeleteIntervalList) {
+// if (timeRange.contains(lastTimestamp)) {
+// isDeletedItself = true;
+// break;
+// }
+// }
+// }
+// if (isDeletedItself) {
+// // System.out.println(
+// // "====DEBUG====: load the chunk because candidate
lastPoint is actually
+// // deleted. Version="
+// // + lastChunkMetadata.getVersion()
+// // + " "
+// // + lastChunkMetadata.getOffsetOfChunkHeader());
+//
+// currentChunkList.remove(listIdx[1]);
+// List<IPageReader> pageReaderList =
+// FileLoaderUtils.loadPageReaderList(lastChunkMetadata,
this.timeFilter);
+// for (IPageReader pageReader : pageReaderList) { // assume only one
page in a chunk
+// ((PageReader) pageReader)
+// .split4CPV(
+// startTime,
+// endTime,
+// interval,
+// curStartTime,
+// currentChunkList,
+// null,
+// lastChunkMetadata);
+// }
+// continue; // next iteration to check currentChunkList
+// } else {
+// results
+// .get(1)
+// .updateResultUsingValues(
+// Arrays.copyOfRange(timestamps, 1, 2),
+// 1,
+// Arrays.copyOfRange(values, 1, 2)); // min_time
+// results
+// .get(3)
+// .updateResultUsingValues(
+// Arrays.copyOfRange(timestamps, 1, 2),
+// 1,
+// Arrays.copyOfRange(values, 1, 2)); // first_value
+// isFinal[1] = true;
+//
+// // System.out.println("====DEBUG====: find lastPoint");
+// }
+// }
+// if (!isFinal[2]) { // bottomPoint
+// long bottomTimestamp = timestamps[2];
+// ChunkMetadata bottomChunkMetadata =
currentChunkList.get(listIdx[2]).getChunkMetadata();
+// List<Long> mergedVersionList =
currentChunkList.get(listIdx[2]).getMergeVersionList();
+// List<Long> mergedOffsetList =
currentChunkList.get(listIdx[2]).getMergeOffsetList();
+// // check if the point is deleted:
+// List<TimeRange> bottomDeleteIntervalList =
bottomChunkMetadata.getDeleteIntervalList();
+// boolean isDeletedItself = false;
+// if (bottomDeleteIntervalList != null) {
+// for (TimeRange timeRange : bottomDeleteIntervalList) {
+// if (timeRange.contains(bottomTimestamp)) {
+// isDeletedItself = true;
+// break;
+// }
+// }
+// }
+// if (isDeletedItself) {
+// // System.out.println(
+// // "====DEBUG====: load the chunk because candidate
bottomPoint is actually
+// // deleted. Version="
+// // + bottomChunkMetadata.getVersion()
+// // + " "
+// // +
bottomChunkMetadata.getOffsetOfChunkHeader());
+//
+// currentChunkList.remove(listIdx[2]);
+// List<IPageReader> pageReaderList =
+// FileLoaderUtils.loadPageReaderList(bottomChunkMetadata,
this.timeFilter);
+// for (IPageReader pageReader : pageReaderList) { // assume only one
page in a chunk
+// ((PageReader) pageReader)
+// .split4CPV(
+// startTime,
+// endTime,
+// interval,
+// curStartTime,
+// currentChunkList,
+// null,
+// bottomChunkMetadata);
+// }
+// continue; // next iteration to check currentChunkList
+// } else { // verify if it is overlapped by other chunks with larger
version number and not in
+// // the deleted time interval
+// List<Integer> toMerge = new ArrayList<>();
+// for (int i = 0; i < currentChunkList.size(); i++) {
+// ChunkMetadata chunkMetadata =
currentChunkList.get(i).getChunkMetadata();
+// MergeReaderPriority version =
+// new MergeReaderPriority(
+// chunkMetadata.getVersion(),
chunkMetadata.getOffsetOfChunkHeader());
+// if (version.compareTo(versions[2]) <= 0) { // including
bottomChunkMetadata
+// continue;
+// }
+// if (bottomTimestamp < chunkMetadata.getStartTime()
+// || bottomTimestamp > chunkMetadata.getEndTime()) {
+// continue;
+// }
+// boolean isMerged = false;
+// for (int k = 0; k < mergedVersionList.size(); k++) {
+// // these chunks are MARKED "merged" - not overlapped any more
+// if (mergedVersionList.get(k) == chunkMetadata.getVersion()
+// && mergedOffsetList.get(k) ==
chunkMetadata.getOffsetOfChunkHeader()) {
+// isMerged = true;
+// break;
+// }
+// }
+// if (isMerged) {
+// continue;
+// }
+// toMerge.add(i);
+// }
+// if (toMerge.isEmpty()) {
+// // System.out.println("====DEBUG====: find
bottomPoint");
+//
+// results
+// .get(4)
+// .updateResultUsingValues(
+// Arrays.copyOfRange(timestamps, 2, 3),
+// 1,
+// Arrays.copyOfRange(values, 2, 3)); // min_value
+// isFinal[2] = true;
+// } else {
+// // deal with toMerge chunks: delete updated points
+// toMerge.add(listIdx[2]);
+// List<Long> newMergedVersionList = new ArrayList<>();
+// List<Long> newMergedOffsetList = new ArrayList<>();
+// for (int m : toMerge) { // to MARK these chunks are "merged" -
not overlapped any more
+// ChunkMetadata tmpChunkMetadata =
currentChunkList.get(m).getChunkMetadata();
+// newMergedVersionList.add(tmpChunkMetadata.getVersion());
+//
newMergedOffsetList.add(tmpChunkMetadata.getOffsetOfChunkHeader());
+// }
+// Map<MergeReaderPriority, BatchData> updateBatchDataMap = new
HashMap<>();
+// Map<MergeReaderPriority, Statistics> statisticsMap = new
HashMap<>();
+// for (int o = 0; o < toMerge.size(); o++) {
+// // create empty batchData
+// ChunkSuit4CPV chunkSuit4CPV =
currentChunkList.get(toMerge.get(o));
+// ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata();
+// MergeReaderPriority mergeReaderPriority =
+// new MergeReaderPriority(
+// chunkMetadata.getVersion(),
chunkMetadata.getOffsetOfChunkHeader());
+// BatchData batch1 =
BatchDataFactory.createBatchData(tsDataType, true, false);
+// updateBatchDataMap.put(mergeReaderPriority, batch1);
+// // create empty statistics
+// Statistics statistics = null;
+// switch (tsDataType) {
+// case INT32:
+// statistics = new IntegerStatistics();
+// break;
+// case INT64:
+// statistics = new LongStatistics();
+// break;
+// case FLOAT:
+// statistics = new FloatStatistics();
+// break;
+// case DOUBLE:
+// statistics = new DoubleStatistics();
+// break;
+// default:
+// break;
+// }
+// statisticsMap.put(mergeReaderPriority, statistics);
+// // prepare mergeReader
+// if (chunkSuit4CPV.getBatchData() == null) {
+// List<IPageReader> pageReaderList =
+// FileLoaderUtils.loadPageReaderList(chunkMetadata,
this.timeFilter);
+// List<ChunkSuit4CPV> tmpCurrentChunkList = new ArrayList<>();
+// for (IPageReader pageReader : pageReaderList) { // assume
only one page in a chunk
+// ((PageReader) pageReader)
+// .split4CPV(
+// startTime,
+// endTime,
+// interval,
+// curStartTime,
+// tmpCurrentChunkList,
+// null,
+// chunkMetadata);
+// }
+// currentChunkList.set(toMerge.get(o),
tmpCurrentChunkList.get(0));
+// chunkSuit4CPV = currentChunkList.get(toMerge.get(o));
+//
+// // System.out.println(
+// // "====DEBUG====: load chunk for update
merge. Version="
+// // + chunkMetadata.getVersion()
+// // + " "
+// // +
chunkMetadata.getOffsetOfChunkHeader());
+// }
+// mergeReader.addReader(
+// chunkSuit4CPV.getBatchData().getBatchDataIterator(),
+// new MergeReaderPriority(chunkSuit4CPV.getVersion(),
chunkSuit4CPV.getOffset()));
+// }
+// while (mergeReader.hasNextTimeValuePair()) {
+// Pair<TimeValuePair, MergeReaderPriority> res =
mergeReader.nextElement();
+// TimeValuePair ret = res.left;
+// // System.out.println(
+// // "====DEBUG====: merge for bottomPoint.
(t,v)="
+// // + ret.getTimestamp()
+// // + ","
+// // + ret.getValue().getValue());
+// updateBatchDataMap
+// .get(res.right)
+// .putAnObject(ret.getTimestamp(),
ret.getValue().getValue());
+// switch (tsDataType) {
+// case INT32:
+// statisticsMap
+// .get(res.right)
+// .update(ret.getTimestamp(), (int)
ret.getValue().getValue());
+// break;
+// case INT64:
+// statisticsMap
+// .get(res.right)
+// .update(ret.getTimestamp(), (long)
ret.getValue().getValue());
+// break;
+// case FLOAT:
+// statisticsMap
+// .get(res.right)
+// .update(ret.getTimestamp(), (float)
ret.getValue().getValue());
+// break;
+// case DOUBLE:
+// statisticsMap
+// .get(res.right)
+// .update(ret.getTimestamp(), (double)
ret.getValue().getValue());
+// break;
+// default:
+// throw new
UnSupportedDataTypeException(String.valueOf(tsDataType));
+// }
+// }
+// mergeReader.close();
+//
+// for (int o = 0; o < toMerge.size(); o++) {
+// ChunkSuit4CPV chunkSuit4CPV =
currentChunkList.get(toMerge.get(o));
+// // to MARK these chunks are "merged" - not overlapped any more
+//
chunkSuit4CPV.getMergeVersionList().addAll(newMergedVersionList);
+// chunkSuit4CPV.getMergeOffsetList().addAll(newMergedOffsetList);
+// // update BatchData
+// MergeReaderPriority mergeReaderPriority =
+// new MergeReaderPriority(chunkSuit4CPV.getVersion(),
chunkSuit4CPV.getOffset());
+//
chunkSuit4CPV.setBatchData(updateBatchDataMap.get(mergeReaderPriority));
+// chunkSuit4CPV
+// .getChunkMetadata()
+// .setStatistics(statisticsMap.get(mergeReaderPriority));
+// }
+// // System.out.println(
+// // "====DEBUG====: merged chunks are : version="
+// // + newMergedVersionList
+// // + " offsets="
+// // + newMergedOffsetList);
+// continue;
+// }
+// }
+// }
+//
+// if (!isFinal[3]) { // topPoint
+// long topTimestamp = timestamps[3];
+// ChunkMetadata topChunkMetadata =
currentChunkList.get(listIdx[3]).getChunkMetadata();
+// List<Long> mergedVersionList =
currentChunkList.get(listIdx[3]).getMergeVersionList();
+// List<Long> mergedOffsetList =
currentChunkList.get(listIdx[3]).getMergeOffsetList();
+// // check if the point is deleted:
+// List<TimeRange> topDeleteIntervalList =
topChunkMetadata.getDeleteIntervalList();
+// boolean isDeletedItself = false;
+// if (topDeleteIntervalList != null) {
+// for (TimeRange timeRange : topDeleteIntervalList) {
+// if (timeRange.contains(topTimestamp)) {
+// isDeletedItself = true;
+// break;
+// }
+// }
+// }
+// if (isDeletedItself) {
+// // System.out.println(
+// // "====DEBUG====: load the chunk because candidate
topPoint is actually
+// // deleted. Version="
+// // + topChunkMetadata.getVersion()
+// // + " "
+// // + topChunkMetadata.getOffsetOfChunkHeader());
+//
+// currentChunkList.remove(listIdx[3]);
+// List<IPageReader> pageReaderList =
+// FileLoaderUtils.loadPageReaderList(topChunkMetadata,
this.timeFilter);
+// for (IPageReader pageReader : pageReaderList) { // assume only one
page in a chunk
+// ((PageReader) pageReader)
+// .split4CPV(
+// startTime,
+// endTime,
+// interval,
+// curStartTime,
+// currentChunkList,
+// null,
+// topChunkMetadata);
+// }
+// continue; // next iteration to check currentChunkList
+// } else { // verify if it is overlapped by other chunks with larger
version number and not in
+// // the deleted time interval
+// List<Integer> toMerge = new ArrayList<>();
+// for (int i = 0; i < currentChunkList.size(); i++) {
+// ChunkMetadata chunkMetadata =
currentChunkList.get(i).getChunkMetadata();
+// MergeReaderPriority version =
+// new MergeReaderPriority(
+// chunkMetadata.getVersion(),
chunkMetadata.getOffsetOfChunkHeader());
+// if (version.compareTo(versions[3]) <= 0) { // including
topChunkMetadata
+// continue;
+// }
+// if (topTimestamp < chunkMetadata.getStartTime()
+// || topTimestamp > chunkMetadata.getEndTime()) {
+// continue;
+// }
+// boolean isMerged = false;
+// for (int k = 0; k < mergedVersionList.size(); k++) {
+// if (mergedVersionList.get(k) == chunkMetadata.getVersion()
+// && mergedOffsetList.get(k) ==
chunkMetadata.getOffsetOfChunkHeader()) {
+// isMerged = true;
+// break;
+// }
+// }
+// if (isMerged) {
+// continue;
+// }
+// toMerge.add(i);
+// }
+// if (toMerge.isEmpty()) {
+// results
+// .get(5)
+// .updateResultUsingValues(
+// Arrays.copyOfRange(timestamps, 3, 4),
+// 1,
+// Arrays.copyOfRange(values, 3, 4)); // max_value
+// isFinal[3] = true;
+// // System.out.println("====DEBUG====: find topPoint");
+// return results;
+// } else {
+// // deal with toMerge chunks: delete updated points
+// toMerge.add(listIdx[3]);
+// List<Long> newMergedVersionList = new ArrayList<>();
+// List<Long> newMergedOffsetList = new ArrayList<>();
+// for (int m : toMerge) { // to MARK these chunks are "merged" -
not overlapped any more
+// ChunkMetadata tmpChunkMetadata =
currentChunkList.get(m).getChunkMetadata();
+// newMergedVersionList.add(tmpChunkMetadata.getVersion());
+//
newMergedOffsetList.add(tmpChunkMetadata.getOffsetOfChunkHeader());
+// }
+// Map<MergeReaderPriority, BatchData> updateBatchDataMap = new
HashMap<>();
+// Map<MergeReaderPriority, Statistics> statisticsMap = new
HashMap<>();
+// for (int o = 0; o < toMerge.size(); o++) {
+// // create empty batchData
+// ChunkSuit4CPV chunkSuit4CPV =
currentChunkList.get(toMerge.get(o));
+// ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata();
+// MergeReaderPriority mergeReaderPriority =
+// new MergeReaderPriority(
+// chunkMetadata.getVersion(),
chunkMetadata.getOffsetOfChunkHeader());
+// BatchData batch1 =
BatchDataFactory.createBatchData(tsDataType, true, false);
+// updateBatchDataMap.put(mergeReaderPriority, batch1);
+// // create empty statistics
+// Statistics statistics = null;
+// switch (tsDataType) {
+// case INT32:
+// statistics = new IntegerStatistics();
+// break;
+// case INT64:
+// statistics = new LongStatistics();
+// break;
+// case FLOAT:
+// statistics = new FloatStatistics();
+// break;
+// case DOUBLE:
+// statistics = new DoubleStatistics();
+// break;
+// default:
+// break;
+// }
+// statisticsMap.put(mergeReaderPriority, statistics);
+// // prepare mergeReader
+// if (chunkSuit4CPV.getBatchData() == null) {
+// List<IPageReader> pageReaderList =
+// FileLoaderUtils.loadPageReaderList(chunkMetadata,
this.timeFilter);
+// List<ChunkSuit4CPV> tmpCurrentChunkList = new ArrayList<>();
+// for (IPageReader pageReader : pageReaderList) { // assume
only one page in a chunk
+// ((PageReader) pageReader)
+// .split4CPV(
+// startTime,
+// endTime,
+// interval,
+// curStartTime,
+// tmpCurrentChunkList,
+// null,
+// chunkMetadata);
+// }
+// currentChunkList.set(toMerge.get(o),
tmpCurrentChunkList.get(0));
+// chunkSuit4CPV = currentChunkList.get(toMerge.get(o));
+//
+// // System.out.println(
+// // "====DEBUG====: load chunk for update
merge. Version="
+// // + chunkMetadata.getVersion()
+// // + " "
+// // +
chunkMetadata.getOffsetOfChunkHeader());
+// }
+// mergeReader.addReader(
+// chunkSuit4CPV.getBatchData().getBatchDataIterator(),
+// new MergeReaderPriority(chunkSuit4CPV.getVersion(),
chunkSuit4CPV.getOffset()));
+// }
+// while (mergeReader.hasNextTimeValuePair()) {
+// Pair<TimeValuePair, MergeReaderPriority> res =
mergeReader.nextElement();
+// TimeValuePair ret = res.left;
+// // System.out.println(
+// // "====DEBUG====: merge for topPoint. (t,v)="
+// // + ret.getTimestamp()
+// // + ","
+// // + ret.getValue().getValue());
+// updateBatchDataMap
+// .get(res.right)
+// .putAnObject(ret.getTimestamp(),
ret.getValue().getValue());
+// switch (tsDataType) {
+// case INT32:
+// statisticsMap
+// .get(res.right)
+// .update(ret.getTimestamp(), (int)
ret.getValue().getValue());
+// break;
+// case INT64:
+// statisticsMap
+// .get(res.right)
+// .update(ret.getTimestamp(), (long)
ret.getValue().getValue());
+// break;
+// case FLOAT:
+// statisticsMap
+// .get(res.right)
+// .update(ret.getTimestamp(), (float)
ret.getValue().getValue());
+// break;
+// case DOUBLE:
+// statisticsMap
+// .get(res.right)
+// .update(ret.getTimestamp(), (double)
ret.getValue().getValue());
+// break;
+// default:
+// throw new
UnSupportedDataTypeException(String.valueOf(tsDataType));
+// }
+// }
+// mergeReader.close();
+//
+// for (int o = 0; o < toMerge.size(); o++) {
+// ChunkSuit4CPV chunkSuit4CPV =
currentChunkList.get(toMerge.get(o));
+// // to MARK these chunks are "merged" - not overlapped any more
+//
chunkSuit4CPV.getMergeVersionList().addAll(newMergedVersionList);
+// chunkSuit4CPV.getMergeOffsetList().addAll(newMergedOffsetList);
+// // update BatchData
+// MergeReaderPriority mergeReaderPriority =
+// new MergeReaderPriority(chunkSuit4CPV.getVersion(),
chunkSuit4CPV.getOffset());
+//
chunkSuit4CPV.setBatchData(updateBatchDataMap.get(mergeReaderPriority));
+// chunkSuit4CPV
+// .getChunkMetadata()
+// .setStatistics(statisticsMap.get(mergeReaderPriority));
+// }
+// continue;
+// }
+// }
+// }
+// } while (true);
+// }
@Override
  public Pair<Long, Object> peekNextNotNullValue(long nextStartTime, long nextEndTime)
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest1.java b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest1.java
index fa4001f54a..a6a11054a5 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest1.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest1.java
@@ -68,6 +68,7 @@ public class MyTest1 {
EnvironmentUtils.envSetUp();
Class.forName(Config.JDBC_DRIVER_NAME);
+ config.setTimestampPrecision("ms");
}
@After
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest2.java b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest2.java
index 8412ae44e6..cc9ef2ba6a 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest2.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest2.java
@@ -79,6 +79,7 @@ public class MyTest2 {
    config.setEnableCPV(
        true); // this test cannot be false, as the expected answer for bottomTime and topTime can
               // be different
+ config.setTimestampPrecision("ms");
}
@After
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest3.java b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest3.java
index d09b8d90e3..5bcce47a66 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest3.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest3.java
@@ -68,6 +68,7 @@ public class MyTest3 {
EnvironmentUtils.envSetUp();
Class.forName(Config.JDBC_DRIVER_NAME);
+ config.setTimestampPrecision("ms");
}
@After
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
index 8315b39a79..2e4541c28c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java
@@ -18,6 +18,12 @@
*/
package org.apache.iotdb.tsfile.read.reader.page;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.iotdb.tsfile.encoding.decoder.Decoder;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.header.PageHeader;
@@ -38,33 +44,37 @@ import org.apache.iotdb.tsfile.read.reader.IPageReader;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
public class PageReader implements IPageReader {
private PageHeader pageHeader;
protected TSDataType dataType;
- /** decoder for value column */
+ /**
+ * decoder for value column
+ */
protected Decoder valueDecoder;
- /** decoder for time column */
+ /**
+ * decoder for time column
+ */
protected Decoder timeDecoder;
- /** time column in memory */
+ /**
+ * time column in memory
+ */
protected ByteBuffer timeBuffer;
- /** value column in memory */
+ /**
+ * value column in memory
+ */
protected ByteBuffer valueBuffer;
protected Filter filter;
- /** A list of deleted intervals. */
+ /**
+ * A list of deleted intervals.
+ */
private List<TimeRange> deleteIntervalList;
private int deleteCursor = 0;
@@ -114,7 +124,7 @@ public class PageReader implements IPageReader {
long interval,
long curStartTime,
List<ChunkSuit4CPV> currentChunkList,
- List<ChunkSuit4CPV> futureChunkList,
+ Map<Integer, List<ChunkSuit4CPV>> splitChunkList,
ChunkMetadata chunkMetadata)
      throws IOException { // note: [startTime,endTime), [curStartTime,curEndTime)
Map<Integer, BatchData> splitBatchDataMap = new HashMap<>();
@@ -228,35 +238,21 @@ public class PageReader implements IPageReader {
throw new UnSupportedDataTypeException(String.valueOf(dataType));
}
}
- int curIdx = (int) Math.floor((curStartTime - startTime) * 1.0 / interval);
- int num = (int) Math.floor((endTime - startTime) * 1.0 / interval);
-
-    // for (int i = 0; i < num;
-    //     i++) { // TODO: [M4] this loop can be polished, no need to loop 0~num, just loop the
-    //            // keySet of splitBatchDataMap
-    //   if (splitBatchDataMap.containsKey(i) && i == curIdx &&
-    //       !splitBatchDataMap.get(i).isEmpty()) {
-    //     currentChunkList.add(
-    //         new ChunkSuit4CPV(splitChunkMetadataMap.get(i), splitBatchDataMap.get(i).flip()));
-    //   } else if (splitBatchDataMap.containsKey(i)
-    //       && i != curIdx
-    //       && !splitBatchDataMap.get(i).isEmpty()) {
-    //     futureChunkList.add(
-    //         new ChunkSuit4CPV(splitChunkMetadataMap.get(i), splitBatchDataMap.get(i).flip()));
-    //   }
-    // }
+ int curIdx = (int) Math.floor((curStartTime - startTime) * 1.0 / interval);
for (Integer i : splitBatchDataMap.keySet()) {
if (!splitBatchDataMap.get(i).isEmpty()) {
if (i == curIdx) {
currentChunkList.add(
              new ChunkSuit4CPV(splitChunkMetadataMap.get(i), splitBatchDataMap.get(i).flip()));
} else {
- futureChunkList.add(
+ splitChunkList.computeIfAbsent(i, k -> new ArrayList<>());
+ splitChunkList.get(i).add(
              new ChunkSuit4CPV(splitChunkMetadataMap.get(i), splitBatchDataMap.get(i).flip()));
}
}
}
+
}
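Since Map.computeIfAbsent returns the mapped (possibly freshly created) list, the two added statements above could equally be collapsed into one; a behavior-preserving sketch:

    // equivalent one-liner for the bucketing above
    splitChunkList
        .computeIfAbsent(i, k -> new ArrayList<>())
        .add(new ChunkSuit4CPV(splitChunkMetadataMap.get(i), splitBatchDataMap.get(i).flip()));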
/**
@@ -277,7 +273,9 @@ public class PageReader implements IPageReader {
return false;
}
- /** @return the returned BatchData may be empty, but never be null */
+ /**
+ * @return the returned BatchData may be empty, but never be null
+ */
  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
@Override
  public BatchData getAllSatisfiedPageData(boolean ascending) throws IOException {