This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b2f3c4b668 [IOTDB-3093] Use TsBlock to query data in TVList (#5772)
b2f3c4b668 is described below
commit b2f3c4b668e97b07f235807e9dc0d3c763afddc8
Author: Haonan <[email protected]>
AuthorDate: Sun May 8 15:16:17 2022 +0800
[IOTDB-3093] Use TsBlock to query data in TVList (#5772)
---
.../engine/memtable/AlignedWritableMemChunk.java | 6 +-
.../querycontext/AlignedReadOnlyMemChunk.java | 170 +++++++--------
.../db/engine/querycontext/ReadOnlyMemChunk.java | 104 ++++-----
.../db/metadata/utils/ResourceByPathUtils.java | 6 +-
.../query/reader/chunk/MemAlignedChunkReader.java | 52 +----
.../query/reader/chunk/MemAlignedPageReader.java | 86 ++++----
.../db/query/reader/chunk/MemChunkReader.java | 3 +-
.../iotdb/db/query/reader/chunk/MemPageReader.java | 122 ++++++-----
.../db/utils/datastructure/AlignedTVList.java | 239 ++++++++++-----------
.../iotdb/db/utils/datastructure/BinaryTVList.java | 19 ++
.../db/utils/datastructure/BooleanTVList.java | 19 ++
.../iotdb/db/utils/datastructure/DoubleTVList.java | 21 ++
.../iotdb/db/utils/datastructure/FloatTVList.java | 21 ++
.../iotdb/db/utils/datastructure/IntTVList.java | 19 ++
.../iotdb/db/utils/datastructure/LongTVList.java | 19 ++
.../iotdb/db/utils/datastructure/TVList.java | 139 ++++--------
.../db/engine/memtable/PrimitiveMemTableTest.java | 3 +-
.../db/utils/datastructure/VectorTVListTest.java | 2 +-
.../iotdb/tsfile/read/common/block/TsBlock.java | 16 +-
.../read/common/block/column/ColumnBuilder.java | 8 +-
20 files changed, 548 insertions(+), 526 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
index 92f6adc314..d40915792b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AlignedWritableMemChunk.java
@@ -234,11 +234,13 @@ public class AlignedWritableMemChunk implements
IWritableMemChunk {
// increase reference count
list.increaseReferenceCount();
List<Integer> columnIndexList = new ArrayList<>();
+ List<TSDataType> dataTypeList = new ArrayList<>();
for (IMeasurementSchema measurementSchema : schemaList) {
columnIndexList.add(
measurementIndexMap.getOrDefault(measurementSchema.getMeasurementId(), -1));
+ dataTypeList.add(measurementSchema.getType());
}
- return list.getTvListByColumnIndex(columnIndexList);
+ return list.getTvListByColumnIndex(columnIndexList, dataTypeList);
}
private void sortTVList() {
@@ -309,7 +311,7 @@ public class AlignedWritableMemChunk implements
IWritableMemChunk {
list.getValidRowIndexForTimeDuplicatedRows(
timeDuplicateAlignedRowIndexList, columnIndex);
}
- boolean isNull = list.isValueMarked(originRowIndex, columnIndex);
+ boolean isNull = list.isNullValue(originRowIndex, columnIndex);
switch (dataTypes.get(columnIndex)) {
case BOOLEAN:
alignedChunkWriter.write(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/AlignedReadOnlyMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/AlignedReadOnlyMemChunk.java
index f0d7cb1be5..14dd30b7e2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/AlignedReadOnlyMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/AlignedReadOnlyMemChunk.java
@@ -33,10 +33,6 @@ import
org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
-import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -44,121 +40,119 @@ import java.util.List;
public class AlignedReadOnlyMemChunk extends ReadOnlyMemChunk {
- // deletion list for this chunk
- private final List<List<TimeRange>> deletionList;
-
- private String measurementUid;
- private TSDataType dataType;
- private List<TSEncoding> encodingList;
-
- private static final Logger logger =
LoggerFactory.getLogger(AlignedReadOnlyMemChunk.class);
+ private final String timeChunkName;
- private int floatPrecision =
TSFileDescriptor.getInstance().getConfig().getFloatPrecision();
+ private final List<String> valueChunkNames;
- private AlignedTVList chunkData;
-
- private int chunkDataSize;
+ private final List<TSDataType> dataTypes;
/**
* The constructor for Aligned type.
*
* @param schema VectorMeasurementSchema
* @param tvList VectorTvList
- * @param size The Number of Chunk data points
* @param deletionList The timeRange of deletionList
*/
public AlignedReadOnlyMemChunk(
- IMeasurementSchema schema, TVList tvList, int size,
List<List<TimeRange>> deletionList)
- throws IOException, QueryProcessException {
+ IMeasurementSchema schema, TVList tvList, List<List<TimeRange>>
deletionList)
+ throws QueryProcessException {
super();
- this.measurementUid = schema.getMeasurementId();
- this.dataType = schema.getType();
-
- this.encodingList = ((VectorMeasurementSchema)
schema).getSubMeasurementsTSEncodingList();
- this.chunkData = (AlignedTVList) tvList;
- this.chunkDataSize = size;
- this.deletionList = deletionList;
-
- this.chunkPointReader =
- (chunkData).getAlignedIterator(floatPrecision, encodingList,
chunkDataSize, deletionList);
- initAlignedChunkMeta((VectorMeasurementSchema) schema);
+ this.timeChunkName = schema.getMeasurementId();
+ this.valueChunkNames = schema.getSubMeasurementsList();
+ this.dataTypes = schema.getSubMeasurementsTSDataTypeList();
+ int floatPrecision =
TSFileDescriptor.getInstance().getConfig().getFloatPrecision();
+ List<TSEncoding> encodingList = schema.getSubMeasurementsTSEncodingList();
+ this.tsBlock =
+ ((AlignedTVList) tvList).buildTsBlock(floatPrecision, encodingList,
deletionList);
+ initAlignedChunkMetaFromTsBlock();
}
- private void initAlignedChunkMeta(VectorMeasurementSchema schema)
- throws IOException, QueryProcessException {
- AlignedTVList alignedChunkData = (AlignedTVList) chunkData;
- List<String> measurementList = schema.getSubMeasurementsList();
- List<TSDataType> dataTypeList = schema.getSubMeasurementsTSDataTypeList();
+ private void initAlignedChunkMetaFromTsBlock() throws QueryProcessException {
// time chunk
Statistics timeStatistics = Statistics.getStatsByType(TSDataType.VECTOR);
IChunkMetadata timeChunkMetadata =
- new ChunkMetadata(measurementUid, TSDataType.VECTOR, 0,
timeStatistics);
+ new ChunkMetadata(timeChunkName, TSDataType.VECTOR, 0, timeStatistics);
List<IChunkMetadata> valueChunkMetadataList = new ArrayList<>();
// update time chunk
- for (int row = 0; row < alignedChunkData.rowCount(); row++) {
- timeStatistics.update(alignedChunkData.getTime(row));
+ for (int row = 0; row < tsBlock.getPositionCount(); row++) {
+ timeStatistics.update(tsBlock.getTimeColumn().getLong(row));
}
timeStatistics.setEmpty(false);
// update value chunk
- for (int column = 0; column < measurementList.size(); column++) {
- Statistics valueStatistics =
Statistics.getStatsByType(dataTypeList.get(column));
+ for (int column = 0; column < tsBlock.getValueColumnCount(); column++) {
+ Statistics valueStatistics =
Statistics.getStatsByType(dataTypes.get(column));
+ valueStatistics.setEmpty(true);
IChunkMetadata valueChunkMetadata =
- new ChunkMetadata(
- measurementList.get(column), dataTypeList.get(column), 0,
valueStatistics);
+ new ChunkMetadata(valueChunkNames.get(column),
dataTypes.get(column), 0, valueStatistics);
valueChunkMetadataList.add(valueChunkMetadata);
- if (alignedChunkData.getValues().get(column) == null) {
- valueStatistics.setEmpty(true);
- continue;
- }
- for (int row = 0; row < alignedChunkData.rowCount(); row++) {
- long time = alignedChunkData.getTime(row);
- int originRowIndex = alignedChunkData.getValueIndex(row);
- boolean isNull = alignedChunkData.isValueMarked(originRowIndex,
column);
- if (isNull) {
- continue;
- }
- switch (dataTypeList.get(column)) {
- case BOOLEAN:
- valueStatistics.update(
- time, alignedChunkData.getBooleanByValueIndex(originRowIndex,
column));
- break;
- case TEXT:
- valueStatistics.update(
- time, alignedChunkData.getBinaryByValueIndex(originRowIndex,
column));
- break;
- case FLOAT:
- valueStatistics.update(
- time, alignedChunkData.getFloatByValueIndex(originRowIndex,
column));
- break;
- case INT32:
- valueStatistics.update(
- time, alignedChunkData.getIntByValueIndex(originRowIndex,
column));
- break;
- case INT64:
- valueStatistics.update(
- time, alignedChunkData.getLongByValueIndex(originRowIndex,
column));
- break;
- case DOUBLE:
- valueStatistics.update(
- time, alignedChunkData.getDoubleByValueIndex(originRowIndex,
column));
- break;
- default:
- throw new QueryProcessException("Unsupported data type:" +
dataType);
- }
+ switch (dataTypes.get(column)) {
+ case BOOLEAN:
+ for (int row = 0; row < tsBlock.getPositionCount(); row++) {
+ if (!tsBlock.getColumn(column).isNull(row)) {
+ long time = tsBlock.getTimeColumn().getLong(row);
+ valueStatistics.update(time,
tsBlock.getColumn(column).getBoolean(row));
+ }
+ }
+ break;
+ case TEXT:
+ for (int row = 0; row < tsBlock.getPositionCount(); row++) {
+ if (!tsBlock.getColumn(column).isNull(row)) {
+ long time = tsBlock.getTimeColumn().getLong(row);
+ valueStatistics.update(time,
tsBlock.getColumn(column).getBinary(row));
+ }
+ }
+ break;
+ case FLOAT:
+ for (int row = 0; row < tsBlock.getPositionCount(); row++) {
+ if (!tsBlock.getColumn(column).isNull(row)) {
+ long time = tsBlock.getTimeColumn().getLong(row);
+ valueStatistics.update(time,
tsBlock.getColumn(column).getFloat(row));
+ }
+ }
+ break;
+ case INT32:
+ for (int row = 0; row < tsBlock.getPositionCount(); row++) {
+ if (!tsBlock.getColumn(column).isNull(row)) {
+ long time = tsBlock.getTimeColumn().getLong(row);
+ valueStatistics.update(time,
tsBlock.getColumn(column).getInt(row));
+ }
+ }
+ break;
+ case INT64:
+ for (int row = 0; row < tsBlock.getPositionCount(); row++) {
+ if (!tsBlock.getColumn(column).isNull(row)) {
+ long time = tsBlock.getTimeColumn().getLong(row);
+ valueStatistics.update(time,
tsBlock.getColumn(column).getLong(row));
+ }
+ }
+ break;
+ case DOUBLE:
+ for (int row = 0; row < tsBlock.getPositionCount(); row++) {
+ if (!tsBlock.getColumn(column).isNull(row)) {
+ long time = tsBlock.getTimeColumn().getLong(row);
+ valueStatistics.update(time,
tsBlock.getColumn(column).getDouble(row));
+ }
+ }
+ break;
+ default:
+ throw new QueryProcessException("Unsupported data type:" +
dataTypes.get(column));
}
valueStatistics.setEmpty(false);
}
- IChunkMetadata vectorChunkMetadata =
+ IChunkMetadata alignedChunkMetadata =
new AlignedChunkMetadata(timeChunkMetadata, valueChunkMetadataList);
- vectorChunkMetadata.setChunkLoader(new MemAlignedChunkLoader(this));
- vectorChunkMetadata.setVersion(Long.MAX_VALUE);
- cachedMetaData = vectorChunkMetadata;
+ alignedChunkMetadata.setChunkLoader(new MemAlignedChunkLoader(this));
+ alignedChunkMetadata.setVersion(Long.MAX_VALUE);
+ cachedMetaData = alignedChunkMetadata;
+ }
+
+ @Override
+ public boolean isEmpty() throws IOException {
+ return tsBlock.isEmpty();
}
@Override
public IPointReader getPointReader() {
- chunkPointReader =
- chunkData.getAlignedIterator(floatPrecision, encodingList,
chunkDataSize, deletionList);
- return chunkPointReader;
+ return tsBlock.getTsBlockAlignedRowIterator();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
index b24313f4f4..68bedbbd83 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/ReadOnlyMemChunk.java
@@ -28,8 +28,8 @@ import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.slf4j.Logger;
@@ -45,28 +45,17 @@ import java.util.Map;
*/
public class ReadOnlyMemChunk {
- // deletion list for this chunk
- private final List<TimeRange> deletionList;
-
private String measurementUid;
+
private TSDataType dataType;
- private TSEncoding encoding;
private static final Logger logger =
LoggerFactory.getLogger(ReadOnlyMemChunk.class);
- private int floatPrecision =
TSFileDescriptor.getInstance().getConfig().getFloatPrecision();
-
protected IChunkMetadata cachedMetaData;
- private TVList chunkData;
-
- protected IPointReader chunkPointReader;
+ protected TsBlock tsBlock;
- private int chunkDataSize;
-
- public ReadOnlyMemChunk() {
- this.deletionList = null;
- }
+ protected ReadOnlyMemChunk() {}
public ReadOnlyMemChunk(
String measurementUid,
@@ -74,15 +63,14 @@ public class ReadOnlyMemChunk {
TSEncoding encoding,
TVList tvList,
Map<String, String> props,
- int size,
List<TimeRange> deletionList)
throws IOException, QueryProcessException {
this.measurementUid = measurementUid;
this.dataType = dataType;
- this.encoding = encoding;
+ int floatPrecision =
TSFileDescriptor.getInstance().getConfig().getFloatPrecision();
if (props != null && props.containsKey(Encoder.MAX_POINT_NUMBER)) {
try {
- this.floatPrecision =
Integer.parseInt(props.get(Encoder.MAX_POINT_NUMBER));
+ floatPrecision = Integer.parseInt(props.get(Encoder.MAX_POINT_NUMBER));
} catch (NumberFormatException e) {
logger.warn(
"The format of MAX_POINT_NUMBER {} is not correct."
@@ -96,46 +84,47 @@ public class ReadOnlyMemChunk {
floatPrecision =
TSFileDescriptor.getInstance().getConfig().getFloatPrecision();
}
}
-
- this.chunkData = tvList;
- this.chunkDataSize = size;
- this.deletionList = deletionList;
-
- this.chunkPointReader =
- tvList.getIterator(floatPrecision, encoding, chunkDataSize,
deletionList);
- initChunkMeta();
+ this.tsBlock = tvList.buildTsBlock(floatPrecision, encoding, deletionList);
+ initChunkMetaFromTsBlock();
}
- private void initChunkMeta() throws IOException, QueryProcessException {
+ private void initChunkMetaFromTsBlock() throws IOException,
QueryProcessException {
Statistics statsByType = Statistics.getStatsByType(dataType);
IChunkMetadata metaData = new ChunkMetadata(measurementUid, dataType, 0,
statsByType);
if (!isEmpty()) {
- IPointReader iterator =
- chunkData.getIterator(floatPrecision, encoding, chunkDataSize,
deletionList);
- while (iterator.hasNextTimeValuePair()) {
- TimeValuePair timeValuePair = iterator.nextTimeValuePair();
- switch (dataType) {
- case BOOLEAN:
- statsByType.update(timeValuePair.getTimestamp(),
timeValuePair.getValue().getBoolean());
- break;
- case TEXT:
- statsByType.update(timeValuePair.getTimestamp(),
timeValuePair.getValue().getBinary());
- break;
- case FLOAT:
- statsByType.update(timeValuePair.getTimestamp(),
timeValuePair.getValue().getFloat());
- break;
- case INT32:
- statsByType.update(timeValuePair.getTimestamp(),
timeValuePair.getValue().getInt());
- break;
- case INT64:
- statsByType.update(timeValuePair.getTimestamp(),
timeValuePair.getValue().getLong());
- break;
- case DOUBLE:
- statsByType.update(timeValuePair.getTimestamp(),
timeValuePair.getValue().getDouble());
- break;
- default:
- throw new QueryProcessException("Unsupported data type:" +
dataType);
- }
+ switch (dataType) {
+ case BOOLEAN:
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ statsByType.update(tsBlock.getTimeByIndex(i),
tsBlock.getColumn(0).getBoolean(i));
+ }
+ break;
+ case TEXT:
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ statsByType.update(tsBlock.getTimeByIndex(i),
tsBlock.getColumn(0).getBinary(i));
+ }
+ break;
+ case FLOAT:
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ statsByType.update(tsBlock.getTimeByIndex(i),
tsBlock.getColumn(0).getFloat(i));
+ }
+ break;
+ case INT32:
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ statsByType.update(tsBlock.getTimeByIndex(i),
tsBlock.getColumn(0).getInt(i));
+ }
+ break;
+ case INT64:
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ statsByType.update(tsBlock.getTimeByIndex(i),
tsBlock.getColumn(0).getLong(i));
+ }
+ break;
+ case DOUBLE:
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ statsByType.update(tsBlock.getTimeByIndex(i),
tsBlock.getColumn(0).getDouble(i));
+ }
+ break;
+ default:
+ throw new QueryProcessException("Unsupported data type:" + dataType);
}
}
statsByType.setEmpty(isEmpty());
@@ -149,7 +138,7 @@ public class ReadOnlyMemChunk {
}
public boolean isEmpty() throws IOException {
- return !chunkPointReader.hasNextTimeValuePair();
+ return tsBlock.isEmpty();
}
public IChunkMetadata getChunkMetaData() {
@@ -157,11 +146,10 @@ public class ReadOnlyMemChunk {
}
public IPointReader getPointReader() {
- chunkPointReader = chunkData.getIterator(floatPrecision, encoding,
chunkDataSize, deletionList);
- return chunkPointReader;
+ return tsBlock.getTsBlockSingleColumnIterator();
}
- public String getMeasurementUid() {
- return measurementUid;
+ public TsBlock getTsBlock() {
+ return tsBlock;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/utils/ResourceByPathUtils.java b/server/src/main/java/org/apache/iotdb/db/metadata/utils/ResourceByPathUtils.java
index 63a27833ce..b3f46a7968 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/utils/ResourceByPathUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/utils/ResourceByPathUtils.java
@@ -322,13 +322,11 @@ class AlignedResourceByPathUtils extends
ResourceByPathUtils {
}
// get sorted tv list is synchronized so different query can get right
sorted list reference
TVList alignedTvListCopy =
alignedMemChunk.getSortedTvListForQuery(partialPath.getSchemaList());
- int curSize = alignedTvListCopy.rowCount();
List<List<TimeRange>> deletionList = null;
if (modsToMemtable != null) {
deletionList = constructDeletionList(memTable, modsToMemtable,
timeLowerBound);
}
- return new AlignedReadOnlyMemChunk(
- getMeasurementSchema(), alignedTvListCopy, curSize, deletionList);
+ return new AlignedReadOnlyMemChunk(getMeasurementSchema(),
alignedTvListCopy, deletionList);
}
public VectorMeasurementSchema getMeasurementSchema() {
@@ -539,7 +537,6 @@ class MeasurementResourceByPathUtils extends
ResourceByPathUtils {
memTableMap.get(deviceID).getMemChunkMap().get(partialPath.getMeasurement());
// get sorted tv list is synchronized so different query can get right
sorted list reference
TVList chunkCopy = memChunk.getSortedTvListForQuery();
- int curSize = chunkCopy.rowCount();
List<TimeRange> deletionList = null;
if (modsToMemtable != null) {
deletionList = constructDeletionList(memTable, modsToMemtable,
timeLowerBound);
@@ -550,7 +547,6 @@ class MeasurementResourceByPathUtils extends
ResourceByPathUtils {
partialPath.getMeasurementSchema().getEncodingType(),
chunkCopy,
partialPath.getMeasurementSchema().getProps(),
- curSize,
deletionList);
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedChunkReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedChunkReader.java
index f52602b3a9..65ce131a96 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedChunkReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedChunkReader.java
@@ -20,82 +20,38 @@ package org.apache.iotdb.db.query.reader.chunk;
import org.apache.iotdb.db.engine.querycontext.AlignedReadOnlyMemChunk;
import org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
-import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.reader.IChunkReader;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
-import org.apache.iotdb.tsfile.read.reader.IPointReader;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
/** To read aligned chunk data in memory */
-public class MemAlignedChunkReader implements IChunkReader, IPointReader {
+public class MemAlignedChunkReader implements IChunkReader {
- private IPointReader timeValuePairIterator;
- private Filter filter;
- private boolean hasCachedTimeValuePair;
- private TimeValuePair cachedTimeValuePair;
private List<IPageReader> pageReaderList;
public MemAlignedChunkReader(AlignedReadOnlyMemChunk readableChunk, Filter
filter) {
- timeValuePairIterator = readableChunk.getPointReader();
- this.filter = filter;
// we treat one ReadOnlyMemChunk as one Page
this.pageReaderList =
Collections.singletonList(
new MemAlignedPageReader(
- timeValuePairIterator,
+ readableChunk.getTsBlock(),
(AlignedChunkMetadata) readableChunk.getChunkMetaData(),
filter));
}
- @Override
- public boolean hasNextTimeValuePair() throws IOException {
- if (hasCachedTimeValuePair) {
- return true;
- }
- while (timeValuePairIterator.hasNextTimeValuePair()) {
- TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
- if (filter == null
- || filter.satisfy(timeValuePair.getTimestamp(),
timeValuePair.getValue().getValue())) {
- hasCachedTimeValuePair = true;
- cachedTimeValuePair = timeValuePair;
- break;
- }
- }
- return hasCachedTimeValuePair;
- }
-
- @Override
- public TimeValuePair nextTimeValuePair() throws IOException {
- if (hasCachedTimeValuePair) {
- hasCachedTimeValuePair = false;
- return cachedTimeValuePair;
- } else {
- return timeValuePairIterator.nextTimeValuePair();
- }
- }
-
- @Override
- public TimeValuePair currentTimeValuePair() throws IOException {
- if (!hasCachedTimeValuePair) {
- cachedTimeValuePair = timeValuePairIterator.nextTimeValuePair();
- hasCachedTimeValuePair = true;
- }
- return cachedTimeValuePair;
- }
-
@Override
public boolean hasNextSatisfiedPage() throws IOException {
- return hasNextTimeValuePair();
+ throw new IOException("mem chunk reader does not support this method");
}
@Override
public BatchData nextPageData() throws IOException {
- return pageReaderList.remove(0).getAllSatisfiedPageData();
+ throw new IOException("mem chunk reader does not support this method");
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
index f0f1134d3b..30162e7c82 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemAlignedPageReader.java
@@ -22,16 +22,16 @@ import
org.apache.iotdb.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
import org.apache.iotdb.tsfile.read.reader.IAlignedPageReader;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
-import org.apache.iotdb.tsfile.read.reader.IPointReader;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import java.io.IOException;
@@ -39,13 +39,12 @@ import java.util.stream.Collectors;
public class MemAlignedPageReader implements IPageReader, IAlignedPageReader {
- private final IPointReader timeValuePairIterator;
+ private final TsBlock tsBlock;
private final AlignedChunkMetadata chunkMetadata;
private Filter valueFilter;
- public MemAlignedPageReader(
- IPointReader timeValuePairIterator, AlignedChunkMetadata chunkMetadata,
Filter filter) {
- this.timeValuePairIterator = timeValuePairIterator;
+ public MemAlignedPageReader(TsBlock tsBlock, AlignedChunkMetadata
chunkMetadata, Filter filter) {
+ this.tsBlock = tsBlock;
this.chunkMetadata = chunkMetadata;
this.valueFilter = filter;
}
@@ -58,14 +57,12 @@ public class MemAlignedPageReader implements IPageReader,
IAlignedPageReader {
@Override
public BatchData getAllSatisfiedPageData(boolean ascending) throws
IOException {
BatchData batchData = BatchDataFactory.createBatchData(TSDataType.VECTOR,
ascending, false);
- while (timeValuePairIterator.hasNextTimeValuePair()) {
- TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
- TsPrimitiveType[] values = timeValuePair.getValue().getVector();
+ for (int row = 0; row < tsBlock.getPositionCount(); row++) {
// save the first not null value of each row
Object firstNotNullObject = null;
- for (TsPrimitiveType value : values) {
- if (value != null) {
- firstNotNullObject = value.getValue();
+ for (int column = 0; column < tsBlock.getValueColumnCount(); column++) {
+ if (!tsBlock.getColumn(column).isNull(row)) {
+ firstNotNullObject = tsBlock.getColumn(column).getObject(row);
break;
}
}
@@ -75,44 +72,59 @@ public class MemAlignedPageReader implements IPageReader,
IAlignedPageReader {
// accept AlignedPath with only one sub sensor
if (firstNotNullObject != null
&& (valueFilter == null
- || valueFilter.satisfy(timeValuePair.getTimestamp(),
firstNotNullObject))) {
- batchData.putVector(timeValuePair.getTimestamp(), values);
+ || valueFilter.satisfy(tsBlock.getTimeByIndex(row),
firstNotNullObject))) {
+ TsPrimitiveType[] values = new
TsPrimitiveType[tsBlock.getValueColumnCount()];
+ for (int column = 0; column < tsBlock.getValueColumnCount(); column++)
{
+ if (tsBlock.getColumn(column) != null &&
!tsBlock.getColumn(column).isNull(row)) {
+ values[column] = tsBlock.getColumn(column).getTsPrimitiveType(row);
+ }
+ }
+ batchData.putVector(tsBlock.getTimeByIndex(row), values);
}
}
return batchData.flip();
}
@Override
- public TsBlock getAllSatisfiedData() throws IOException {
- // TODO change from the row-based style to column-based style
+ public TsBlock getAllSatisfiedData() {
TsBlockBuilder builder =
new TsBlockBuilder(
chunkMetadata.getValueChunkMetadataList().stream()
.map(IChunkMetadata::getDataType)
.collect(Collectors.toList()));
- while (timeValuePairIterator.hasNextTimeValuePair()) {
- TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
- TsPrimitiveType[] values = timeValuePair.getValue().getVector();
- // save the first not null value of each row
- Object firstNotNullObject = null;
- for (TsPrimitiveType value : values) {
- if (value != null) {
- firstNotNullObject = value.getValue();
- break;
- }
+
+ boolean[] satisfyInfo = new boolean[tsBlock.getPositionCount()];
+
+ // Time column and first value column
+ // if all the sub sensors' value are null in current time
+ // or current row is not satisfied with the filter, just discard it
+ // currently, if it's a value filter, it will only
+ // accept AlignedPath with only one sub sensor
+ TimeColumn timeColumn = tsBlock.getTimeColumn();
+ Column valueColumn = tsBlock.getColumn(0);
+ for (int row = 0; row < tsBlock.getPositionCount(); row++) {
+ long time = tsBlock.getTimeByIndex(row);
+ Object value = tsBlock.getColumn(0).getObject(row);
+ if (!tsBlock.getColumn(0).isNull(row)
+ && (valueFilter == null || valueFilter.satisfy(time, value))) {
+ builder.getTimeColumnBuilder().write(timeColumn, row);
+ builder.getColumnBuilder(0).write(valueColumn, row);
+ satisfyInfo[row] = true;
+ builder.declarePosition();
}
- // if all the sub sensors' value are null in current time
- // or current row is not satisfied with the filter, just discard it
- // TODO fix value filter firstNotNullObject, currently, if it's a value
filter, it will only
- // accept AlignedPath with only one sub sensor
- if (firstNotNullObject != null
- && (valueFilter == null
- || valueFilter.satisfy(timeValuePair.getTimestamp(),
firstNotNullObject))) {
- builder.getTimeColumnBuilder().writeLong(timeValuePair.getTimestamp());
- for (int i = 0; i < values.length; i++) {
- builder.getColumnBuilder(i).writeTsPrimitiveType(values[i]);
+ }
+
+ // other value column
+ for (int column = 1; column < tsBlock.getValueColumnCount(); column++) {
+ valueColumn = tsBlock.getColumn(column);
+ for (int row = 0; row < tsBlock.getPositionCount(); row++) {
+ if (satisfyInfo[row]) {
+ if (!tsBlock.getColumn(column).isNull(row)) {
+ builder.getColumnBuilder(column).write(valueColumn, row);
+ } else {
+ builder.getColumnBuilder(column).appendNull();
+ }
}
- builder.declarePosition();
}
}
return builder.build();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkReader.java
index 7f20c439fb..2d8394f4af 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemChunkReader.java
@@ -45,7 +45,8 @@ public class MemChunkReader implements IChunkReader,
IPointReader {
// we treat one ReadOnlyMemChunk as one Page
this.pageReaderList =
Collections.singletonList(
- new MemPageReader(timeValuePairIterator,
readableChunk.getChunkMetaData(), filter));
+ new MemPageReader(
+ readableChunk.getTsBlock(), readableChunk.getChunkMetaData(),
filter));
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
index 2032314ea0..9d1d9822e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/chunk/MemPageReader.java
@@ -22,7 +22,6 @@ import
org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.BatchDataFactory;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -32,20 +31,19 @@ import
org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
import org.apache.iotdb.tsfile.read.reader.IPageReader;
-import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.utils.Binary;
import java.io.IOException;
import java.util.Collections;
public class MemPageReader implements IPageReader {
- private final IPointReader timeValuePairIterator;
+ private final TsBlock tsBlock;
private final IChunkMetadata chunkMetadata;
private Filter valueFilter;
- public MemPageReader(
- IPointReader timeValuePairIterator, IChunkMetadata chunkMetadata, Filter
filter) {
- this.timeValuePairIterator = timeValuePairIterator;
+ public MemPageReader(TsBlock tsBlock, IChunkMetadata chunkMetadata, Filter
filter) {
+ this.tsBlock = tsBlock;
this.chunkMetadata = chunkMetadata;
this.valueFilter = filter;
}
@@ -54,92 +52,110 @@ public class MemPageReader implements IPageReader {
public BatchData getAllSatisfiedPageData(boolean ascending) throws
IOException {
TSDataType dataType = chunkMetadata.getDataType();
BatchData batchData = BatchDataFactory.createBatchData(dataType,
ascending, false);
- while (timeValuePairIterator.hasNextTimeValuePair()) {
- TimeValuePair timeValuePair = timeValuePairIterator.nextTimeValuePair();
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
if (valueFilter == null
|| valueFilter.satisfy(
- timeValuePair.getTimestamp(),
timeValuePair.getValue().getValue())) {
- batchData.putAnObject(timeValuePair.getTimestamp(),
timeValuePair.getValue().getValue());
+ tsBlock.getTimeColumn().getLong(i),
tsBlock.getColumn(0).getObject(i))) {
+ switch (dataType) {
+ case BOOLEAN:
+ batchData.putBoolean(
+ tsBlock.getTimeColumn().getLong(i),
tsBlock.getColumn(0).getBoolean(i));
+ break;
+ case INT32:
+ batchData.putInt(tsBlock.getTimeColumn().getLong(i),
tsBlock.getColumn(0).getInt(i));
+ break;
+ case INT64:
+ batchData.putLong(tsBlock.getTimeColumn().getLong(i),
tsBlock.getColumn(0).getLong(i));
+ break;
+ case DOUBLE:
+ batchData.putDouble(
+ tsBlock.getTimeColumn().getLong(i),
tsBlock.getColumn(0).getDouble(i));
+ break;
+ case FLOAT:
+ batchData.putFloat(
+ tsBlock.getTimeColumn().getLong(i),
tsBlock.getColumn(0).getFloat(i));
+ break;
+ case TEXT:
+ batchData.putBinary(
+ tsBlock.getTimeColumn().getLong(i),
tsBlock.getColumn(0).getBinary(i));
+ break;
+ default:
+ throw new UnSupportedDataTypeException(String.valueOf(dataType));
+ }
}
}
return batchData.flip();
}
@Override
- public TsBlock getAllSatisfiedData() throws IOException {
+ public TsBlock getAllSatisfiedData() {
TSDataType dataType = chunkMetadata.getDataType();
TsBlockBuilder builder = new
TsBlockBuilder(Collections.singletonList(dataType));
TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
ColumnBuilder valueBuilder = builder.getColumnBuilder(0);
switch (dataType) {
case BOOLEAN:
- while (timeValuePairIterator.hasNextTimeValuePair()) {
- TimeValuePair timeValuePair =
timeValuePairIterator.nextTimeValuePair();
- if (valueFilter == null
- || valueFilter.satisfy(
- timeValuePair.getTimestamp(),
timeValuePair.getValue().getValue())) {
- timeBuilder.writeLong(timeValuePair.getTimestamp());
- valueBuilder.writeBoolean(timeValuePair.getValue().getBoolean());
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ long time = tsBlock.getTimeColumn().getLong(i);
+ boolean value = tsBlock.getColumn(0).getBoolean(i);
+ if (valueFilter == null || valueFilter.satisfy(time, value)) {
+ timeBuilder.writeLong(time);
+ valueBuilder.writeBoolean(value);
builder.declarePosition();
}
}
break;
case INT32:
- while (timeValuePairIterator.hasNextTimeValuePair()) {
- TimeValuePair timeValuePair =
timeValuePairIterator.nextTimeValuePair();
- if (valueFilter == null
- || valueFilter.satisfy(
- timeValuePair.getTimestamp(),
timeValuePair.getValue().getValue())) {
- timeBuilder.writeLong(timeValuePair.getTimestamp());
- valueBuilder.writeInt(timeValuePair.getValue().getInt());
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ long time = tsBlock.getTimeColumn().getLong(i);
+ int value = tsBlock.getColumn(0).getInt(i);
+ if (valueFilter == null || valueFilter.satisfy(time, value)) {
+ timeBuilder.writeLong(time);
+ valueBuilder.writeInt(value);
builder.declarePosition();
}
}
break;
case INT64:
- while (timeValuePairIterator.hasNextTimeValuePair()) {
- TimeValuePair timeValuePair =
timeValuePairIterator.nextTimeValuePair();
- if (valueFilter == null
- || valueFilter.satisfy(
- timeValuePair.getTimestamp(),
timeValuePair.getValue().getValue())) {
- timeBuilder.writeLong(timeValuePair.getTimestamp());
- valueBuilder.writeLong(timeValuePair.getValue().getLong());
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ long time = tsBlock.getTimeColumn().getLong(i);
+ long value = tsBlock.getColumn(0).getLong(i);
+ if (valueFilter == null || valueFilter.satisfy(time, value)) {
+ timeBuilder.writeLong(time);
+ valueBuilder.writeLong(value);
builder.declarePosition();
}
}
break;
case FLOAT:
- while (timeValuePairIterator.hasNextTimeValuePair()) {
- TimeValuePair timeValuePair =
timeValuePairIterator.nextTimeValuePair();
- if (valueFilter == null
- || valueFilter.satisfy(
- timeValuePair.getTimestamp(),
timeValuePair.getValue().getValue())) {
- timeBuilder.writeLong(timeValuePair.getTimestamp());
- valueBuilder.writeFloat(timeValuePair.getValue().getFloat());
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ long time = tsBlock.getTimeColumn().getLong(i);
+ float value = tsBlock.getColumn(0).getFloat(i);
+ if (valueFilter == null || valueFilter.satisfy(time, value)) {
+ timeBuilder.writeLong(time);
+ valueBuilder.writeFloat(value);
builder.declarePosition();
}
}
break;
case DOUBLE:
- while (timeValuePairIterator.hasNextTimeValuePair()) {
- TimeValuePair timeValuePair =
timeValuePairIterator.nextTimeValuePair();
- if (valueFilter == null
- || valueFilter.satisfy(
- timeValuePair.getTimestamp(),
timeValuePair.getValue().getValue())) {
- timeBuilder.writeLong(timeValuePair.getTimestamp());
- valueBuilder.writeDouble(timeValuePair.getValue().getDouble());
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ long time = tsBlock.getTimeColumn().getLong(i);
+ double value = tsBlock.getColumn(0).getDouble(i);
+ if (valueFilter == null || valueFilter.satisfy(time, value)) {
+ timeBuilder.writeLong(time);
+ valueBuilder.writeDouble(value);
builder.declarePosition();
}
}
break;
case TEXT:
- while (timeValuePairIterator.hasNextTimeValuePair()) {
- TimeValuePair timeValuePair =
timeValuePairIterator.nextTimeValuePair();
- if (valueFilter == null
- || valueFilter.satisfy(
- timeValuePair.getTimestamp(),
timeValuePair.getValue().getValue())) {
- timeBuilder.writeLong(timeValuePair.getTimestamp());
- valueBuilder.writeBinary(timeValuePair.getValue().getBinary());
+ for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+ long time = tsBlock.getTimeColumn().getLong(i);
+ Binary value = tsBlock.getColumn(0).getBinary(i);
+ if (valueFilter == null || valueFilter.satisfy(time, value)) {
+ timeBuilder.writeLong(time);
+ valueBuilder.writeBinary(value);
builder.declarePosition();
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index e8d4af943e..d827d48d2f 100644
---
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.utils.datastructure;
-import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
import org.apache.iotdb.db.utils.MathUtils;
import org.apache.iotdb.db.wal.buffer.IWALByteBufferView;
@@ -28,7 +27,10 @@ import
org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.TimeRange;
-import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -39,6 +41,7 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import static org.apache.iotdb.db.rescon.PrimitiveArrayManager.ARRAY_SIZE;
import static
org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_ARRAY_HEADER;
@@ -47,7 +50,7 @@ import static
org.apache.iotdb.tsfile.utils.RamUsageEstimator.NUM_BYTES_OBJECT_R
public class AlignedTVList extends TVList {
private static final int NULL_FLAG = -1;
- // data types of this aligned tvlist
+ // data types of this aligned tvList
private List<TSDataType> dataTypes;
// data type list -> list of TVList, add 1 when expanded -> primitive array
of basic type
@@ -78,8 +81,8 @@ public class AlignedTVList extends TVList {
}
}
- public static AlignedTVList newAlignedList(List<TSDataType> datatypes) {
- return new AlignedTVList(datatypes);
+ public static AlignedTVList newAlignedList(List<TSDataType> dataTypes) {
+ return new AlignedTVList(dataTypes);
}
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
@@ -154,22 +157,6 @@ public class AlignedTVList extends TVList {
return getAlignedValueByValueIndex(valueIndex, null, floatPrecision,
encodingList);
}
- public TsPrimitiveType getAlignedValue(
- List<Integer> timeDuplicatedIndexList,
- Integer floatPrecision,
- List<TSEncoding> encodingList) {
- int[] validIndexesForTimeDuplicatedRows = new int[values.size()];
- for (int i = 0; i < values.size(); i++) {
- validIndexesForTimeDuplicatedRows[i] =
- getValidRowIndexForTimeDuplicatedRows(timeDuplicatedIndexList, i);
- }
- return getAlignedValueByValueIndex(
- timeDuplicatedIndexList.get(timeDuplicatedIndexList.size() - 1),
- validIndexesForTimeDuplicatedRows,
- floatPrecision,
- encodingList);
- }
-
private TsPrimitiveType getAlignedValueByValueIndex(
int valueIndex,
int[] validIndexesForTimeDuplicatedRows,
@@ -189,7 +176,7 @@ public class AlignedTVList extends TVList {
}
int arrayIndex = validValueIndex / ARRAY_SIZE;
int elementIndex = validValueIndex % ARRAY_SIZE;
- if (columnValues == null || isValueMarked(validValueIndex, columnIndex))
{
+ if (columnValues == null || isNullValue(validValueIndex, columnIndex)) {
continue;
}
switch (dataTypes.get(columnIndex)) {
@@ -239,17 +226,14 @@ public class AlignedTVList extends TVList {
}
@Override
- public TVList getTvListByColumnIndex(List<Integer> columnIndex) {
- List<TSDataType> types = new ArrayList<>();
+ public TVList getTvListByColumnIndex(List<Integer> columnIndex,
List<TSDataType> dataTypeList) {
List<List<Object>> values = new ArrayList<>();
List<List<BitMap>> bitMaps = null;
for (int i = 0; i < columnIndex.size(); i++) {
// columnIndex == -1 means querying a non-exist column, add null column
here
if (columnIndex.get(i) == -1) {
- types.add(null);
values.add(null);
} else {
- types.add(this.dataTypes.get(columnIndex.get(i)));
values.add(this.values.get(columnIndex.get(i)));
if (this.bitMaps != null && this.bitMaps.get(columnIndex.get(i)) !=
null) {
if (bitMaps == null) {
@@ -262,7 +246,7 @@ public class AlignedTVList extends TVList {
}
}
}
- AlignedTVList alignedTvList = new AlignedTVList(types);
+ AlignedTVList alignedTvList = new AlignedTVList(dataTypeList);
alignedTvList.timestamps = this.timestamps;
alignedTvList.indices = this.indices;
alignedTvList.values = values;
@@ -412,16 +396,19 @@ public class AlignedTVList extends TVList {
}
/**
- * Get whether value is marked at the given position in VectorTvList.
+ * Get whether value is null at the given position in AlignedTvList.
*
* @param rowIndex value index inside this column
* @param columnIndex index of the column
* @return boolean
*/
- public boolean isValueMarked(int rowIndex, int columnIndex) {
+ public boolean isNullValue(int rowIndex, int columnIndex) {
if (rowIndex >= rowCount) {
return false;
}
+ if (values.get(columnIndex) == null) {
+ return true;
+ }
if (bitMaps == null
|| bitMaps.get(columnIndex) == null
|| bitMaps.get(columnIndex).get(rowIndex / ARRAY_SIZE) == null) {
@@ -697,7 +684,7 @@ public class AlignedTVList extends TVList {
List<Integer> timeDuplicatedOriginRowIndexList, int columnIndex) {
int validRowIndex = timeDuplicatedOriginRowIndexList.get(0);
for (int originRowIndex : timeDuplicatedOriginRowIndexList) {
- if (!isValueMarked(originRowIndex, columnIndex)) {
+ if (!isNullValue(originRowIndex, columnIndex)) {
validRowIndex = originRowIndex;
}
}
@@ -721,11 +708,6 @@ public class AlignedTVList extends TVList {
time, (TsPrimitiveType) getAlignedValueForQuery(index, floatPrecision,
encodingList));
}
- public TimeValuePair getTimeValuePairForTimeDuplicatedRows(
- List<Integer> indexList, long time, Integer floatPrecision,
List<TSEncoding> encodingList) {
- return new TimeValuePair(time, getAlignedValue(indexList, floatPrecision,
encodingList));
- }
-
@Override
protected void releaseLastValueArray() {
PrimitiveArrayManager.release(indices.remove(indices.size() - 1));
@@ -886,9 +868,9 @@ public class AlignedTVList extends TVList {
// index array mem size
size += (long) PrimitiveArrayManager.ARRAY_SIZE * 4L;
// array headers mem size
- size += NUM_BYTES_ARRAY_HEADER * (2 + types.length);
+ size += (long) NUM_BYTES_ARRAY_HEADER * (2 + types.length);
// Object references size in ArrayList
- size += NUM_BYTES_OBJECT_REF * (2 + types.length);
+ size += (long) NUM_BYTES_OBJECT_REF * (2 + types.length);
return size;
}
@@ -903,111 +885,112 @@ public class AlignedTVList extends TVList {
clearSortedValue();
}
- @Override
- @TestOnly
- public IPointReader getIterator() {
- return new AlignedIte();
- }
-
- @Override
- public IPointReader getIterator(
- int floatPrecision, TSEncoding encoding, int size, List<TimeRange>
deletionList) {
- throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
- }
-
- public IPointReader getAlignedIterator(
- int floatPrecision,
- List<TSEncoding> encodingList,
- int size,
- List<List<TimeRange>> deletionList) {
- return new AlignedIte(floatPrecision, encodingList, size, deletionList);
- }
-
- private class AlignedIte extends Ite {
-
- private List<TSEncoding> encodingList;
- private int[] deleteCursors;
- /** this field is effective only in the AlignedTvlist in a
AlignedRealOnlyMemChunk. */
- private List<List<TimeRange>> deletionList;
-
- public AlignedIte() {
- super();
- }
-
- public AlignedIte(
- int floatPrecision,
- List<TSEncoding> encodingList,
- int size,
- List<List<TimeRange>> deletionList) {
- super(floatPrecision, null, size, null);
- this.encodingList = encodingList;
- this.deletionList = deletionList;
- if (deletionList != null) {
- deleteCursors = new int[deletionList.size()];
+ /** Build TsBlock by column. */
+ public TsBlock buildTsBlock(
+ int floatPrecision, List<TSEncoding> encodingList, List<List<TimeRange>>
deletionList) {
+ TsBlockBuilder builder = new TsBlockBuilder(dataTypes);
+ // Time column
+ TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
+ int validRowCount = 0;
+ boolean[] timeDuplicateInfo = null;
+ // time column
+ for (int sortedRowIndex = 0; sortedRowIndex < rowCount; sortedRowIndex++) {
+ if (sortedRowIndex == rowCount - 1
+ || getTime(sortedRowIndex) != getTime(sortedRowIndex + 1)) {
+ timeBuilder.writeLong(getTime(sortedRowIndex));
+ validRowCount++;
+ } else {
+ if (Objects.isNull(timeDuplicateInfo)) {
+ timeDuplicateInfo = new boolean[rowCount];
+ }
+ timeDuplicateInfo[sortedRowIndex] = true;
}
}
- @Override
- public boolean hasNextTimeValuePair() {
- if (hasCachedPair) {
- return true;
+ // value columns
+ for (int columnIndex = 0; columnIndex < dataTypes.size(); columnIndex++) {
+ int deleteCursor = 0;
+ // Pair of Time and Index
+ Pair<Long, Integer> lastValidPointIndexForTimeDupCheck = null;
+ if (Objects.nonNull(timeDuplicateInfo)) {
+ lastValidPointIndexForTimeDupCheck = new Pair<>(Long.MIN_VALUE, null);
}
-
- List<Integer> timeDuplicatedAlignedRowIndexList = null;
- while (cur < iteSize) {
- long time = getTime(cur);
- if (cur + 1 < rowCount() && (time == getTime(cur + 1))) {
- if (timeDuplicatedAlignedRowIndexList == null) {
- timeDuplicatedAlignedRowIndexList = new ArrayList<>();
- timeDuplicatedAlignedRowIndexList.add(getValueIndex(cur));
+ ColumnBuilder valueBuilder = builder.getColumnBuilder(columnIndex);
+ for (int sortedRowIndex = 0; sortedRowIndex < rowCount;
sortedRowIndex++) {
+ // skip time duplicated rows
+ if (Objects.nonNull(timeDuplicateInfo)) {
+ if (!isNullValue(getValueIndex(sortedRowIndex), columnIndex)) {
+ lastValidPointIndexForTimeDupCheck.left = getTime(sortedRowIndex);
+ lastValidPointIndexForTimeDupCheck.right =
getValueIndex(sortedRowIndex);
+ }
+ if (timeDuplicateInfo[sortedRowIndex]) {
+ continue;
}
- timeDuplicatedAlignedRowIndexList.add(getValueIndex(cur + 1));
- cur++;
- continue;
}
- TimeValuePair tvPair;
- if (timeDuplicatedAlignedRowIndexList != null) {
- tvPair =
- getTimeValuePairForTimeDuplicatedRows(
- timeDuplicatedAlignedRowIndexList, time, floatPrecision,
encodingList);
- timeDuplicatedAlignedRowIndexList = null;
+ // The part of code solves the following problem:
+ // Time: 1,2,2,3
+ // Value: 1,2,null,null
+ // When rowIndex:1, pair(min,null), timeDuplicateInfo:false,
write(T:1,V:1)
+ // When rowIndex:2, pair(2,2), timeDuplicateInfo:true, skip writing
value
+ // When rowIndex:3, pair(2,2), timeDuplicateInfo:false,
T:2==pair.left:2, write(T:2,V:2)
+ // When rowIndex:4, pair(2,2), timeDuplicateInfo:false,
T:3!=pair.left:2, write(T:3,V:null)
+ int originRowIndex;
+ if (Objects.nonNull(lastValidPointIndexForTimeDupCheck)
+ && (getTime(sortedRowIndex) ==
lastValidPointIndexForTimeDupCheck.left)) {
+ originRowIndex = lastValidPointIndexForTimeDupCheck.right;
} else {
- tvPair = getTimeValuePair(cur, time, floatPrecision, encodingList);
+ originRowIndex = getValueIndex(sortedRowIndex);
}
- cur++;
- if (deletePointsInDeletionList(time, tvPair)) {
+ if (isNullValue(originRowIndex, columnIndex)
+ || isPointDeleted(
+ getTime(sortedRowIndex),
+ Objects.isNull(deletionList) ? null :
deletionList.get(columnIndex),
+ deleteCursor)) {
+ valueBuilder.appendNull();
continue;
}
- if (tvPair.getValue() != null) {
- cachedTimeValuePair = tvPair;
- hasCachedPair = true;
- return true;
- }
- }
-
- return false;
- }
-
- private boolean deletePointsInDeletionList(long timestamp, TimeValuePair
tvPair) {
- if (deletionList == null) {
- return false;
- }
- boolean deletedAll = true;
- for (int i = 0; i < deleteCursors.length; i++) {
- while (deletionList.get(i) != null && deleteCursors[i] <
deletionList.get(i).size()) {
- if (deletionList.get(i).get(deleteCursors[i]).contains(timestamp)) {
- tvPair.getValue().getVector()[i] = null;
+ switch (dataTypes.get(columnIndex)) {
+ case BOOLEAN:
+ valueBuilder.writeBoolean(getBooleanByValueIndex(originRowIndex,
columnIndex));
break;
- } else if (deletionList.get(i).get(deleteCursors[i]).getMax() <
timestamp) {
- deleteCursors[i]++;
- } else {
- deletedAll = false;
+ case INT32:
+ valueBuilder.writeInt(getIntByValueIndex(originRowIndex,
columnIndex));
+ break;
+ case INT64:
+ valueBuilder.writeLong(getLongByValueIndex(originRowIndex,
columnIndex));
+ break;
+ case FLOAT:
+ valueBuilder.writeFloat(
+ roundValueWithGivenPrecision(
+ getFloatByValueIndex(originRowIndex, columnIndex),
+ floatPrecision,
+ encodingList.get(columnIndex)));
+ break;
+ case DOUBLE:
+ valueBuilder.writeDouble(
+ roundValueWithGivenPrecision(
+ getDoubleByValueIndex(originRowIndex, columnIndex),
+ floatPrecision,
+ encodingList.get(columnIndex)));
+ break;
+ case TEXT:
+ valueBuilder.writeBinary(getBinaryByValueIndex(originRowIndex,
columnIndex));
+ break;
+ default:
break;
- }
}
}
- return deletedAll;
}
+ builder.declarePositions(validRowCount);
+ return builder.build();
+ }
+
+ protected void writeValidValuesIntoTsBlock(
+ TsBlockBuilder builder,
+ int floatPrecision,
+ TSEncoding encoding,
+ List<TimeRange> deletionList) {
+ throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
}
@Override
@@ -1095,7 +1078,7 @@ public class AlignedTVList extends TVList {
throw new
UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
}
// bitmap
- WALWriteUtils.write(isValueMarked(rowIndex, columnIndex), buffer);
+ WALWriteUtils.write(isNullValue(rowIndex, columnIndex), buffer);
}
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
index 4a711a7b0f..b7e3dbf197 100644
---
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
+++
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.db.wal.utils.WALWriteUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -196,6 +198,23 @@ public class BinaryTVList extends TVList {
return new TimeValuePair(time, TsPrimitiveType.getByType(TSDataType.TEXT,
getBinary(index)));
}
+ @Override
+ protected void writeValidValuesIntoTsBlock(
+ TsBlockBuilder builder,
+ int floatPrecision,
+ TSEncoding encoding,
+ List<TimeRange> deletionList) {
+ Integer deleteCursor = 0;
+ for (int i = 0; i < rowCount; i++) {
+ if (!isPointDeleted(getTime(i), deletionList, deleteCursor)
+ && (i == rowCount - 1 || getTime(i) != getTime(i + 1))) {
+ builder.getTimeColumnBuilder().writeLong(getTime(i));
+ builder.getColumnBuilder(0).writeBinary(getBinary(i));
+ builder.declarePosition();
+ }
+ }
+ }
+
@Override
protected void releaseLastValueArray() {
PrimitiveArrayManager.release(values.remove(values.size() - 1));
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
index 2b15a4e286..8ebeb189db 100644
---
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
+++
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.db.wal.utils.WALWriteUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -196,6 +198,23 @@ public class BooleanTVList extends TVList {
time, TsPrimitiveType.getByType(TSDataType.BOOLEAN,
getBoolean(index)));
}
+ @Override
+ protected void writeValidValuesIntoTsBlock(
+ TsBlockBuilder builder,
+ int floatPrecision,
+ TSEncoding encoding,
+ List<TimeRange> deletionList) {
+ Integer deleteCursor = 0;
+ for (int i = 0; i < rowCount; i++) {
+ if (!isPointDeleted(getTime(i), deletionList, deleteCursor)
+ && (i == rowCount - 1 || getTime(i) != getTime(i + 1))) {
+ builder.getTimeColumnBuilder().writeLong(getTime(i));
+ builder.getColumnBuilder(0).writeBoolean(getBoolean(i));
+ builder.declarePosition();
+ }
+ }
+ }
+
@Override
protected void releaseLastValueArray() {
PrimitiveArrayManager.release(values.remove(values.size() - 1));
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
index 3fe9f016d6..9f4295054b 100644
---
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
+++
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.wal.utils.WALWriteUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -199,6 +201,25 @@ public class DoubleTVList extends TVList {
return new TimeValuePair(time,
TsPrimitiveType.getByType(TSDataType.DOUBLE, value));
}
+ @Override
+ protected void writeValidValuesIntoTsBlock(
+ TsBlockBuilder builder,
+ int floatPrecision,
+ TSEncoding encoding,
+ List<TimeRange> deletionList) {
+ Integer deleteCursor = 0;
+ for (int i = 0; i < rowCount; i++) {
+ if (!isPointDeleted(getTime(i), deletionList, deleteCursor)
+ && (i == rowCount - 1 || getTime(i) != getTime(i + 1))) {
+ builder.getTimeColumnBuilder().writeLong(getTime(i));
+ builder
+ .getColumnBuilder(0)
+ .writeDouble(roundValueWithGivenPrecision(getDouble(i),
floatPrecision, encoding));
+ builder.declarePosition();
+ }
+ }
+ }
+
@Override
protected void releaseLastValueArray() {
PrimitiveArrayManager.release(values.remove(values.size() - 1));
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
index b11d26360d..c157c7ee53 100644
---
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
+++
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.wal.utils.WALWriteUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -199,6 +201,25 @@ public class FloatTVList extends TVList {
return new TimeValuePair(time, TsPrimitiveType.getByType(TSDataType.FLOAT,
value));
}
+ @Override
+ protected void writeValidValuesIntoTsBlock(
+ TsBlockBuilder builder,
+ int floatPrecision,
+ TSEncoding encoding,
+ List<TimeRange> deletionList) {
+ Integer deleteCursor = 0;
+ for (int i = 0; i < rowCount; i++) {
+ if (!isPointDeleted(getTime(i), deletionList, deleteCursor)
+ && (i == rowCount - 1 || getTime(i) != getTime(i + 1))) {
+ builder.getTimeColumnBuilder().writeLong(getTime(i));
+ builder
+ .getColumnBuilder(0)
+ .writeFloat(roundValueWithGivenPrecision(getFloat(i),
floatPrecision, encoding));
+ builder.declarePosition();
+ }
+ }
+ }
+
@Override
protected void releaseLastValueArray() {
PrimitiveArrayManager.release(values.remove(values.size() - 1));
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
index c6779acf17..3876009ec0 100644
---
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
+++
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.db.wal.utils.WALWriteUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -194,6 +196,23 @@ public class IntTVList extends TVList {
return new TimeValuePair(time, TsPrimitiveType.getByType(TSDataType.INT32,
getInt(index)));
}
+ @Override
+ protected void writeValidValuesIntoTsBlock(
+ TsBlockBuilder builder,
+ int floatPrecision,
+ TSEncoding encoding,
+ List<TimeRange> deletionList) {
+ Integer deleteCursor = 0;
+ for (int i = 0; i < rowCount; i++) {
+ if (!isPointDeleted(getTime(i), deletionList, deleteCursor)
+ && (i == rowCount - 1 || getTime(i) != getTime(i + 1))) {
+ builder.getTimeColumnBuilder().writeLong(getTime(i));
+ builder.getColumnBuilder(0).writeInt(getInt(i));
+ builder.declarePosition();
+ }
+ }
+ }
+
@Override
protected void releaseLastValueArray() {
PrimitiveArrayManager.release(values.remove(values.size() - 1));
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
index cd7e1e5124..d28bc76e56 100644
---
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
+++
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.db.wal.utils.WALWriteUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
@@ -194,6 +196,23 @@ public class LongTVList extends TVList {
return new TimeValuePair(time, TsPrimitiveType.getByType(TSDataType.INT64,
getLong(index)));
}
+ @Override
+ protected void writeValidValuesIntoTsBlock(
+ TsBlockBuilder builder,
+ int floatPrecision,
+ TSEncoding encoding,
+ List<TimeRange> deletionList) {
+ Integer deleteCursor = 0;
+ for (int i = 0; i < rowCount; i++) {
+ if (!isPointDeleted(getTime(i), deletionList, deleteCursor)
+ && (i == rowCount - 1 || getTime(i) != getTime(i + 1))) {
+ builder.getTimeColumnBuilder().writeLong(getTime(i));
+ builder.getColumnBuilder(0).writeLong(getLong(i));
+ builder.declarePosition();
+ }
+ }
+ }
+
@Override
protected void releaseLastValueArray() {
PrimitiveArrayManager.release(values.remove(values.size() - 1));
diff --git
a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index 56ac6e1205..f08d4679a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -21,12 +21,14 @@ package org.apache.iotdb.db.utils.datastructure;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
+import org.apache.iotdb.db.utils.MathUtils;
import org.apache.iotdb.db.wal.buffer.WALEntryValue;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.TimeRange;
-import org.apache.iotdb.tsfile.read.reader.IPointReader;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -34,6 +36,7 @@ import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -94,9 +97,9 @@ public abstract class TVList implements WALEntryValue {
// value array mem size
size += (long) PrimitiveArrayManager.ARRAY_SIZE * (long)
type.getDataTypeSize();
// two array headers mem size
- size += NUM_BYTES_ARRAY_HEADER * 2;
+ size += NUM_BYTES_ARRAY_HEADER * 2L;
// Object references size in ArrayList
- size += NUM_BYTES_OBJECT_REF * 2;
+ size += NUM_BYTES_OBJECT_REF * 2L;
return size;
}
@@ -210,11 +213,8 @@ public abstract class TVList implements WALEntryValue {
throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
}
- public Object getAlignedValue(int index, Integer floatPrecision, TSEncoding
encoding) {
- throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
- }
-
- public TVList getTvListByColumnIndex(List<Integer> columnIndexList) {
+ public TVList getTvListByColumnIndex(
+ List<Integer> columnIndexList, List<TSDataType> dataTypeList) {
throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
}
@@ -489,10 +489,6 @@ public abstract class TVList implements WALEntryValue {
}
}
- void updateMinTimeAndSorted(long[] time) {
- updateMinTimeAndSorted(time, 0, time.length);
- }
-
void updateMinTimeAndSorted(long[] time, int start, int end) {
int length = time.length;
long inPutMinTime = Long.MAX_VALUE;
@@ -513,107 +509,52 @@ public abstract class TVList implements WALEntryValue {
protected abstract TimeValuePair getTimeValuePair(
int index, long time, Integer floatPrecision, TSEncoding encoding);
- public TimeValuePair getTimeValuePairForTimeDuplicatedRows(
- List<Integer> timeDuplicatedVectorRowIndexList,
- long time,
- Integer floatPrecision,
- TSEncoding encoding) {
- throw new UnsupportedOperationException(ERR_DATATYPE_NOT_CONSISTENT);
- }
-
+ /**
+ * Test-only convenience overload: builds a TsBlock from this list using float precision
+ * 0, PLAIN encoding and no deletion list (null).
+ *
+ * @return the TsBlock produced by the three-argument overload
+ */
@TestOnly
- public IPointReader getIterator() {
- return new Ite();
+ public TsBlock buildTsBlock() {
+ return buildTsBlock(0, TSEncoding.PLAIN, null);
}
- public IPointReader getIterator(
- int floatPrecision, TSEncoding encoding, int size, List<TimeRange>
deletionList) {
- return new Ite(floatPrecision, encoding, size, deletionList);
+ /**
+ * Builds a single-value-column TsBlock from this list.
+ *
+ * <p>A TsBlockBuilder is created with exactly one column whose type is this list's
+ * data type; filling it is delegated to writeValidValuesIntoTsBlock, and the built
+ * block is returned.
+ *
+ * @param floatPrecision forwarded unchanged to writeValidValuesIntoTsBlock
+ * @param encoding forwarded unchanged to writeValidValuesIntoTsBlock
+ * @param deletionList forwarded unchanged; the no-argument overload passes null here
+ * @return the block produced by the builder
+ */
+ public TsBlock buildTsBlock(
+ int floatPrecision, TSEncoding encoding, List<TimeRange> deletionList) {
+ // One value column only: its type is whatever this concrete list stores.
+ TsBlockBuilder builder = new
TsBlockBuilder(Collections.singletonList(this.getDataType()));
+ writeValidValuesIntoTsBlock(builder, floatPrecision, encoding,
deletionList);
+ return builder.build();
}
- protected class Ite implements IPointReader {
-
- protected TimeValuePair cachedTimeValuePair;
- protected boolean hasCachedPair;
- protected int cur;
- protected Integer floatPrecision;
- private TSEncoding encoding;
- private int deleteCursor = 0;
- /**
- * because TV list may be share with different query, each iterator has to
record it's own size
- */
- protected int iteSize = 0;
- /** this field is effective only in the Tvlist in a RealOnlyMemChunk. */
- private List<TimeRange> deletionList;
-
- public Ite() {
- this.iteSize = TVList.this.rowCount;
- }
+ /**
+ * Subclass hook called by buildTsBlock to fill the given builder before it is built.
+ * NOTE(review): the name suggests that deleted (and possibly duplicated-timestamp)
+ * points are left out; the exact contract is defined by the subclasses -- confirm there.
+ *
+ * @param builder the builder to append rows to; buildTsBlock calls build() on it afterwards
+ * @param floatPrecision precision argument as received by buildTsBlock
+ * @param encoding encoding argument as received by buildTsBlock
+ * @param deletionList deletion ranges as received by buildTsBlock; may be null
+ */
+ protected abstract void writeValidValuesIntoTsBlock(
+ TsBlockBuilder builder,
+ int floatPrecision,
+ TSEncoding encoding,
+ List<TimeRange> deletionList);
- public Ite(int floatPrecision, TSEncoding encoding, int size,
List<TimeRange> deletionList) {
- this.floatPrecision = floatPrecision;
- this.encoding = encoding;
- this.iteSize = size;
- this.deletionList = deletionList;
- }
-
- @Override
- public boolean hasNextTimeValuePair() {
- if (hasCachedPair) {
+ /**
+ * Reports whether the given timestamp lies inside one of the deletion ranges, scanning
+ * the list from index deleteCursor.
+ *
+ * <p>For each range from the cursor onwards: returns true if the range contains the
+ * timestamp; moves on to the next range if the range's max is below the timestamp;
+ * otherwise returns false. Also returns false when deletionList is null or the cursor
+ * is at or past the end of the list.
+ *
+ * <p>NOTE(review): the early "return false" presumes the ranges are ordered so that no
+ * later range can contain the timestamp -- confirm that callers pass a sorted list.
+ *
+ * <p>NOTE(review): deleteCursor is an Integer passed by value, and "deleteCursor++"
+ * unboxes, increments and re-assigns only this method's local variable. The caller's
+ * cursor is therefore never advanced and every call rescans from the index the caller
+ * passed in. The returned value is unaffected, but if the cursor is meant to carry over
+ * between calls a mutable holder (for example an int[]) would be required.
+ *
+ * @param timestamp the point's timestamp
+ * @param deletionList deletion ranges; may be null
+ * @param deleteCursor index of the first range to examine; must not be null when
+ * deletionList is non-null, because it is unboxed in the loop condition
+ * @return true if a range examined by the scan contains the timestamp, false otherwise
+ */
+ protected boolean isPointDeleted(
+ long timestamp, List<TimeRange> deletionList, Integer deleteCursor) {
+ while (deletionList != null && deleteCursor < deletionList.size()) {
+ if (deletionList.get(deleteCursor).contains(timestamp)) {
return true;
- }
-
- while (cur < iteSize) {
- long time = getTime(cur);
- if (isPointDeleted(time) || (cur + 1 < rowCount() && (time ==
getTime(cur + 1)))) {
- cur++;
- continue;
- }
- TimeValuePair tvPair;
- tvPair = getTimeValuePair(cur, time, floatPrecision, encoding);
- cur++;
- if (tvPair.getValue() != null) {
- cachedTimeValuePair = tvPair;
- hasCachedPair = true;
- return true;
- }
- }
-
- return false;
- }
-
- protected boolean isPointDeleted(long timestamp) {
- while (deletionList != null && deleteCursor < deletionList.size()) {
- if (deletionList.get(deleteCursor).contains(timestamp)) {
- return true;
- } else if (deletionList.get(deleteCursor).getMax() < timestamp) {
- deleteCursor++;
- } else {
- return false;
- }
- }
- return false;
- }
-
- @Override
- public TimeValuePair nextTimeValuePair() throws IOException {
- if (hasCachedPair || hasNextTimeValuePair()) {
- hasCachedPair = false;
- return cachedTimeValuePair;
+ } else if (deletionList.get(deleteCursor).getMax() < timestamp) {
+ // Local effect only: the caller's Integer is not modified by this increment.
+ deleteCursor++;
} else {
- throw new IOException("no next time value pair");
+ return false;
}
}
+ return false;
+ }
- @Override
- public TimeValuePair currentTimeValuePair() {
- return cachedTimeValuePair;
+ /**
+ * Rounds a float value when the encoding calls for it.
+ *
+ * <p>The value is passed through MathUtils.roundWithGivenPrecision only when it is not
+ * NaN and the encoding is RLE or TS_2DIFF; in every other case it is returned unchanged.
+ *
+ * @param value the raw value
+ * @param floatPrecision precision handed to MathUtils.roundWithGivenPrecision
+ * (presumably a number of decimal digits -- TODO confirm)
+ * @param encoding the series encoding; only RLE and TS_2DIFF trigger rounding
+ * @return the rounded value, or the input itself when no rounding applies
+ */
+ protected float roundValueWithGivenPrecision(
+ float value, int floatPrecision, TSEncoding encoding) {
+ if (!Float.isNaN(value) && (encoding == TSEncoding.RLE || encoding ==
TSEncoding.TS_2DIFF)) {
+ return MathUtils.roundWithGivenPrecision(value, floatPrecision);
}
+ return value;
+ }
- @Override
- public void close() throws IOException {
- // Do nothing because of this is an in memory object
+ /**
+ * Rounds a double value when the encoding calls for it.
+ *
+ * <p>The value is passed through MathUtils.roundWithGivenPrecision only when it is not
+ * NaN and the encoding is RLE or TS_2DIFF; in every other case it is returned unchanged.
+ *
+ * @param value the raw value
+ * @param floatPrecision precision handed to MathUtils.roundWithGivenPrecision
+ * (presumably a number of decimal digits -- TODO confirm)
+ * @param encoding the series encoding; only RLE and TS_2DIFF trigger rounding
+ * @return the rounded value, or the input itself when no rounding applies
+ */
+ protected double roundValueWithGivenPrecision(
+ double value, int floatPrecision, TSEncoding encoding) {
+ if (!Double.isNaN(value) && (encoding == TSEncoding.RLE || encoding ==
TSEncoding.TS_2DIFF)) {
+ return MathUtils.roundWithGivenPrecision(value, floatPrecision);
}
+ return value;
+ }
public abstract TSDataType getDataType();
diff --git
a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
index 9ef07c11b5..fd01cd0f93 100644
---
a/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/engine/memtable/PrimitiveMemTableTest.java
@@ -78,7 +78,8 @@ public class PrimitiveMemTableTest {
for (int i = 0; i < count; i++) {
series.write(i, i);
}
- IPointReader it = series.getSortedTvListForQuery().getIterator();
+ IPointReader it =
+
series.getSortedTvListForQuery().buildTsBlock().getTsBlockSingleColumnIterator();
int i = 0;
while (it.hasNextTimeValuePair()) {
Assert.assertEquals(i, it.nextTimeValuePair().getTimestamp());
diff --git
a/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java
b/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java
index d9c7ff3b19..820a461bdf 100644
---
a/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/utils/datastructure/VectorTVListTest.java
@@ -201,7 +201,7 @@ public class VectorTVListTest {
clonedTvList.getAlignedValue((int) i).toString());
for (int column = 0; i < 5; i++) {
Assert.assertEquals(
- tvList.isValueMarked((int) i, column),
clonedTvList.isValueMarked((int) i, column));
+ tvList.isNullValue((int) i, column),
clonedTvList.isNullValue((int) i, column));
}
}
}
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
index c19d4f2c06..18e61c2ea3 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/TsBlock.java
@@ -395,7 +395,9 @@ public class TsBlock {
public TsPrimitiveType[] currentValue() {
TsPrimitiveType[] tsPrimitiveTypes = new
TsPrimitiveType[valueColumns.length];
for (int i = 0; i < valueColumns.length; i++) {
- tsPrimitiveTypes[i] = valueColumns[i].getTsPrimitiveType(rowIndex);
+ // A column that is null at the current row is skipped, so its slot in the result
+ // keeps the array's default value of null.
+ if (!valueColumns[i].isNull(rowIndex)) {
+ tsPrimitiveTypes[i] = valueColumns[i].getTsPrimitiveType(rowIndex);
+ }
}
return tsPrimitiveTypes;
}
@@ -412,6 +414,9 @@ public class TsBlock {
@Override
public boolean hasNextTimeValuePair() {
+ // Step over rows in which every value column is null, so that a true result always
+ // refers to a row carrying at least one non-null value.
+ while (hasNext() && isCurrentValueAllNull()) {
+ next();
+ }
return hasNext();
}
@@ -446,6 +451,15 @@ public class TsBlock {
public void setRowIndex(int rowIndex) {
this.rowIndex = rowIndex;
}
+
+ /**
+  * Tells whether every value column is null at the current row index.
+  *
+  * @return true when no value column holds a non-null value at rowIndex (including the
+  *     case of zero value columns), false as soon as one non-null column is found
+  */
+ private boolean isCurrentValueAllNull() {
+   // Walk forward while the columns seen so far are null at this row.
+   int columnIndex = 0;
+   while (columnIndex < valueColumns.length && valueColumns[columnIndex].isNull(rowIndex)) {
+     columnIndex++;
+   }
+   // The scan only runs off the end when it never met a non-null column.
+   return columnIndex == valueColumns.length;
+ }
}
private long updateRetainedSize() {
diff --git
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilder.java
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilder.java
index 95e71edc06..6bc1f727f1 100644
---
a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilder.java
+++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/block/column/ColumnBuilder.java
@@ -29,22 +29,22 @@ public interface ColumnBuilder {
throw new UnsupportedOperationException(getClass().getName());
}
- /** Write a short to the current entry; */
+ /**
+ * Write an int to the current entry. This default implementation does not support the
+ * operation and always throws.
+ *
+ * @param value the int to write
+ * @return not reached in this default implementation
+ * @throws UnsupportedOperationException always, with the runtime class name as message
+ */
default ColumnBuilder writeInt(int value) {
throw new UnsupportedOperationException(getClass().getName());
}
- /** Write a int to the current entry; */
+ /**
+ * Write a long to the current entry. This default implementation does not support the
+ * operation and always throws.
+ *
+ * @param value the long to write
+ * @return not reached in this default implementation
+ * @throws UnsupportedOperationException always, with the runtime class name as message
+ */
default ColumnBuilder writeLong(long value) {
throw new UnsupportedOperationException(getClass().getName());
}
- /** Write a long to the current entry; */
+ /**
+ * Write a float to the current entry. This default implementation does not support the
+ * operation and always throws.
+ *
+ * @param value the float to write
+ * @return not reached in this default implementation
+ * @throws UnsupportedOperationException always, with the runtime class name as message
+ */
default ColumnBuilder writeFloat(float value) {
throw new UnsupportedOperationException(getClass().getName());
}
- /** Write a byte sequences to the current entry; */
+ /**
+ * Write a double to the current entry. This default implementation does not support the
+ * operation and always throws.
+ *
+ * @param value the double to write
+ * @return not reached in this default implementation
+ * @throws UnsupportedOperationException always, with the runtime class name as message
+ */
default ColumnBuilder writeDouble(double value) {
throw new UnsupportedOperationException(getClass().getName());
}