jt2594838 commented on code in PR #14265:
URL: https://github.com/apache/iotdb/pull/14265#discussion_r1866890236
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java:
##########
@@ -241,17 +310,36 @@ private void sortTVList() {
@Override
public synchronized void sortTvListForFlush() {
- sortTVList();
+ if (!list.isSorted()) {
+ list.sort();
+ }
}
@Override
public TVList getTVList() {
return list;
}
+ public void setTVList(TVList list) {
+ this.list = list;
+ }
Review Comment:
The two methods (and the member `list`) should be renamed to increase
clarity. Maybe `workingList`?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemPageReader.java:
##########
@@ -214,4 +237,73 @@ public boolean isModified() {
public void initTsBlockBuilder(List<TSDataType> dataTypes) {
// non-aligned page reader don't need to init TsBlockBuilder at the very
beginning
}
+
+ private void getTsBlock() {
+ if (tsBlock == null) {
+ InitializeOffsets();
+ tsBlock = tsBlockSupplier.get();
+ if (pageMetadata.getStatistics() == null) {
+ initPageStatistics();
+ }
+ }
+ }
+
+ private void InitializeOffsets() {
+ if (pageStartOffsets != null) {
+ mergeSortTvListIterator.setTVListOffsets(pageStartOffsets);
+ }
+ ((MemChunkReader.TsBlockSupplier)
tsBlockSupplier).setPageEndOffsets(pageEndOffsets);
+ }
+
+ // memory page statistics should be initialized when constructing
ReadOnlyMemChunk object.
+ // We do the initialization if it is not set, especially in test cases.
+ private void initPageStatistics() {
+ Statistics statistics = Statistics.getStatsByType(tsDataType);
+ updatePageStatisticsFromTsBlock(statistics, tsDataType);
+ statistics.setEmpty(tsBlock.isEmpty());
+ pageMetadata.setStatistics(statistics);
+ }
+
+ private void updatePageStatisticsFromTsBlock(Statistics statistics,
TSDataType dataType) {
+ if (!tsBlock.isEmpty()) {
+ switch (dataType) {
+ case BOOLEAN:
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ statistics.update(tsBlock.getTimeByIndex(i),
tsBlock.getColumn(0).getBoolean(i));
+ }
+ break;
+ case TEXT:
+ case BLOB:
+ case STRING:
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ statistics.update(tsBlock.getTimeByIndex(i),
tsBlock.getColumn(0).getBinary(i));
+ }
+ break;
+ case FLOAT:
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ statistics.update(tsBlock.getTimeByIndex(i),
tsBlock.getColumn(0).getFloat(i));
+ }
+ break;
+ case INT32:
+ case DATE:
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ statistics.update(tsBlock.getTimeByIndex(i),
tsBlock.getColumn(0).getInt(i));
+ }
+ break;
+ case INT64:
+ case TIMESTAMP:
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ statistics.update(tsBlock.getTimeByIndex(i),
tsBlock.getColumn(0).getLong(i));
+ }
+ break;
+ case DOUBLE:
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ statistics.update(tsBlock.getTimeByIndex(i),
tsBlock.getColumn(0).getDouble(i));
+ }
+ break;
+ default:
+ // do nothing
Review Comment:
Better to throw an exception so that it will be easier to trace bugs when
adding new types.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/QuickBinaryTVList.java:
##########
@@ -31,9 +29,9 @@ public int compare(int idx1, int idx2) {
@Override
public void swap(int p, int q) {
- Binary valueP = getBinary(p);
+ int valueP = getValueIndex(p);
long timeP = getTime(p);
- Binary valueQ = getBinary(q);
+ int valueQ = getValueIndex(q);
Review Comment:
Better to rename the variables, since they now hold value indices rather
than the values themselves. The same applies below.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java:
##########
@@ -136,6 +186,105 @@ public long getTime(int index) {
return timestamps.get(arrayIndex)[elementIndex];
}
+ protected void set(int index, long timestamp, int value) {
+ if (index >= rowCount) {
+ throw new ArrayIndexOutOfBoundsException(index);
+ }
+ int arrayIndex = index / ARRAY_SIZE;
+ int elementIndex = index % ARRAY_SIZE;
+ timestamps.get(arrayIndex)[elementIndex] = timestamp;
+ indices.get(arrayIndex)[elementIndex] = value;
+ }
+
+ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
+ protected int[] cloneIndex(int[] array) {
+ int[] cloneArray = new int[array.length];
+ System.arraycopy(array, 0, cloneArray, 0, array.length);
+ return cloneArray;
+ }
+
+ /**
+ * Get the row index value in index column.
+ *
+ * @param index row index
+ */
+ public int getValueIndex(int index) {
+ if (index >= rowCount) {
+ throw new ArrayIndexOutOfBoundsException(index);
+ }
+ int arrayIndex = index / ARRAY_SIZE;
+ int elementIndex = index % ARRAY_SIZE;
+ return indices.get(arrayIndex)[elementIndex];
+ }
+
+ protected void markNullValue(int arrayIndex, int elementIndex) {
+ // init bitMap if doesn't have
+ if (bitMap == null) {
+ bitMap = new CopyOnWriteArrayList<>();
+ for (int i = 0; i < timestamps.size(); i++) {
+ bitMap.add(new BitMap(ARRAY_SIZE));
+ }
+ }
+ // if the bitmap in arrayIndex is null, init the bitmap
+ if (bitMap.get(arrayIndex) == null) {
+ bitMap.set(arrayIndex, new BitMap(ARRAY_SIZE));
+ }
+
+ // mark the null value in the current bitmap
+ bitMap.get(arrayIndex).mark(elementIndex);
+ }
+
+ /**
+ * Get whether value is null at the given position in TvList.
+ *
+ * @param rowIndex value index
+ * @return boolean
+ */
+ public boolean isNullValue(int rowIndex) {
+ if (rowIndex >= rowCount) {
+ return false;
+ }
Review Comment:
Maybe an IndexOutOfBoundsException should be thrown?
##########
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemChunkLoaderTest.java:
##########
@@ -105,20 +108,22 @@ public void testBooleanMemChunkLoader() throws
IOException {
memChunkLoader.close();
}
- private TsBlock buildBooleanTsBlock() {
- TsBlockBuilder builder = new
TsBlockBuilder(Collections.singletonList(TSDataType.BOOLEAN));
- builder.getTimeColumnBuilder().writeLong(1L);
- builder.getColumnBuilder(0).writeBoolean(true);
- builder.declarePosition();
- builder.getTimeColumnBuilder().writeLong(2L);
- builder.getColumnBuilder(0).writeBoolean(false);
- builder.declarePosition();
- return builder.build();
+ private Map<TVList, Integer> buildBooleanTvListMap() {
+ TVList tvList = TVList.newList(TSDataType.BOOLEAN);
+ if (tvList != null) {
+ tvList.putBoolean(1L, true);
+ tvList.putBoolean(2L, true);
+ }
+ Map<TVList, Integer> tvListMap = new LinkedHashMap<>();
+ tvListMap.put(tvList, 2);
+ return tvListMap;
}
Review Comment:
Better to add a few more lists to the map so that corner cases are covered.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java:
##########
@@ -136,6 +186,105 @@ public long getTime(int index) {
return timestamps.get(arrayIndex)[elementIndex];
}
+ protected void set(int index, long timestamp, int value) {
+ if (index >= rowCount) {
+ throw new ArrayIndexOutOfBoundsException(index);
+ }
+ int arrayIndex = index / ARRAY_SIZE;
+ int elementIndex = index % ARRAY_SIZE;
+ timestamps.get(arrayIndex)[elementIndex] = timestamp;
+ indices.get(arrayIndex)[elementIndex] = value;
+ }
+
+ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
+ protected int[] cloneIndex(int[] array) {
+ int[] cloneArray = new int[array.length];
+ System.arraycopy(array, 0, cloneArray, 0, array.length);
+ return cloneArray;
+ }
+
+ /**
+ * Get the row index value in index column.
+ *
+ * @param index row index
+ */
+ public int getValueIndex(int index) {
+ if (index >= rowCount) {
+ throw new ArrayIndexOutOfBoundsException(index);
+ }
+ int arrayIndex = index / ARRAY_SIZE;
+ int elementIndex = index % ARRAY_SIZE;
+ return indices.get(arrayIndex)[elementIndex];
+ }
+
+ protected void markNullValue(int arrayIndex, int elementIndex) {
+ // init bitMap if doesn't have
+ if (bitMap == null) {
+ bitMap = new CopyOnWriteArrayList<>();
+ for (int i = 0; i < timestamps.size(); i++) {
+ bitMap.add(new BitMap(ARRAY_SIZE));
+ }
+ }
Review Comment:
There should not be one bitmap per timestamp.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortTvListIterator.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.datastructure;
+
+import org.apache.iotdb.db.utils.MathUtils;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.read.reader.IPointReader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class MergeSortTvListIterator implements IPointReader {
+ private final List<TVList.TVListIterator> tvListIterators;
+ private final TSDataType tsDataType;
+ private TSEncoding encoding;
+ private int floatPrecision = -1;
+
+ private TimeValuePair currentTvPair;
+
+ public MergeSortTvListIterator(TSDataType tsDataType, List<TVList> tvLists) {
+ this.tsDataType = tsDataType;
+ tvListIterators = new ArrayList<>();
+ for (TVList tvList : tvLists) {
+ tvListIterators.add(tvList.iterator());
+ }
+ }
+
+ public MergeSortTvListIterator(
+ TSDataType tsDataType, TSEncoding encoding, int floatPrecision,
List<TVList> tvLists) {
+ this(tsDataType, tvLists);
+ this.encoding = encoding;
+ this.floatPrecision = floatPrecision;
+ }
+
+ private int getSelectedTVListIndex() {
+ long time = Long.MAX_VALUE;
+ int selectedTVListIndex = -1;
+ for (int i = 0; i < tvListIterators.size(); i++) {
+ TVList.TVListIterator iterator = tvListIterators.get(i);
+ TimeValuePair currTvPair = null;
+ if (iterator.hasNext()) {
+ currTvPair = iterator.current();
+ }
+
+ // update minimum time and remember selected TVList
+ if (currTvPair != null && currTvPair.getTimestamp() <= time) {
+ time = currTvPair.getTimestamp();
+ selectedTVListIndex = i;
+ }
+ }
Review Comment:
Maybe you can call `tvListIterators.get(selectedTVListIndex).next()` here so
that you will not need to iterate the lists again in `hasNextTimeValuePair()`.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortTvListIterator.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.datastructure;
+
+import org.apache.iotdb.db.utils.MathUtils;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.read.reader.IPointReader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class MergeSortTvListIterator implements IPointReader {
+ private final List<TVList.TVListIterator> tvListIterators;
+ private final TSDataType tsDataType;
+ private TSEncoding encoding;
+ private int floatPrecision = -1;
+
+ private TimeValuePair currentTvPair;
+
+ public MergeSortTvListIterator(TSDataType tsDataType, List<TVList> tvLists) {
+ this.tsDataType = tsDataType;
+ tvListIterators = new ArrayList<>();
+ for (TVList tvList : tvLists) {
+ tvListIterators.add(tvList.iterator());
+ }
+ }
+
+ public MergeSortTvListIterator(
+ TSDataType tsDataType, TSEncoding encoding, int floatPrecision,
List<TVList> tvLists) {
+ this(tsDataType, tvLists);
+ this.encoding = encoding;
+ this.floatPrecision = floatPrecision;
+ }
+
+ private int getSelectedTVListIndex() {
+ long time = Long.MAX_VALUE;
+ int selectedTVListIndex = -1;
+ for (int i = 0; i < tvListIterators.size(); i++) {
+ TVList.TVListIterator iterator = tvListIterators.get(i);
+ TimeValuePair currTvPair = null;
+ if (iterator.hasNext()) {
+ currTvPair = iterator.current();
+ }
+
+ // update minimum time and remember selected TVList
+ if (currTvPair != null && currTvPair.getTimestamp() <= time) {
+ time = currTvPair.getTimestamp();
+ selectedTVListIndex = i;
+ }
+ }
+ return selectedTVListIndex;
+ }
+
+ @Override
+ public boolean hasNextTimeValuePair() {
+ boolean hasNext = false;
+ int selectedTVListIndex = getSelectedTVListIndex();
+ if (selectedTVListIndex >= 0) {
+ currentTvPair = tvListIterators.get(selectedTVListIndex).next();
+ hasNext = true;
+
+ // call next to skip identical timestamp in other iterators
+ for (int i = 0; i < tvListIterators.size(); i++) {
+ TimeValuePair tvPair = tvListIterators.get(i).current();
+ if (tvPair != null && tvPair.getTimestamp() ==
currentTvPair.getTimestamp()) {
+ tvListIterators.get(i).next();
+ }
+ }
+ }
+ return hasNext;
+ }
+
+ @Override
+ public TimeValuePair nextTimeValuePair() {
+ return currentTimeValuePair();
+ }
+
+ @Override
+ public TimeValuePair currentTimeValuePair() {
+ if (encoding != null && floatPrecision != -1) {
+ if (tsDataType == TSDataType.FLOAT) {
+ float value = currentTvPair.getValue().getFloat();
+ if (!Float.isNaN(value)
+ && (encoding == TSEncoding.RLE || encoding ==
TSEncoding.TS_2DIFF)) {
+ currentTvPair
+ .getValue()
+ .setFloat(MathUtils.roundWithGivenPrecision(value,
floatPrecision));
+ }
+ } else if (tsDataType == TSDataType.DOUBLE) {
+ double value = currentTvPair.getValue().getDouble();
+ if (!Double.isNaN(value)
+ && (encoding == TSEncoding.RLE || encoding ==
TSEncoding.TS_2DIFF)) {
+ currentTvPair
+ .getValue()
+ .setDouble(MathUtils.roundWithGivenPrecision(value,
floatPrecision));
+ }
+ }
+ }
+ return currentTvPair;
+ }
+
+ @Override
+ public long getUsedMemorySize() {
+ return 0;
+ }
Review Comment:
Double-check this. And it would be better to add some comments to explain
why this reader uses no memory.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemChunkReader.java:
##########
@@ -114,4 +151,91 @@ public void close() {
public List<IPageReader> loadPageReaderList() {
return this.pageReaderList;
}
+
+ /**
+ * TsBlockSupplier enables to read pages in MemTable lazily. All
MemPageReaders share one
+ * TsBlockSupplier object.
+ */
+ class TsBlockSupplier implements Supplier<TsBlock> {
+ private int[] pageEndOffsets;
+
+ public TsBlockSupplier() {}
+
+ public void setPageEndOffsets(int[] pageEndOffsets) {
+ this.pageEndOffsets = pageEndOffsets;
+ }
+
+ @Override
+ public TsBlock get() {
+ return buildTsBlock();
+ }
+
+ private TsBlock buildTsBlock() {
+ try {
+ TSDataType tsDataType = readableChunk.getDataType();
+ TsBlockBuilder builder = new
TsBlockBuilder(Collections.singletonList(tsDataType));
+ writeValidValuesIntoTsBlock(builder);
+ return builder.build();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private boolean isOutOfMemPageBounds() {
+ if (pageEndOffsets == null) {
+ return false;
+ }
+ int[] currTvListOffsets =
+ ((MergeSortTvListIterator) timeValuePairIterator).getTVListOffsets();
+ for (int i = 0; i < pageEndOffsets.length; i++) {
+ if (currTvListOffsets[i] > pageEndOffsets[i]) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ // read one page and write to tsblock
+ private synchronized void writeValidValuesIntoTsBlock(TsBlockBuilder
builder)
+ throws IOException {
+ TSDataType tsDataType = readableChunk.getDataType();
+ int[] deleteCursor = {0};
+ while (timeValuePairIterator.hasNextTimeValuePair()) {
+ if (isOutOfMemPageBounds()) {
+ break;
+ }
Review Comment:
Each point will call `isOutOfMemPageBounds` and thus `getTVListOffsets`, and
an array will be created.
I am concerned about the efficiency of the process.
Maybe MergeSortTvListIterator can cache the offsets as a member so that it
will not need to create them each time?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortTvListIterator.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.datastructure;
+
+import org.apache.iotdb.db.utils.MathUtils;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.read.reader.IPointReader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class MergeSortTvListIterator implements IPointReader {
+ private final List<TVList.TVListIterator> tvListIterators;
+ private final TSDataType tsDataType;
+ private TSEncoding encoding;
+ private int floatPrecision = -1;
+
+ private TimeValuePair currentTvPair;
+
+ public MergeSortTvListIterator(TSDataType tsDataType, List<TVList> tvLists) {
+ this.tsDataType = tsDataType;
+ tvListIterators = new ArrayList<>();
+ for (TVList tvList : tvLists) {
+ tvListIterators.add(tvList.iterator());
+ }
+ }
+
+ public MergeSortTvListIterator(
+ TSDataType tsDataType, TSEncoding encoding, int floatPrecision,
List<TVList> tvLists) {
+ this(tsDataType, tvLists);
+ this.encoding = encoding;
+ this.floatPrecision = floatPrecision;
+ }
+
+ private int getSelectedTVListIndex() {
+ long time = Long.MAX_VALUE;
+ int selectedTVListIndex = -1;
+ for (int i = 0; i < tvListIterators.size(); i++) {
+ TVList.TVListIterator iterator = tvListIterators.get(i);
+ TimeValuePair currTvPair = null;
+ if (iterator.hasNext()) {
+ currTvPair = iterator.current();
+ }
+
+ // update minimum time and remember selected TVList
+ if (currTvPair != null && currTvPair.getTimestamp() <= time) {
+ time = currTvPair.getTimestamp();
+ selectedTVListIndex = i;
+ }
+ }
+ return selectedTVListIndex;
+ }
+
+ @Override
+ public boolean hasNextTimeValuePair() {
+ boolean hasNext = false;
+ int selectedTVListIndex = getSelectedTVListIndex();
+ if (selectedTVListIndex >= 0) {
+ currentTvPair = tvListIterators.get(selectedTVListIndex).next();
+ hasNext = true;
+
+ // call next to skip identical timestamp in other iterators
+ for (int i = 0; i < tvListIterators.size(); i++) {
+ TimeValuePair tvPair = tvListIterators.get(i).current();
+ if (tvPair != null && tvPair.getTimestamp() ==
currentTvPair.getTimestamp()) {
+ tvListIterators.get(i).next();
+ }
+ }
+ }
+ return hasNext;
+ }
Review Comment:
The semantics are a bit weird.
If someone calls `hasNextTimeValuePair` without calling `nextTimeValuePair`,
the cursor should remain the same.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java:
##########
@@ -136,6 +186,105 @@ public long getTime(int index) {
return timestamps.get(arrayIndex)[elementIndex];
}
+ protected void set(int index, long timestamp, int value) {
+ if (index >= rowCount) {
+ throw new ArrayIndexOutOfBoundsException(index);
+ }
+ int arrayIndex = index / ARRAY_SIZE;
+ int elementIndex = index % ARRAY_SIZE;
+ timestamps.get(arrayIndex)[elementIndex] = timestamp;
+ indices.get(arrayIndex)[elementIndex] = value;
+ }
+
+ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
+ protected int[] cloneIndex(int[] array) {
+ int[] cloneArray = new int[array.length];
+ System.arraycopy(array, 0, cloneArray, 0, array.length);
+ return cloneArray;
+ }
Review Comment:
The annotation is unneeded.
How about Arrays.copyOf()?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java:
##########
@@ -92,77 +115,223 @@ public ReadOnlyMemChunk(
floatPrecision =
TSFileDescriptor.getInstance().getConfig().getFloatPrecision();
}
}
- this.tsBlock = tvList.buildTsBlock(floatPrecision, encoding, deletionList);
- initChunkMetaFromTsBlock();
+ this.floatPrecision = floatPrecision;
+ this.encoding = encoding;
+ this.deletionList = deletionList;
+ this.tvListQueryMap = tvListQueryMap;
+ this.pageStatisticsList = new ArrayList<>();
+ this.pageOffsetsList = new ArrayList<>();
+ this.context.addTVListToSet(tvListQueryMap);
+
+ initChunkAndPageStatistics();
}
- private void initChunkMetaFromTsBlock() throws QueryProcessException {
- Statistics statsByType = Statistics.getStatsByType(dataType);
+ private void initChunkAndPageStatistics() {
+ // create chunk metadata
+ Statistics chunkStatistics = Statistics.getStatsByType(dataType);
IChunkMetadata metaData =
- new ChunkMetadata(measurementUid, dataType, null, null, 0,
statsByType);
- if (!isEmpty()) {
- switch (dataType) {
- case BOOLEAN:
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- statsByType.update(tsBlock.getTimeByIndex(i),
tsBlock.getColumn(0).getBoolean(i));
- }
- break;
- case TEXT:
- case BLOB:
- case STRING:
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- statsByType.update(tsBlock.getTimeByIndex(i),
tsBlock.getColumn(0).getBinary(i));
- }
- break;
- case FLOAT:
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- statsByType.update(tsBlock.getTimeByIndex(i),
tsBlock.getColumn(0).getFloat(i));
- }
- break;
- case INT32:
- case DATE:
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- statsByType.update(tsBlock.getTimeByIndex(i),
tsBlock.getColumn(0).getInt(i));
- }
- break;
- case INT64:
- case TIMESTAMP:
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- statsByType.update(tsBlock.getTimeByIndex(i),
tsBlock.getColumn(0).getLong(i));
- }
- break;
- case DOUBLE:
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- statsByType.update(tsBlock.getTimeByIndex(i),
tsBlock.getColumn(0).getDouble(i));
- }
- break;
- default:
- throw new QueryProcessException("Unsupported data type:" + dataType);
- }
- }
- statsByType.setEmpty(isEmpty());
+ new ChunkMetadata(measurementUid, dataType, null, null, 0,
chunkStatistics);
metaData.setChunkLoader(new MemChunkLoader(context, this));
metaData.setVersion(Long.MAX_VALUE);
cachedMetaData = metaData;
+
+ sortTvLists();
+ updateChunkAndPageStatisticsFromTvLists();
+ }
+
+ private void sortTvLists() {
+ for (Map.Entry<TVList, Integer> entry : getTvListQueryMap().entrySet()) {
+ TVList tvList = entry.getKey();
+ int queryRowCount = entry.getValue();
+ if (!tvList.isSorted() && queryRowCount > tvList.seqRowCount()) {
+ tvList.sort();
+ }
+ }
+ }
+
+ private void updateChunkAndPageStatisticsFromTvLists() {
+ Statistics chunkStatistics = cachedMetaData.getStatistics();
+
+ int cnt = 0;
+ int[] deleteCursor = {0};
+ List<TVList> tvLists = new ArrayList<>(tvListQueryMap.keySet());
+ MergeSortTvListIterator timeValuePairIterator =
+ new MergeSortTvListIterator(dataType, encoding, floatPrecision,
tvLists);
+ int[] tvListOffsets = timeValuePairIterator.getTVListOffsets();
+ while (timeValuePairIterator.hasNextTimeValuePair()) {
+ TimeValuePair tvPair = timeValuePairIterator.nextTimeValuePair();
+ if (!isPointDeleted(tvPair.getTimestamp(), deletionList, deleteCursor)) {
+ if (cnt % MAX_NUMBER_OF_POINTS_IN_PAGE == 0) {
+ Statistics stats = Statistics.getStatsByType(dataType);
+ pageStatisticsList.add(stats);
+ pageOffsetsList.add(tvListOffsets);
+ }
+
+ Statistics pageStatistics =
pageStatisticsList.get(pageStatisticsList.size() - 1);
+ switch (dataType) {
+ case BOOLEAN:
+ chunkStatistics.update(tvPair.getTimestamp(),
tvPair.getValue().getBoolean());
+ pageStatistics.update(tvPair.getTimestamp(),
tvPair.getValue().getBoolean());
+ break;
+ case INT32:
+ case DATE:
+ chunkStatistics.update(tvPair.getTimestamp(),
tvPair.getValue().getInt());
+ pageStatistics.update(tvPair.getTimestamp(),
tvPair.getValue().getInt());
+ break;
+ case INT64:
+ case TIMESTAMP:
+ chunkStatistics.update(tvPair.getTimestamp(),
tvPair.getValue().getLong());
+ pageStatistics.update(tvPair.getTimestamp(),
tvPair.getValue().getLong());
+ break;
+ case FLOAT:
+ chunkStatistics.update(tvPair.getTimestamp(),
tvPair.getValue().getFloat());
+ pageStatistics.update(tvPair.getTimestamp(),
tvPair.getValue().getFloat());
+ break;
+ case DOUBLE:
+ chunkStatistics.update(tvPair.getTimestamp(),
tvPair.getValue().getDouble());
+ pageStatistics.update(tvPair.getTimestamp(),
tvPair.getValue().getDouble());
+ break;
+ case TEXT:
+ case BLOB:
+ case STRING:
+ chunkStatistics.update(tvPair.getTimestamp(),
tvPair.getValue().getBinary());
+ pageStatistics.update(tvPair.getTimestamp(),
tvPair.getValue().getBinary());
+ break;
+ default:
+ // do nothing
+ }
+ pageStatistics.setEmpty(false);
+ }
+ tvListOffsets = timeValuePairIterator.getTVListOffsets();
+ cnt++;
+ }
+ chunkStatistics.setEmpty(cnt == 0);
}
public TSDataType getDataType() {
return dataType;
}
public boolean isEmpty() {
+ if (tsBlock == null) {
+ return count() == 0;
+ }
return tsBlock.isEmpty();
}
public IChunkMetadata getChunkMetaData() {
return cachedMetaData;
}
+ // we do not call getPointReader in MemChunkReader anymore. However, unit
testcases
+ // still test this method.
public IPointReader getPointReader() {
Review Comment:
In this case, you may mark it with @TestOnly.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java:
##########
@@ -416,4 +575,76 @@ public static TVList deserialize(DataInputStream stream)
throws IOException {
public List<long[]> getTimestamps() {
return timestamps;
}
+
+ public void setOwnerQuery(QueryContext queryCtx) {
+ this.ownerQuery = queryCtx;
+ }
+
+ public QueryContext getOwnerQuery() {
+ return ownerQuery;
+ }
+
+ public List<QueryContext> getQueryContextList() {
+ return queryContextList;
+ }
+
+ public List<BitMap> getBitMap() {
+ return bitMap;
+ }
+
+ public void lockQueryList() {
+ queryListLock.lock();
+ }
+
+ public void unlockQueryList() {
+ queryListLock.unlock();
+ }
+
+ public TVListIterator iterator() {
+ return new TVListIterator();
+ }
+
+ /* TVList Iterator */
+ public class TVListIterator {
+ private int index;
+
+ public TVListIterator() {
+ index = 0;
+ }
+
+ public boolean hasNext() {
+ if (bitMap != null) {
+ // skip deleted & duplicated timestamp
+ while ((index < rowCount && isNullValue(getValueIndex(index)))
+ || (index + 1 < rowCount && getTime(index + 1) == getTime(index)))
{
+ index++;
+ }
+ } else {
+ // skip duplicated timestamp
+ while (index + 1 < rowCount && getTime(index + 1) == getTime(index)) {
+ index++;
+ }
+ }
+ return index < rowCount;
+ }
+
+ public TimeValuePair next() {
+ return getTimeValuePair(index++);
+ }
Review Comment:
Similarly here, if one calls `next` without calling `hasNext`, the result
could be incorrect.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java:
##########
@@ -92,77 +115,223 @@ public ReadOnlyMemChunk(
floatPrecision =
TSFileDescriptor.getInstance().getConfig().getFloatPrecision();
}
}
- this.tsBlock = tvList.buildTsBlock(floatPrecision, encoding, deletionList);
- initChunkMetaFromTsBlock();
+ this.floatPrecision = floatPrecision;
+ this.encoding = encoding;
+ this.deletionList = deletionList;
+ this.tvListQueryMap = tvListQueryMap;
+ this.pageStatisticsList = new ArrayList<>();
+ this.pageOffsetsList = new ArrayList<>();
+ this.context.addTVListToSet(tvListQueryMap);
+
+ initChunkAndPageStatistics();
}
- private void initChunkMetaFromTsBlock() throws QueryProcessException {
- Statistics statsByType = Statistics.getStatsByType(dataType);
+ private void initChunkAndPageStatistics() {
+ // create chunk metadata
+ Statistics chunkStatistics = Statistics.getStatsByType(dataType);
IChunkMetadata metaData =
- new ChunkMetadata(measurementUid, dataType, null, null, 0,
statsByType);
- if (!isEmpty()) {
- switch (dataType) {
- case BOOLEAN:
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- statsByType.update(tsBlock.getTimeByIndex(i),
tsBlock.getColumn(0).getBoolean(i));
- }
- break;
- case TEXT:
- case BLOB:
- case STRING:
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- statsByType.update(tsBlock.getTimeByIndex(i),
tsBlock.getColumn(0).getBinary(i));
- }
- break;
- case FLOAT:
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- statsByType.update(tsBlock.getTimeByIndex(i),
tsBlock.getColumn(0).getFloat(i));
- }
- break;
- case INT32:
- case DATE:
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- statsByType.update(tsBlock.getTimeByIndex(i),
tsBlock.getColumn(0).getInt(i));
- }
- break;
- case INT64:
- case TIMESTAMP:
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- statsByType.update(tsBlock.getTimeByIndex(i),
tsBlock.getColumn(0).getLong(i));
- }
- break;
- case DOUBLE:
- for (int i = 0; i < tsBlock.getPositionCount(); i++) {
- statsByType.update(tsBlock.getTimeByIndex(i),
tsBlock.getColumn(0).getDouble(i));
- }
- break;
- default:
- throw new QueryProcessException("Unsupported data type:" + dataType);
- }
- }
- statsByType.setEmpty(isEmpty());
+ new ChunkMetadata(measurementUid, dataType, null, null, 0,
chunkStatistics);
metaData.setChunkLoader(new MemChunkLoader(context, this));
metaData.setVersion(Long.MAX_VALUE);
cachedMetaData = metaData;
+
+ sortTvLists();
+ updateChunkAndPageStatisticsFromTvLists();
+ }
+
+ private void sortTvLists() {
+ for (Map.Entry<TVList, Integer> entry : getTvListQueryMap().entrySet()) {
+ TVList tvList = entry.getKey();
+ int queryRowCount = entry.getValue();
+ if (!tvList.isSorted() && queryRowCount > tvList.seqRowCount()) {
+ tvList.sort();
+ }
+ }
+ }
+
+ private void updateChunkAndPageStatisticsFromTvLists() {
+ Statistics chunkStatistics = cachedMetaData.getStatistics();
+
+ int cnt = 0;
+ int[] deleteCursor = {0};
+ List<TVList> tvLists = new ArrayList<>(tvListQueryMap.keySet());
+ MergeSortTvListIterator timeValuePairIterator =
+ new MergeSortTvListIterator(dataType, encoding, floatPrecision,
tvLists);
+ int[] tvListOffsets = timeValuePairIterator.getTVListOffsets();
+ while (timeValuePairIterator.hasNextTimeValuePair()) {
+ TimeValuePair tvPair = timeValuePairIterator.nextTimeValuePair();
+ if (!isPointDeleted(tvPair.getTimestamp(), deletionList, deleteCursor)) {
+ if (cnt % MAX_NUMBER_OF_POINTS_IN_PAGE == 0) {
+ Statistics stats = Statistics.getStatsByType(dataType);
+ pageStatisticsList.add(stats);
+ pageOffsetsList.add(tvListOffsets);
+ }
+
+ Statistics pageStatistics =
pageStatisticsList.get(pageStatisticsList.size() - 1);
+ switch (dataType) {
+ case BOOLEAN:
+ chunkStatistics.update(tvPair.getTimestamp(),
tvPair.getValue().getBoolean());
+ pageStatistics.update(tvPair.getTimestamp(),
tvPair.getValue().getBoolean());
+ break;
+ case INT32:
+ case DATE:
+ chunkStatistics.update(tvPair.getTimestamp(),
tvPair.getValue().getInt());
+ pageStatistics.update(tvPair.getTimestamp(),
tvPair.getValue().getInt());
+ break;
+ case INT64:
+ case TIMESTAMP:
+ chunkStatistics.update(tvPair.getTimestamp(),
tvPair.getValue().getLong());
+ pageStatistics.update(tvPair.getTimestamp(),
tvPair.getValue().getLong());
+ break;
+ case FLOAT:
+ chunkStatistics.update(tvPair.getTimestamp(),
tvPair.getValue().getFloat());
+ pageStatistics.update(tvPair.getTimestamp(),
tvPair.getValue().getFloat());
+ break;
+ case DOUBLE:
+ chunkStatistics.update(tvPair.getTimestamp(),
tvPair.getValue().getDouble());
+ pageStatistics.update(tvPair.getTimestamp(),
tvPair.getValue().getDouble());
+ break;
+ case TEXT:
+ case BLOB:
+ case STRING:
+ chunkStatistics.update(tvPair.getTimestamp(),
tvPair.getValue().getBinary());
+ pageStatistics.update(tvPair.getTimestamp(),
tvPair.getValue().getBinary());
+ break;
+ default:
+ // do nothing
+ }
+ pageStatistics.setEmpty(false);
+ }
+ tvListOffsets = timeValuePairIterator.getTVListOffsets();
Review Comment:
Is it necessary to update `tvListOffsets` for every point? I think it should
be updated only when a new page is generated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]