This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch tsFile_v4
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/tsFile_v4 by this push:
new 07cc8658 Finish SingleDeviceTsBlockReader
07cc8658 is described below
commit 07cc86587c6906b88d853fc835114c36d2c9b3b6
Author: jt2594838 <[email protected]>
AuthorDate: Thu Apr 11 18:37:32 2024 +0800
Finish SingleDeviceTsBlockReader
---
.../org/apache/tsfile/block/column/Column.java | 18 ++
.../org/apache/tsfile/common/cache/LRUCache.java | 4 +
.../org/apache/tsfile/file/metadata/IDeviceID.java | 2 +-
.../tsfile/file/metadata/StringArrayDeviceID.java | 3 +-
.../apache/tsfile/file/metadata/TableSchema.java | 5 +
.../apache/tsfile/read/TsFileSequenceReader.java | 30 ++-
.../apache/tsfile/read/common/block/TsBlock.java | 44 +++-
.../read/common/block/column/BinaryColumn.java | 21 +-
.../read/common/block/column/BooleanColumn.java | 21 +-
.../read/common/block/column/ColumnFactory.java | 44 ++++
.../read/common/block/column/DoubleColumn.java | 21 +-
.../read/common/block/column/FloatColumn.java | 21 +-
.../tsfile/read/common/block/column/IntColumn.java | 21 +-
.../read/common/block/column/LongColumn.java | 21 +-
.../read/common/block/column/NullColumn.java | 7 +-
.../block/column/RunLengthEncodedColumn.java | 12 +-
.../read/common/block/column/TimeColumn.java | 16 +-
.../tsfile/read/controller/IMetadataQuerier.java | 12 +
.../read/controller/MetadataQuerierByFileImpl.java | 43 +++-
.../tsfile/read/expression/ExpressionTree.java | 3 +
.../read/query/executor/TableQueryExecutor.java | 36 ++-
.../read/query/executor/task/DeviceQueryTask.java | 27 ++-
.../query/executor/task/DeviceTaskIterator.java | 8 +-
.../reader/block/DeviceOrderedTsBlockReader.java | 20 +-
.../reader/block/SingleDeviceTsBlockReader.java | 243 ++++++++++++++++++++-
25 files changed, 661 insertions(+), 42 deletions(-)
diff --git a/common/src/main/java/org/apache/tsfile/block/column/Column.java
b/common/src/main/java/org/apache/tsfile/block/column/Column.java
index 6db30a27..8e438f70 100644
--- a/common/src/main/java/org/apache/tsfile/block/column/Column.java
+++ b/common/src/main/java/org/apache/tsfile/block/column/Column.java
@@ -19,6 +19,7 @@
package org.apache.tsfile.block.column;
+import java.util.Arrays;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.TsPrimitiveType;
@@ -124,6 +125,13 @@ public interface Column {
/** Returns the array to determine whether each position of the column is
null or not. */
boolean[] isNull();
+ /**
+ * Set the given range as null.
+ * @param start start position (inclusive)
+ * @param end end position (exclusive)
+ */
+ void setNull(int start, int end);
+
/** Returns the number of positions in this block. */
int getPositionCount();
@@ -152,4 +160,14 @@ public interface Column {
void reverse();
int getInstanceSize();
+
+ void setPositionCount(int count);
+
+ default void reset() {
+ setPositionCount(0);
+ final boolean[] isNulls = isNull();
+ if (isNulls != null) {
+ Arrays.fill(isNulls, false);
+ }
+ }
}
diff --git a/tsfile/src/main/java/org/apache/tsfile/common/cache/LRUCache.java
b/tsfile/src/main/java/org/apache/tsfile/common/cache/LRUCache.java
index 9b79825b..a00469a5 100644
--- a/tsfile/src/main/java/org/apache/tsfile/common/cache/LRUCache.java
+++ b/tsfile/src/main/java/org/apache/tsfile/common/cache/LRUCache.java
@@ -51,6 +51,10 @@ public abstract class LRUCache<K, T> implements Cache<K, T> {
}
}
+ public boolean containsKey(K key) {
+ return cache.containsKey(key);
+ }
+
@Override
public synchronized void clear() {
cache.clear();
diff --git
a/tsfile/src/main/java/org/apache/tsfile/file/metadata/IDeviceID.java
b/tsfile/src/main/java/org/apache/tsfile/file/metadata/IDeviceID.java
index 5b3e7255..5fadc0ab 100644
--- a/tsfile/src/main/java/org/apache/tsfile/file/metadata/IDeviceID.java
+++ b/tsfile/src/main/java/org/apache/tsfile/file/metadata/IDeviceID.java
@@ -57,7 +57,7 @@ public interface IDeviceID extends Comparable<IDeviceID>,
Accountable {
* @return i-th segment in this DeviceId.
* @throws ArrayIndexOutOfBoundsException if i >= segmentNum().
*/
- String segment(int i);
+ Object segment(int i);
static IDeviceID deserializeFrom(ByteBuffer byteBuffer) {
return new PlainDeviceID(ReadWriteIOUtils.readVarIntString(byteBuffer));
diff --git
a/tsfile/src/main/java/org/apache/tsfile/file/metadata/StringArrayDeviceID.java
b/tsfile/src/main/java/org/apache/tsfile/file/metadata/StringArrayDeviceID.java
index de71419c..14901c36 100644
---
a/tsfile/src/main/java/org/apache/tsfile/file/metadata/StringArrayDeviceID.java
+++
b/tsfile/src/main/java/org/apache/tsfile/file/metadata/StringArrayDeviceID.java
@@ -105,7 +105,8 @@ public class StringArrayDeviceID implements IDeviceID {
// the other ID is a prefix of this one
return 1;
}
- final int comp = Objects.compare(this.segment(i), o.segment(i),
WriteUtils::compareStrings);
+ final int comp = Objects.compare(this.segment(i), ((String)
o.segment(i)),
+ WriteUtils::compareStrings);
if (comp != 0) {
// the partial comparison has a result
return comp;
diff --git
a/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
b/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
index f498ed9d..870ef2e4 100644
--- a/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
+++ b/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
@@ -76,6 +76,11 @@ public class TableSchema {
});
}
+ public MeasurementSchema findColumnSchema(String columnName) {
+ final int columnIndex = findColumnIndex(columnName);
+ return columnIndex >= 0 ? columnSchemas.get(columnIndex) : null;
+ }
+
public void update(ChunkGroupMetadata chunkGroupMetadata) {
if (!updatable) {
return;
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
b/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
index fb50e186..ad9478aa 100644
--- a/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
@@ -570,9 +570,10 @@ public class TsFileSequenceReader implements AutoCloseable
{
// This method is only used for TsFile
public List<ITimeSeriesMetadata> readITimeseriesMetadata(
- IDeviceID device, Set<String> measurements) throws IOException {
+ IDeviceID device, Set<String> measurements, MetadataIndexNode root)
throws IOException {
readFileMetadata();
MetadataIndexNode deviceMetadataIndexNode =
+ root != null ? root :
tsFileMetaData.getTableMetadataIndexNodeMap().get(device.getTableName());
Pair<IMetadataIndexEntry, Long> metadataIndexPair =
getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device,
false);
@@ -2169,6 +2170,33 @@ public class TsFileSequenceReader implements
AutoCloseable {
return chunkMetadataList;
}
+ public List<IChunkMetadata> getIChunkMetadataList(IDeviceID deviceID, String
measurementName) throws IOException {
+ List<ITimeSeriesMetadata> timeseriesMetaData =
readITimeseriesMetadata(deviceID,
+ Collections.singleton(measurementName), null);
+ if (timeseriesMetaData == null || timeseriesMetaData.isEmpty()) {
+ return Collections.emptyList();
+ }
+ List<IChunkMetadata> chunkMetadataList =
readIChunkMetaDataList(timeseriesMetaData.get(0));
+
chunkMetadataList.sort(Comparator.comparingLong(IChunkMetadata::getStartTime));
+ return chunkMetadataList;
+ }
+
+ public List<List<IChunkMetadata>> getIChunkMetadataList(IDeviceID deviceID,
+ Set<String> measurementNames, MetadataIndexNode root) throws IOException
{
+ List<ITimeSeriesMetadata> timeseriesMetaData =
readITimeseriesMetadata(deviceID,
+ measurementNames, root);
+ if (timeseriesMetaData == null || timeseriesMetaData.isEmpty()) {
+ return Collections.emptyList();
+ }
+ List<List<IChunkMetadata>> results = new
ArrayList<>(timeseriesMetaData.size());
+ for (ITimeSeriesMetadata timeseriesMetaDatum : timeseriesMetaData) {
+ List<IChunkMetadata> chunkMetadataList =
readIChunkMetaDataList(timeseriesMetaDatum);
+
chunkMetadataList.sort(Comparator.comparingLong(org.apache.tsfile.file.metadata.IChunkMetadata::getStartTime));
+ results.add(chunkMetadataList);
+ }
+ return results;
+ }
+
public List<ChunkMetadata> getChunkMetadataList(Path path) throws
IOException {
return getChunkMetadataList(path, false);
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlock.java
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlock.java
index d6b7ca3d..43eb29b4 100644
--- a/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlock.java
+++ b/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlock.java
@@ -19,9 +19,12 @@
package org.apache.tsfile.read.common.block;
+import java.util.List;
import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.read.common.IBatchDataIterator;
+import org.apache.tsfile.read.common.block.column.ColumnFactory;
import org.apache.tsfile.read.common.block.column.TimeColumn;
import org.apache.tsfile.read.reader.IPointReader;
import org.apache.tsfile.utils.RamUsageEstimator;
@@ -31,6 +34,7 @@ import org.apache.tsfile.write.UnSupportedDataTypeException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
+import org.apache.tsfile.write.schema.MeasurementSchema;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
@@ -61,7 +65,7 @@ public class TsBlock {
private final Column[] valueColumns;
/** How many rows in current TsBlock */
- private final int positionCount;
+ private int positionCount;
private volatile long retainedSizeInBytes = -1;
@@ -98,6 +102,10 @@ public class TsBlock {
return positionCount;
}
+ public void setPositionCount(int positionCount) {
+ this.positionCount = positionCount;
+ }
+
public long getStartTime() {
return timeColumn.getStartTime();
}
@@ -237,6 +245,17 @@ public class TsBlock {
return new TsBlockAlignedRowIterator(0);
}
+ public void reset() {
+ if (positionCount == 0) {
+ return;
+ }
+ positionCount = 0;
+ timeColumn.reset();
+ for (Column valueColumn : valueColumns) {
+ valueColumn.reset();
+ }
+ }
+
public class TsBlockSingleColumnIterator implements IPointReader,
IBatchDataIterator {
private int rowIndex;
@@ -553,4 +572,27 @@ public class TsBlock {
}
}
}
+
+ public static TsBlock buildTsBlock(List<String> columnNames, TableSchema
schema, int blockSize) {
+ TimeColumn timeColumn = new TimeColumn(blockSize);
+ Column[] columns = new Column[columnNames.size()];
+ for (int i = 0; i < columnNames.size(); i++) {
+ final String columnName = columnNames.get(i);
+ final MeasurementSchema columnSchema =
schema.findColumnSchema(columnName);
+ columns[i] = ColumnFactory.create(columnSchema.getType(), blockSize);
+ }
+ return new TsBlock(timeColumn, columns);
+ }
+
+ /**
+ * For each column, if its positionCount < this. positionCount, add nulls at
the end of the
+ * column.
+ */
+ public void fillTrailingNulls() {
+ for (Column valueColumn : valueColumns) {
+ if (valueColumn.getPositionCount() < this.positionCount) {
+ valueColumn.setNull(valueColumn.getPositionCount(),
this.positionCount);
+ }
+ }
+ }
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/BinaryColumn.java
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/BinaryColumn.java
index facd7470..05c3956d 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/BinaryColumn.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/BinaryColumn.java
@@ -39,12 +39,16 @@ public class BinaryColumn implements Column {
(int) RamUsageEstimator.shallowSizeOfInstance(BinaryColumn.class);
private final int arrayOffset;
- private final int positionCount;
- private final boolean[] valueIsNull;
+ private int positionCount;
+ private boolean[] valueIsNull;
private final Binary[] values;
private final long retainedSizeInBytes;
+ public BinaryColumn(int initialCapacity) {
+ this(0, 0, null, new Binary[initialCapacity]);
+ }
+
public BinaryColumn(int positionCount, Optional<boolean[]> valueIsNull,
Binary[] values) {
this(0, positionCount, valueIsNull.orElse(null), values);
}
@@ -169,4 +173,17 @@ public class BinaryColumn implements Column {
public int getInstanceSize() {
return INSTANCE_SIZE;
}
+
+ @Override
+ public void setPositionCount(int count) {
+ positionCount = count;
+ }
+
+ @Override
+ public void setNull(int start, int end) {
+ if (valueIsNull == null) {
+ valueIsNull = new boolean[values.length];
+ }
+ Arrays.fill(valueIsNull, start, end, true);
+ }
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/BooleanColumn.java
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/BooleanColumn.java
index d7d7df68..09b5ec52 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/BooleanColumn.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/BooleanColumn.java
@@ -38,12 +38,16 @@ public class BooleanColumn implements Column {
public static final int SIZE_IN_BYTES_PER_POSITION = Byte.BYTES + Byte.BYTES;
private final int arrayOffset;
- private final int positionCount;
- private final boolean[] valueIsNull;
+ private int positionCount;
+ private boolean[] valueIsNull;
private final boolean[] values;
private final long retainedSizeInBytes;
+ public BooleanColumn(int initialCapacity) {
+ this(0, 0, null, new boolean[initialCapacity]);
+ }
+
public BooleanColumn(int positionCount, Optional<boolean[]> valueIsNull,
boolean[] values) {
this(0, positionCount, valueIsNull.orElse(null), values);
}
@@ -167,4 +171,17 @@ public class BooleanColumn implements Column {
public int getInstanceSize() {
return INSTANCE_SIZE;
}
+
+ @Override
+ public void setPositionCount(int count) {
+ positionCount = count;
+ }
+
+ @Override
+ public void setNull(int start, int end) {
+ if (valueIsNull == null) {
+ valueIsNull = new boolean[values.length];
+ }
+ Arrays.fill(valueIsNull, start, end, true);
+ }
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/ColumnFactory.java
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/ColumnFactory.java
new file mode 100644
index 00000000..42984c3d
--- /dev/null
+++
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/ColumnFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.read.common.block.column;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.enums.TSDataType;
+
+public class ColumnFactory {
+ public static Column create(TSDataType dataType, int initialCapacity) {
+ switch (dataType) {
+ case INT64:
+ return new LongColumn(initialCapacity);
+ case DOUBLE:
+ return new DoubleColumn(initialCapacity);
+ case FLOAT:
+ return new FloatColumn(initialCapacity);
+ case TEXT:
+ return new BinaryColumn(initialCapacity);
+ case INT32:
+ return new IntColumn(initialCapacity);
+ case BOOLEAN:
+ return new BooleanColumn(initialCapacity);
+ default:
+ throw new IllegalArgumentException("Unsupported data type: " +
dataType);
+ }
+ }
+}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/DoubleColumn.java
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/DoubleColumn.java
index 5047028a..e643812f 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/DoubleColumn.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/DoubleColumn.java
@@ -39,12 +39,16 @@ public class DoubleColumn implements Column {
public static final int SIZE_IN_BYTES_PER_POSITION = Double.BYTES +
Byte.BYTES;
private final int arrayOffset;
- private final int positionCount;
- private final boolean[] valueIsNull;
+ private int positionCount;
+ private boolean[] valueIsNull;
private final double[] values;
private final long retainedSizeInBytes;
+ public DoubleColumn(int initialCapacity) {
+ this(0, 0, null, new double[initialCapacity]);
+ }
+
public DoubleColumn(int positionCount, Optional<boolean[]> valueIsNull,
double[] values) {
this(0, positionCount, valueIsNull.orElse(null), values);
}
@@ -168,4 +172,17 @@ public class DoubleColumn implements Column {
public int getInstanceSize() {
return INSTANCE_SIZE;
}
+
+ @Override
+ public void setPositionCount(int count) {
+ positionCount = count;
+ }
+
+ @Override
+ public void setNull(int start, int end) {
+ if (valueIsNull == null) {
+ valueIsNull = new boolean[values.length];
+ }
+ Arrays.fill(valueIsNull, start, end, true);
+ }
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/FloatColumn.java
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/FloatColumn.java
index 4050ad17..c06f190b 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/FloatColumn.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/FloatColumn.java
@@ -39,12 +39,16 @@ public class FloatColumn implements Column {
public static final int SIZE_IN_BYTES_PER_POSITION = Float.BYTES +
Byte.BYTES;
private final int arrayOffset;
- private final int positionCount;
- private final boolean[] valueIsNull;
+ private int positionCount;
+ private boolean[] valueIsNull;
private final float[] values;
private final long retainedSizeInBytes;
+ public FloatColumn(int initialCapacity) {
+ this(0, 0, null, new float[initialCapacity]);
+ }
+
public FloatColumn(int positionCount, Optional<boolean[]> valueIsNull,
float[] values) {
this(0, positionCount, valueIsNull.orElse(null), values);
}
@@ -167,4 +171,17 @@ public class FloatColumn implements Column {
public int getInstanceSize() {
return INSTANCE_SIZE;
}
+
+ @Override
+ public void setPositionCount(int count) {
+ positionCount = count;
+ }
+
+ @Override
+ public void setNull(int start, int end) {
+ if (valueIsNull == null) {
+ valueIsNull = new boolean[values.length];
+ }
+ Arrays.fill(valueIsNull, start, end, true);
+ }
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/IntColumn.java
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/IntColumn.java
index 8355f28d..7fdcc93f 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/IntColumn.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/IntColumn.java
@@ -39,12 +39,16 @@ public class IntColumn implements Column {
public static final int SIZE_IN_BYTES_PER_POSITION = Integer.BYTES +
Byte.BYTES;
private final int arrayOffset;
- private final int positionCount;
- private final boolean[] valueIsNull;
+ private int positionCount;
+ private boolean[] valueIsNull;
private final int[] values;
private final long retainedSizeInBytes;
+ public IntColumn(int initialCapacity) {
+ this(0, 0, null, new int[initialCapacity]);
+ }
+
public IntColumn(int positionCount, Optional<boolean[]> valueIsNull, int[]
values) {
this(0, positionCount, valueIsNull.orElse(null), values);
}
@@ -167,4 +171,17 @@ public class IntColumn implements Column {
public int getInstanceSize() {
return INSTANCE_SIZE;
}
+
+ @Override
+ public void setPositionCount(int count) {
+ this.positionCount = count;
+ }
+
+ @Override
+ public void setNull(int start, int end) {
+ if (valueIsNull == null) {
+ valueIsNull = new boolean[values.length];
+ }
+ Arrays.fill(valueIsNull, start, end, true);
+ }
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/LongColumn.java
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/LongColumn.java
index 59b8e016..3045e581 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/LongColumn.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/LongColumn.java
@@ -39,12 +39,16 @@ public class LongColumn implements Column {
public static final int SIZE_IN_BYTES_PER_POSITION = Long.BYTES + Byte.BYTES;
private final int arrayOffset;
- private final int positionCount;
- private final boolean[] valueIsNull;
+ private int positionCount;
+ private boolean[] valueIsNull;
private final long[] values;
private final long retainedSizeInBytes;
+ public LongColumn(int initialCapacity) {
+ this(0, 0, null, new long[initialCapacity]);
+ }
+
public LongColumn(int positionCount, Optional<boolean[]> valueIsNull, long[]
values) {
this(0, positionCount, valueIsNull.orElse(null), values);
}
@@ -167,4 +171,17 @@ public class LongColumn implements Column {
public int getInstanceSize() {
return INSTANCE_SIZE;
}
+
+ @Override
+ public void setPositionCount(int count) {
+ this.positionCount = count;
+ }
+
+ @Override
+ public void setNull(int start, int end) {
+ if (valueIsNull == null) {
+ valueIsNull = new boolean[values.length];
+ }
+ Arrays.fill(valueIsNull, start, end, true);
+ }
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/NullColumn.java
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/NullColumn.java
index be21cfa6..7e19b26b 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/NullColumn.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/NullColumn.java
@@ -36,7 +36,7 @@ public class NullColumn implements Column {
private static final int INSTANCE_SIZE =
(int) RamUsageEstimator.shallowSizeOfInstance(BooleanColumn.class);
- private final int positionCount;
+ private int positionCount;
private final long retainedSizeInBytes;
@@ -126,4 +126,9 @@ public class NullColumn implements Column {
public int getInstanceSize() {
return INSTANCE_SIZE;
}
+
+ @Override
+ public void setPositionCount(int count) {
+ this.positionCount = count;
+ }
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/RunLengthEncodedColumn.java
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/RunLengthEncodedColumn.java
index dbca629f..bdb557f1 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/RunLengthEncodedColumn.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/RunLengthEncodedColumn.java
@@ -37,7 +37,7 @@ public class RunLengthEncodedColumn implements Column {
(int)
RamUsageEstimator.shallowSizeOfInstance(RunLengthEncodedColumn.class);
private final Column value;
- private final int positionCount;
+ private int positionCount;
public RunLengthEncodedColumn(Column value, int positionCount) {
requireNonNull(value, "value is null");
@@ -214,4 +214,14 @@ public class RunLengthEncodedColumn implements Column {
public int getInstanceSize() {
return INSTANCE_SIZE;
}
+
+ @Override
+ public void setPositionCount(int count) {
+ this.positionCount = count;
+ }
+
+ @Override
+ public void setNull(int start, int end) {
+ value.setNull(start, end);
+ }
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/TimeColumn.java
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/TimeColumn.java
index aa629d52..2d2d364b 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/TimeColumn.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/TimeColumn.java
@@ -19,6 +19,7 @@
package org.apache.tsfile.read.common.block.column;
+import java.util.Arrays;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnEncoding;
import org.apache.tsfile.enums.TSDataType;
@@ -33,12 +34,16 @@ public class TimeColumn implements Column {
public static final int SIZE_IN_BYTES_PER_POSITION = Long.BYTES;
private final int arrayOffset;
- private final int positionCount;
+ private int positionCount;
private final long[] values;
private final long retainedSizeInBytes;
+ public TimeColumn(int initialCapacity) {
+ this(0, 0, new long[initialCapacity]);
+ }
+
public TimeColumn(int positionCount, long[] values) {
this(0, positionCount, values);
}
@@ -154,4 +159,13 @@ public class TimeColumn implements Column {
public int getInstanceSize() {
return INSTANCE_SIZE;
}
+
+ @Override
+ public void setPositionCount(int count) {
+ this.positionCount = positionCount;
+ }
+
+ @Override
+ public void setNull(int start, int end) {
+ }
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/controller/IMetadataQuerier.java
b/tsfile/src/main/java/org/apache/tsfile/read/controller/IMetadataQuerier.java
index 09741943..844de99a 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/controller/IMetadataQuerier.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/controller/IMetadataQuerier.java
@@ -20,6 +20,7 @@
package org.apache.tsfile.read.controller;
import java.util.Iterator;
+import java.util.Set;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.NoMeasurementException;
import org.apache.tsfile.file.metadata.IChunkMetadata;
@@ -39,6 +40,17 @@ public interface IMetadataQuerier {
List<IChunkMetadata> getChunkMetaDataList(Path path) throws IOException;
+ /**
+ *
+ * @param deviceID the deviceID to be queried
+ * @param measurementNames the measurementNames to be queried
+ * @param measurementNode nullable, if provided, the search will start from
the node
+ * @return each list is the ChunkMetadata of those timeseries who exists
+ * @throws IOException if IO error occurs
+ */
+ List<List<IChunkMetadata>> getChunkMetadataLists(IDeviceID deviceID,
+ Set<String> measurementNames, MetadataIndexNode measurementNode) throws
IOException;
+
Map<Path, List<IChunkMetadata>> getChunkMetaDataMap(List<Path> paths) throws
IOException;
TsFileMetadata getWholeFileMetadata();
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
b/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
index d7fa458f..18426390 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
@@ -30,6 +30,7 @@ import org.apache.tsfile.file.metadata.ITimeSeriesMetadata;
import org.apache.tsfile.file.metadata.MetadataIndexNode;
import org.apache.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.tsfile.file.metadata.TsFileMetadata;
+import org.apache.tsfile.file.metadata.enums.MetadataIndexNodeType;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.TsFileSequenceReader.LocateStatus;
import org.apache.tsfile.read.common.Path;
@@ -58,6 +59,7 @@ public class MetadataQuerierByFileImpl implements
IMetadataQuerier {
// TimeseriesPath -> List<IChunkMetadata>
private LRUCache<Path, List<IChunkMetadata>> chunkMetaDataCache;
+ private LRUCache<Pair<IDeviceID, String>, List<IChunkMetadata>>
deviceIdChunkMetadataCache;
private TsFileSequenceReader tsFileReader;
@@ -72,6 +74,14 @@ public class MetadataQuerierByFileImpl implements
IMetadataQuerier {
return loadChunkMetadata(key);
}
};
+ deviceIdChunkMetadataCache =
+ new LRUCache<Pair<IDeviceID, String>,
List<IChunkMetadata>>(CACHED_ENTRY_NUMBER) {
+ @Override
+ protected List<IChunkMetadata> loadObjectByKey(Pair<IDeviceID,
String> key)
+ throws IOException {
+ return loadChunkMetadata(key);
+ }
+ };
}
@Override
@@ -79,6 +89,33 @@ public class MetadataQuerierByFileImpl implements
IMetadataQuerier {
return new ArrayList<>(chunkMetaDataCache.get(timeseriesPath));
}
+ public List<List<IChunkMetadata>> getChunkMetadataLists(IDeviceID deviceID,
+ Set<String> measurementNames, MetadataIndexNode measurementNode) throws
IOException {
+ List<List<IChunkMetadata>> results = new
ArrayList<>(measurementNames.size());
+ final Iterator<String> iterator = measurementNames.iterator();
+ // use cache when possible
+ while (iterator.hasNext()) {
+ final String measurementName = iterator.next();
+ // check first to avoid loading
+ final Pair<IDeviceID, String> key = new Pair<>(deviceID,
measurementName);
+ if (deviceIdChunkMetadataCache.containsKey(key)) {
+ final List<IChunkMetadata> metadataList =
deviceIdChunkMetadataCache.get(key);
+ results.add(metadataList);
+ iterator.remove();
+ }
+ }
+ // the remaining is not in the cache, search them in file
+ final List<List<IChunkMetadata>> iChunkMetadataList =
tsFileReader.getIChunkMetadataList(
+ deviceID, measurementNames, measurementNode);
+ for (List<IChunkMetadata> metadataList : iChunkMetadataList) {
+ final String measurementUid = metadataList.get(0).getMeasurementUid();
+ // cache the result
+ deviceIdChunkMetadataCache.put(new Pair<>(deviceID, measurementUid),
metadataList);
+ results.add(metadataList);
+ }
+ return results;
+ }
+
@Override
public Map<Path, List<IChunkMetadata>> getChunkMetaDataMap(List<Path> paths)
throws IOException {
Map<Path, List<IChunkMetadata>> chunkMetaDatas = new HashMap<>();
@@ -122,7 +159,7 @@ public class MetadataQuerierByFileImpl implements
IMetadataQuerier {
}
List<ITimeSeriesMetadata> timeseriesMetaDataList =
- tsFileReader.readITimeseriesMetadata(selectedDevice,
selectedMeasurements);
+ tsFileReader.readITimeseriesMetadata(selectedDevice,
selectedMeasurements, null);
for (ITimeSeriesMetadata timeseriesMetadata : timeseriesMetaDataList) {
List<IChunkMetadata> chunkMetadataList =
tsFileReader.readIChunkMetaDataList(timeseriesMetadata);
@@ -160,6 +197,10 @@ public class MetadataQuerierByFileImpl implements
IMetadataQuerier {
return tsFileReader.getIChunkMetadataList(path);
}
+ private List<IChunkMetadata> loadChunkMetadata(Pair<IDeviceID, String> key)
throws IOException {
+ return tsFileReader.getIChunkMetadataList(key.getLeft(), key.right);
+ }
+
@Override
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
public List<TimeRange> convertSpace2TimePartition(
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/expression/ExpressionTree.java
b/tsfile/src/main/java/org/apache/tsfile/read/expression/ExpressionTree.java
index 7dd6e427..b22dc872 100644
--- a/tsfile/src/main/java/org/apache/tsfile/read/expression/ExpressionTree.java
+++ b/tsfile/src/main/java/org/apache/tsfile/read/expression/ExpressionTree.java
@@ -1,5 +1,8 @@
package org.apache.tsfile.read.expression;
+import org.apache.tsfile.read.filter.basic.Filter;
+
public interface ExpressionTree {
boolean satisfy(Object value);
+ Filter toFilter();
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TableQueryExecutor.java
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TableQueryExecutor.java
index 112acf66..954719a4 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TableQueryExecutor.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TableQueryExecutor.java
@@ -3,8 +3,10 @@ package org.apache.tsfile.read.query.executor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.tsfile.exception.read.NoColumnException;
import org.apache.tsfile.exception.read.ReadProcessException;
import org.apache.tsfile.exception.read.UnsupportedOrderingException;
@@ -25,6 +27,7 @@ public class TableQueryExecutor {
private IMetadataQuerier metadataQuerier;
private IChunkLoader chunkLoader;
private TableQueryOrdering tableQueryOrdering;
+ private int blockSize = 1024;
public TableQueryExecutor(IMetadataQuerier metadataQuerier, IChunkLoader
chunkLoader,
TableQueryOrdering tableQueryOrdering) {
@@ -48,12 +51,14 @@ public class TableQueryExecutor {
String column = columns.get(i);
columnMapping.add(column, i, tableSchema);
}
+ columnMapping.add(measurementFilter);
DeviceTaskIterator deviceTaskIterator = new DeviceTaskIterator(columns,
tableRoot,
- columnMapping, metadataQuerier, idFilter);
+ columnMapping, metadataQuerier, idFilter, tableSchema);
switch (tableQueryOrdering) {
case DEVICE:
- return new DeviceOrderedTsBlockReader(deviceTaskIterator,
metadataQuerier, chunkLoader);
+ return new DeviceOrderedTsBlockReader(deviceTaskIterator,
metadataQuerier, chunkLoader,
+ timeFilter, measurementFilter, blockSize);
case TIME:
default:
throw new UnsupportedOrderingException(tableQueryOrdering.toString());
@@ -66,7 +71,8 @@ public class TableQueryExecutor {
* This mapping is used to put data of the same series into multiple
columns.
*/
private Map<String, List<Integer>> columnPosMap = new HashMap<>();
- private Map<String, Boolean> isIdMap = new HashMap<>();
+ private Set<String> idColumns = new HashSet<>();
+ private Set<String> measurementColumns = new HashSet<>();
public void add(String columnName, int i, TableSchema schema) throws
NoColumnException {
final int columnIndex = schema.findColumnIndex(columnName);
@@ -76,7 +82,15 @@ public class TableQueryExecutor {
final ColumnType columnType = schema.getColumnTypes().get(columnIndex);
columnPosMap.computeIfAbsent(columnName, k -> new ArrayList<>()).add(i);
- isIdMap.put(columnName, columnType.equals(ColumnType.ID));
+ if (columnType.equals(ColumnType.ID)) {
+ idColumns.add(columnName);
+ } else {
+ measurementColumns.add(columnName);
+ }
+ }
+
+ public void add(ExpressionTree measurementFilter) {
+ //TODO: get measurements in the filter and add them to measurementColumns
}
public List<Integer> getColumnPos(String columnName) {
@@ -84,7 +98,19 @@ public class TableQueryExecutor {
}
public boolean isId(String columnName) {
- return isIdMap.getOrDefault(columnName, false);
+ return idColumns.contains(columnName);
+ }
+
+ public boolean isMeasurement(String columnName) {
+ return measurementColumns.contains(columnName);
+ }
+
+ public Set<String> getIdColumns() {
+ return idColumns;
+ }
+
+ public Set<String> getMeasurementColumns() {
+ return measurementColumns;
}
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceQueryTask.java
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceQueryTask.java
index 3744fe9d..6b1510c5 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceQueryTask.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceQueryTask.java
@@ -22,20 +22,23 @@ package org.apache.tsfile.read.query.executor.task;
import java.util.List;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.MetadataIndexNode;
-import org.apache.tsfile.read.query.executor.TsFileExecutor.ColumnMapping;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.read.query.executor.TableQueryExecutor.ColumnMapping;
public class DeviceQueryTask {
- private IDeviceID deviceID;
- private List<String> columnNames;
- private ColumnMapping columnMapping;
- private MetadataIndexNode indexRoot;
+ private final IDeviceID deviceID;
+ private final List<String> columnNames;
+ private final ColumnMapping columnMapping;
+ private final MetadataIndexNode indexRoot;
+ private final TableSchema tableSchema;
public DeviceQueryTask(IDeviceID deviceID, List<String> columnNames,
ColumnMapping columnMapping,
- MetadataIndexNode indexRoot) {
+ MetadataIndexNode indexRoot, TableSchema tableSchema) {
this.deviceID = deviceID;
this.columnNames = columnNames;
this.columnMapping = columnMapping;
this.indexRoot = indexRoot;
+ this.tableSchema = tableSchema;
}
public IDeviceID getDeviceID() {
@@ -53,4 +56,16 @@ public class DeviceQueryTask {
public MetadataIndexNode getIndexRoot() {
return indexRoot;
}
+
+ public TableSchema getTableSchema() {
+ return tableSchema;
+ }
+
+ @Override
+ public String toString() {
+ return "DeviceQueryTask{" +
+ "deviceID=" + deviceID +
+ ", columnNames=" + columnNames +
+ '}';
+ }
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceTaskIterator.java
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceTaskIterator.java
index 9e387ff8..69bdefae 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceTaskIterator.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceTaskIterator.java
@@ -4,6 +4,7 @@ import java.util.Iterator;
import java.util.List;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.read.controller.IMetadataQuerier;
import org.apache.tsfile.read.expression.ExpressionTree;
import org.apache.tsfile.read.query.executor.TableQueryExecutor.ColumnMapping;
@@ -12,13 +13,16 @@ import org.apache.tsfile.utils.Pair;
public class DeviceTaskIterator implements Iterator<DeviceQueryTask> {
private List<String> columnNames;
private ColumnMapping columnMapping;
+ private TableSchema tableSchema;
private Iterator<Pair<IDeviceID, MetadataIndexNode>> deviceMetaIterator;
public DeviceTaskIterator(List<String> columnNames, MetadataIndexNode
indexRoot,
- ColumnMapping columnMapping, IMetadataQuerier metadataQuerier,
ExpressionTree idFilter) {
+ ColumnMapping columnMapping, IMetadataQuerier metadataQuerier,
ExpressionTree idFilter,
+ TableSchema tableSchema) {
this.columnNames = columnNames;
this.columnMapping = columnMapping;
this.deviceMetaIterator = metadataQuerier.deviceIterator(indexRoot,
idFilter);
+ this.tableSchema = tableSchema;
}
@Override
@@ -29,6 +33,6 @@ public class DeviceTaskIterator implements
Iterator<DeviceQueryTask> {
@Override
public DeviceQueryTask next() {
final Pair<IDeviceID, MetadataIndexNode> next = deviceMetaIterator.next();
- return new DeviceQueryTask(next.left, columnNames, columnMapping,
next.right);
+ return new DeviceQueryTask(next.left, columnNames, columnMapping,
next.right, tableSchema);
}
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/reader/block/DeviceOrderedTsBlockReader.java
b/tsfile/src/main/java/org/apache/tsfile/read/reader/block/DeviceOrderedTsBlockReader.java
index a86a628b..8a8c981c 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/reader/block/DeviceOrderedTsBlockReader.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/reader/block/DeviceOrderedTsBlockReader.java
@@ -5,22 +5,33 @@ import java.util.NoSuchElementException;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.controller.IChunkLoader;
import org.apache.tsfile.read.controller.IMetadataQuerier;
+import org.apache.tsfile.read.expression.ExpressionTree;
import org.apache.tsfile.read.query.executor.task.DeviceQueryTask;
import org.apache.tsfile.read.query.executor.task.DeviceTaskIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class DeviceOrderedTsBlockReader implements TsBlockReader {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DeviceOrderedTsBlockReader.class);
private final DeviceTaskIterator taskIterator;
private final IMetadataQuerier metadataQuerier;
private final IChunkLoader chunkLoader;
+ private final int blockSize;
private SingleDeviceTsBlockReader currentReader;
+ private ExpressionTree timeFilter;
+ private ExpressionTree measurementFilter;
public DeviceOrderedTsBlockReader(DeviceTaskIterator taskIterator,
IMetadataQuerier metadataQuerier,
- IChunkLoader chunkLoader) {
+ IChunkLoader chunkLoader, ExpressionTree timeFilter, ExpressionTree
measurementFilter,
+ int blockSize) {
this.taskIterator = taskIterator;
this.metadataQuerier = metadataQuerier;
this.chunkLoader = chunkLoader;
+ this.blockSize = blockSize;
+ this.timeFilter = timeFilter;
+ this.measurementFilter = measurementFilter;
}
@Override
@@ -30,7 +41,12 @@ public class DeviceOrderedTsBlockReader implements
TsBlockReader {
}
while (taskIterator.hasNext()) {
final DeviceQueryTask nextTask = taskIterator.next();
- currentReader = new SingleDeviceTsBlockReader(nextTask, metadataQuerier,
chunkLoader);
+ try {
+ currentReader = new SingleDeviceTsBlockReader(nextTask,
metadataQuerier, chunkLoader,
+ blockSize, timeFilter, measurementFilter);
+ } catch (IOException e) {
+ LOGGER.error("Failed to construct reader for {}", nextTask, e);
+ }
if (currentReader.hasNext()) {
return true;
}
diff --git
a/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java
b/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java
index f61ec47f..2f2c1783 100644
---
a/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java
+++
b/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java
@@ -1,38 +1,267 @@
package org.apache.tsfile.read.reader.block;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.read.common.BatchData;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.controller.IChunkLoader;
import org.apache.tsfile.read.controller.IMetadataQuerier;
+import org.apache.tsfile.read.expression.ExpressionTree;
+import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.read.query.executor.task.DeviceQueryTask;
+import org.apache.tsfile.read.reader.series.AbstractFileSeriesReader;
+import org.apache.tsfile.read.reader.series.FileSeriesReader;
+import org.apache.tsfile.utils.Binary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class SingleDeviceTsBlockReader implements TsBlockReader {
- private DeviceQueryTask task;
- private IMetadataQuerier metadataQuerier;
- private IChunkLoader chunkLoader;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SingleDeviceTsBlockReader.class);
+ private final DeviceQueryTask task;
+ private final ExpressionTree measurementExpression;
+ private final int blockSize;
+
+ private final TsBlock currentBlock;
+ private boolean lastBlockReturned = true;
+ private final Map<String, MeasurementColumnContext> measureColumnContextMap;
+ private final Map<String, IdColumnContext> idColumnContextMap;
+
+ private long nextTime;
public SingleDeviceTsBlockReader(DeviceQueryTask task, IMetadataQuerier
metadataQuerier,
- IChunkLoader chunkLoader) {
+ IChunkLoader chunkLoader, int blockSize, ExpressionTree timeExpression,
+ ExpressionTree measurementFilter) throws IOException {
this.task = task;
- this.metadataQuerier = metadataQuerier;
- this.chunkLoader = chunkLoader;
+ this.blockSize = blockSize;
+ this.measurementExpression = measurementFilter;
+
+ this.currentBlock = TsBlock.buildTsBlock(task.getColumnNames(),
task.getTableSchema(),
+ blockSize);
+ this.measureColumnContextMap = new HashMap<>();
+ this.idColumnContextMap = new HashMap<>();
+
+ final List<List<IChunkMetadata>> chunkMetadataLists =
metadataQuerier.getChunkMetadataLists(
+ task.getDeviceID(), task.getColumnMapping()
+ .getMeasurementColumns(), task.getIndexRoot());
+ Filter timeFilter = timeExpression == null ? null :
timeExpression.toFilter();
+ for (List<IChunkMetadata> chunkMetadataList : chunkMetadataLists) {
+ if (!chunkMetadataList.isEmpty()) {
+ final String measurementUid =
chunkMetadataList.get(0).getMeasurementUid();
+ AbstractFileSeriesReader seriesReader = new
FileSeriesReader(chunkLoader,
+ chunkMetadataList, timeFilter);
+ if (seriesReader.hasNextBatch()) {
+ measureColumnContextMap.put(measurementUid, new
MeasurementColumnContext(measurementUid,
+ task.getColumnMapping().getColumnPos(measurementUid),
seriesReader.nextBatch(),
+ seriesReader));
+ }
+ }
+ }
+ for (String idColumn : task.getColumnMapping().getIdColumns()) {
+ final List<Integer> columnPosInResult =
task.getColumnMapping().getColumnPos(idColumn);
+ final int columnPosInId =
task.getTableSchema().findColumnIndex(idColumn);
+ idColumnContextMap.put(idColumn, new IdColumnContext(columnPosInResult,
columnPosInId));
+ }
}
@Override
public boolean hasNext() {
+ if (!lastBlockReturned) {
+ return true;
+ }
+
+ if (measureColumnContextMap.isEmpty()) {
+ return false;
+ }
+
+ currentBlock.reset();
+ nextTime = Long.MAX_VALUE;
+ List<MeasurementColumnContext> alignedColumns = new ArrayList<>();
+
+ while (currentBlock.getPositionCount() < blockSize) {
+ // find the minimum time among the batches and the associated columns
+ for (Entry<String, MeasurementColumnContext> entry :
measureColumnContextMap.entrySet()) {
+ final BatchData batchData = entry.getValue().currentBatch;
+ final long currentTime = batchData.currentTime();
+ if (nextTime > currentTime) {
+ nextTime = currentTime;
+ alignedColumns.clear();
+ } else if (nextTime == currentTime) {
+ alignedColumns.add(entry.getValue());
+ }
+ }
+
+ try {
+ fillMeasurements(alignedColumns);
+ } catch (IOException e) {
+ LOGGER.error("Cannot fill measurements", e);
+ return false;
+ }
+
+ // all columns have exhausted
+ if (measureColumnContextMap.isEmpty()) {
+ break;
+ }
+ }
+
+ if (currentBlock.getPositionCount() > 0) {
+ fillIds();
+ currentBlock.fillTrailingNulls();
+ lastBlockReturned = false;
+ return true;
+ }
+
return false;
}
+ private void fillIds() {
+ for (Entry<String, IdColumnContext> entry : idColumnContextMap.entrySet())
{
+ final IdColumnContext idColumnContext = entry.getValue();
+ for (Integer pos : idColumnContext.posInResult) {
+ final Column column = currentBlock.getColumn(pos);
+ fillIdColumn(column,
task.getDeviceID().segment(idColumnContext.posInDeviceId), 0,
+ currentBlock.getPositionCount());
+ }
+ }
+ }
+
+ private void fillMeasurements(List<MeasurementColumnContext> alignedColumns)
throws IOException {
+ if (measurementExpression == null || measurementExpression.satisfy(this)) {
+ // use the time to fill the block
+ final int positionCount = currentBlock.getPositionCount();
+ currentBlock.getTimeColumn().getTimes()[positionCount] = nextTime;
+ // project the value columns to the result
+ for (final MeasurementColumnContext columnContext : alignedColumns) {
+ final BatchData batchData = columnContext.currentBatch;
+ final List<Integer> posInResult = columnContext.posInResult;
+ for (Integer pos : posInResult) {
+ final Column column = currentBlock.getColumn(pos);
+ fillMeasurementColumn(column, batchData, positionCount);
+ }
+
+ batchData.next();
+ if (!batchData.hasCurrent()) {
+ // get next batch of the column
+ if (columnContext.seriesReader.hasNextBatch()) {
+ columnContext.currentBatch =
columnContext.seriesReader.nextBatch();
+ } else {
+ // no more data in this column
+ measureColumnContextMap.remove(columnContext.columnName);
+ }
+ }
+ }
+ currentBlock.setPositionCount(positionCount + 1);
+ }
+ }
+
+ private void fillIdColumn(Column column, Object val, int startPos, int
endPos) {
+ switch (column.getDataType()) {
+ case TEXT:
+ if (val instanceof String) {
+ val = new Binary(((String) val), StandardCharsets.UTF_8);
+ }
+ Arrays.fill(column.getBinaries(), startPos, endPos, val);
+ break;
+ case BOOLEAN:
+ Arrays.fill(column.getBooleans(), startPos, endPos, ((boolean) val));
+ break;
+ case INT32:
+ Arrays.fill(column.getInts(), startPos, endPos, ((int) val));
+ break;
+ case INT64:
+ Arrays.fill(column.getLongs(), startPos, endPos, ((long) val));
+ break;
+ case FLOAT:
+ Arrays.fill(column.getFloats(), startPos, endPos, ((float) val));
+ break;
+ case DOUBLE:
+ Arrays.fill(column.getDoubles(), startPos, endPos, ((double) val));
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported data type: " +
column.getDataType());
+ }
+ column.setPositionCount(endPos);
+ }
+
+ private void fillMeasurementColumn(Column column, BatchData batchData, int
pos) {
+ switch (batchData.getDataType()) {
+ case BOOLEAN:
+ column.getBooleans()[pos] = batchData.getBoolean();
+ break;
+ case DOUBLE:
+ column.getDoubles()[pos] = batchData.getDouble();
+ break;
+ case FLOAT:
+ column.getFloats()[pos] = batchData.getFloat();
+ break;
+ case INT32:
+ column.getInts()[pos] = batchData.getInt();
+ break;
+ case TEXT:
+ column.getBinaries()[pos] = batchData.getBinary();
+ break;
+ case INT64:
+ column.getLongs()[pos] = batchData.getLong();
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported data type: " +
batchData.getDataType());
+ }
+ column.setPositionCount(pos + 1);
+ }
+
+
@Override
public TsBlock next() throws IOException {
- return null;
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ lastBlockReturned = true;
+ return currentBlock;
}
@Override
public void close() throws Exception {
// nothing to be done
}
+
+ // gather necessary fields in this class to avoid redundant map access
+ public static class MeasurementColumnContext {
+
+ private final String columnName;
+ private final List<Integer> posInResult;
+ private BatchData currentBatch;
+ private final AbstractFileSeriesReader seriesReader;
+
+ public MeasurementColumnContext(String columnName, List<Integer>
posInResult,
+ BatchData currentBatch,
+ AbstractFileSeriesReader seriesReader) {
+ this.columnName = columnName;
+ this.posInResult = posInResult;
+ this.currentBatch = currentBatch;
+ this.seriesReader = seriesReader;
+ }
+ }
+
+ public static class IdColumnContext {
+
+ private final List<Integer> posInResult;
+ private final int posInDeviceId;
+
+ public IdColumnContext(List<Integer> posInResult,
+ int posInDeviceId) {
+ this.posInResult = posInResult;
+ this.posInDeviceId = posInDeviceId;
+ }
+ }
}