This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch tsFile_v4
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/tsFile_v4 by this push:
     new 07cc8658 Finish SingleDeviceTsBlockReader
07cc8658 is described below

commit 07cc86587c6906b88d853fc835114c36d2c9b3b6
Author: jt2594838 <[email protected]>
AuthorDate: Thu Apr 11 18:37:32 2024 +0800

    Finish SingleDeviceTsBlockReader
---
 .../org/apache/tsfile/block/column/Column.java     |  18 ++
 .../org/apache/tsfile/common/cache/LRUCache.java   |   4 +
 .../org/apache/tsfile/file/metadata/IDeviceID.java |   2 +-
 .../tsfile/file/metadata/StringArrayDeviceID.java  |   3 +-
 .../apache/tsfile/file/metadata/TableSchema.java   |   5 +
 .../apache/tsfile/read/TsFileSequenceReader.java   |  30 ++-
 .../apache/tsfile/read/common/block/TsBlock.java   |  44 +++-
 .../read/common/block/column/BinaryColumn.java     |  21 +-
 .../read/common/block/column/BooleanColumn.java    |  21 +-
 .../read/common/block/column/ColumnFactory.java    |  44 ++++
 .../read/common/block/column/DoubleColumn.java     |  21 +-
 .../read/common/block/column/FloatColumn.java      |  21 +-
 .../tsfile/read/common/block/column/IntColumn.java |  21 +-
 .../read/common/block/column/LongColumn.java       |  21 +-
 .../read/common/block/column/NullColumn.java       |   7 +-
 .../block/column/RunLengthEncodedColumn.java       |  12 +-
 .../read/common/block/column/TimeColumn.java       |  16 +-
 .../tsfile/read/controller/IMetadataQuerier.java   |  12 +
 .../read/controller/MetadataQuerierByFileImpl.java |  43 +++-
 .../tsfile/read/expression/ExpressionTree.java     |   3 +
 .../read/query/executor/TableQueryExecutor.java    |  36 ++-
 .../read/query/executor/task/DeviceQueryTask.java  |  27 ++-
 .../query/executor/task/DeviceTaskIterator.java    |   8 +-
 .../reader/block/DeviceOrderedTsBlockReader.java   |  20 +-
 .../reader/block/SingleDeviceTsBlockReader.java    | 243 ++++++++++++++++++++-
 25 files changed, 661 insertions(+), 42 deletions(-)

diff --git a/common/src/main/java/org/apache/tsfile/block/column/Column.java 
b/common/src/main/java/org/apache/tsfile/block/column/Column.java
index 6db30a27..8e438f70 100644
--- a/common/src/main/java/org/apache/tsfile/block/column/Column.java
+++ b/common/src/main/java/org/apache/tsfile/block/column/Column.java
@@ -19,6 +19,7 @@
 
 package org.apache.tsfile.block.column;
 
+import java.util.Arrays;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.TsPrimitiveType;
@@ -124,6 +125,13 @@ public interface Column {
   /** Returns the array to determine whether each position of the column is 
null or not. */
   boolean[] isNull();
 
+  /**
+   * Set the given range as null.
+   * @param start start position (inclusive)
+   * @param end end position (exclusive)
+   */
+  void setNull(int start, int end);
+
   /** Returns the number of positions in this block. */
   int getPositionCount();
 
@@ -152,4 +160,14 @@ public interface Column {
   void reverse();
 
   int getInstanceSize();
+
+  /**
+   * Sets the position count of this column.
+   *
+   * @param count the new position count
+   */
+  void setPositionCount(int count);
+
+  /**
+   * Resets this column for reuse: the position count is set to 0 and, when {@link #isNull()}
+   * returns an array, every entry of that array is set to {@code false}.
+   */
+  default void reset() {
+    setPositionCount(0);
+    final boolean[] nullFlags = isNull();
+    if (nullFlags == null) {
+      // no null array is exposed, nothing to clear
+      return;
+    }
+    Arrays.fill(nullFlags, false);
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/tsfile/common/cache/LRUCache.java 
b/tsfile/src/main/java/org/apache/tsfile/common/cache/LRUCache.java
index 9b79825b..a00469a5 100644
--- a/tsfile/src/main/java/org/apache/tsfile/common/cache/LRUCache.java
+++ b/tsfile/src/main/java/org/apache/tsfile/common/cache/LRUCache.java
@@ -51,6 +51,10 @@ public abstract class LRUCache<K, T> implements Cache<K, T> {
     }
   }
 
+  /**
+   * Checks whether an entry for the given key is currently held by the cache. Only the backing
+   * map is consulted.
+   *
+   * <p>Synchronized in the same way as {@code clear()} so that the lookup does not run while
+   * another thread is modifying the backing map.
+   *
+   * @param key the key to look up
+   * @return {@code true} if the backing map contains the key
+   */
+  public synchronized boolean containsKey(K key) {
+    return cache.containsKey(key);
+  }
+
   @Override
   public synchronized void clear() {
     cache.clear();
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/file/metadata/IDeviceID.java 
b/tsfile/src/main/java/org/apache/tsfile/file/metadata/IDeviceID.java
index 5b3e7255..5fadc0ab 100644
--- a/tsfile/src/main/java/org/apache/tsfile/file/metadata/IDeviceID.java
+++ b/tsfile/src/main/java/org/apache/tsfile/file/metadata/IDeviceID.java
@@ -57,7 +57,7 @@ public interface IDeviceID extends Comparable<IDeviceID>, 
Accountable {
    * @return i-th segment in this DeviceId.
    * @throws ArrayIndexOutOfBoundsException if i >= segmentNum().
    */
-  String segment(int i);
+  Object segment(int i);
 
   static IDeviceID deserializeFrom(ByteBuffer byteBuffer) {
     return new PlainDeviceID(ReadWriteIOUtils.readVarIntString(byteBuffer));
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/file/metadata/StringArrayDeviceID.java 
b/tsfile/src/main/java/org/apache/tsfile/file/metadata/StringArrayDeviceID.java
index de71419c..14901c36 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/file/metadata/StringArrayDeviceID.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/file/metadata/StringArrayDeviceID.java
@@ -105,7 +105,8 @@ public class StringArrayDeviceID implements IDeviceID {
         // the other ID is a prefix of this one
         return 1;
       }
-      final int comp = Objects.compare(this.segment(i), o.segment(i), 
WriteUtils::compareStrings);
+      final int comp = Objects.compare(this.segment(i), ((String) 
o.segment(i)),
+          WriteUtils::compareStrings);
       if (comp != 0) {
         // the partial comparison has a result
         return comp;
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java 
b/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
index f498ed9d..870ef2e4 100644
--- a/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
+++ b/tsfile/src/main/java/org/apache/tsfile/file/metadata/TableSchema.java
@@ -76,6 +76,11 @@ public class TableSchema {
             });
   }
 
+  /**
+   * Looks up the schema of a column by its name.
+   *
+   * @param columnName name of the column to look up
+   * @return the schema stored at the index reported by {@code findColumnIndex}, or {@code null}
+   *     when that index is negative
+   */
+  public MeasurementSchema findColumnSchema(String columnName) {
+    final int index = findColumnIndex(columnName);
+    if (index < 0) {
+      return null;
+    }
+    return columnSchemas.get(index);
+  }
+
   public void update(ChunkGroupMetadata chunkGroupMetadata) {
     if (!updatable) {
       return;
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java 
b/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
index fb50e186..ad9478aa 100644
--- a/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
@@ -570,9 +570,10 @@ public class TsFileSequenceReader implements AutoCloseable 
{
 
   // This method is only used for TsFile
   public List<ITimeSeriesMetadata> readITimeseriesMetadata(
-      IDeviceID device, Set<String> measurements) throws IOException {
+      IDeviceID device, Set<String> measurements, MetadataIndexNode root) 
throws IOException {
     readFileMetadata();
     MetadataIndexNode deviceMetadataIndexNode =
+        root != null ? root :
         
tsFileMetaData.getTableMetadataIndexNodeMap().get(device.getTableName());
     Pair<IMetadataIndexEntry, Long> metadataIndexPair =
         getMetadataAndEndOffsetOfDeviceNode(deviceMetadataIndexNode, device, 
false);
@@ -2169,6 +2170,33 @@ public class TsFileSequenceReader implements 
AutoCloseable {
     return chunkMetadataList;
   }
 
+  /**
+   * Reads the chunk metadata of a single measurement of a device.
+   *
+   * @param deviceID device the measurement belongs to
+   * @param measurementName name of the measurement to read
+   * @return the chunk metadata of the first returned timeseries, sorted by start time, or an
+   *     empty list when no timeseries metadata is returned
+   * @throws IOException propagated from the underlying metadata reads
+   */
+  public List<IChunkMetadata> getIChunkMetadataList(IDeviceID deviceID, String measurementName)
+      throws IOException {
+    final List<ITimeSeriesMetadata> seriesMetadataList =
+        readITimeseriesMetadata(deviceID, Collections.singleton(measurementName), null);
+    if (seriesMetadataList == null || seriesMetadataList.isEmpty()) {
+      return Collections.emptyList();
+    }
+    final List<IChunkMetadata> chunks = readIChunkMetaDataList(seriesMetadataList.get(0));
+    chunks.sort(Comparator.comparingLong(IChunkMetadata::getStartTime));
+    return chunks;
+  }
+
+  /**
+   * Reads the chunk metadata of several measurements of a device.
+   *
+   * @param deviceID device the measurements belong to
+   * @param measurementNames names of the measurements to read
+   * @param root passed through to {@code readITimeseriesMetadata}; may be {@code null}
+   * @return one list per returned timeseries metadata, each sorted by start time, or an empty
+   *     list when no timeseries metadata is returned
+   * @throws IOException propagated from the underlying metadata reads
+   */
+  public List<List<IChunkMetadata>> getIChunkMetadataList(
+      IDeviceID deviceID, Set<String> measurementNames, MetadataIndexNode root)
+      throws IOException {
+    final List<ITimeSeriesMetadata> seriesMetadataList =
+        readITimeseriesMetadata(deviceID, measurementNames, root);
+    if (seriesMetadataList == null || seriesMetadataList.isEmpty()) {
+      return Collections.emptyList();
+    }
+    final Comparator<IChunkMetadata> byStartTime =
+        Comparator.comparingLong(IChunkMetadata::getStartTime);
+    final List<List<IChunkMetadata>> chunkLists = new ArrayList<>(seriesMetadataList.size());
+    for (ITimeSeriesMetadata seriesMetadata : seriesMetadataList) {
+      final List<IChunkMetadata> chunks = readIChunkMetaDataList(seriesMetadata);
+      chunks.sort(byStartTime);
+      chunkLists.add(chunks);
+    }
+    return chunkLists;
+  }
+
   public List<ChunkMetadata> getChunkMetadataList(Path path) throws 
IOException {
     return getChunkMetadataList(path, false);
   }
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlock.java 
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlock.java
index d6b7ca3d..43eb29b4 100644
--- a/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlock.java
+++ b/tsfile/src/main/java/org/apache/tsfile/read/common/block/TsBlock.java
@@ -19,9 +19,12 @@
 
 package org.apache.tsfile.read.common.block;
 
+import java.util.List;
 import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.file.metadata.TableSchema;
 import org.apache.tsfile.read.TimeValuePair;
 import org.apache.tsfile.read.common.IBatchDataIterator;
+import org.apache.tsfile.read.common.block.column.ColumnFactory;
 import org.apache.tsfile.read.common.block.column.TimeColumn;
 import org.apache.tsfile.read.reader.IPointReader;
 import org.apache.tsfile.utils.RamUsageEstimator;
@@ -31,6 +34,7 @@ import org.apache.tsfile.write.UnSupportedDataTypeException;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import org.apache.tsfile.write.schema.MeasurementSchema;
 
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
@@ -61,7 +65,7 @@ public class TsBlock {
   private final Column[] valueColumns;
 
   /** How many rows in current TsBlock */
-  private final int positionCount;
+  private int positionCount;
 
   private volatile long retainedSizeInBytes = -1;
 
@@ -98,6 +102,10 @@ public class TsBlock {
     return positionCount;
   }
 
+  /**
+   * Overwrites the row count stored in this TsBlock. Only the block's own counter is assigned;
+   * the columns are not touched here.
+   *
+   * @param positionCount the new row count
+   */
+  public void setPositionCount(int positionCount) {
+    this.positionCount = positionCount;
+  }
+
   public long getStartTime() {
     return timeColumn.getStartTime();
   }
@@ -237,6 +245,17 @@ public class TsBlock {
     return new TsBlockAlignedRowIterator(0);
   }
 
+  /**
+   * Empties this block for reuse. When the block already has a position count of 0 nothing is
+   * done; otherwise the count is set to 0 and the time column and every value column are reset.
+   */
+  public void reset() {
+    if (positionCount != 0) {
+      positionCount = 0;
+      timeColumn.reset();
+      for (int i = 0; i < valueColumns.length; i++) {
+        valueColumns[i].reset();
+      }
+    }
+  }
+
   public class TsBlockSingleColumnIterator implements IPointReader, 
IBatchDataIterator {
 
     private int rowIndex;
@@ -553,4 +572,27 @@ public class TsBlock {
       }
     }
   }
+
+  /**
+   * Builds a TsBlock with a time column and one value column per requested column name, each
+   * created with the given capacity.
+   *
+   * @param columnNames names of the value columns, in result order
+   * @param schema table schema used to resolve the data type of each column
+   * @param blockSize capacity used for the time column and every value column
+   * @return the newly created TsBlock
+   * @throws IllegalArgumentException if a column name is not present in {@code schema}, or its
+   *     data type is not supported by {@code ColumnFactory}
+   */
+  public static TsBlock buildTsBlock(List<String> columnNames, TableSchema schema, int blockSize) {
+    final TimeColumn timeColumn = new TimeColumn(blockSize);
+    final Column[] columns = new Column[columnNames.size()];
+    for (int i = 0; i < columns.length; i++) {
+      final String columnName = columnNames.get(i);
+      final MeasurementSchema columnSchema = schema.findColumnSchema(columnName);
+      if (columnSchema == null) {
+        // the schema lookup yields null for unknown columns; report the offending name instead
+        // of failing with a NullPointerException on getType()
+        throw new IllegalArgumentException("Column not found in table schema: " + columnName);
+      }
+      columns[i] = ColumnFactory.create(columnSchema.getType(), blockSize);
+    }
+    return new TsBlock(timeColumn, columns);
+  }
+
+  /**
+   * For each value column whose position count is smaller than this block's position count,
+   * marks the positions from the column's position count (inclusive) up to the block's position
+   * count (exclusive) as null.
+   *
+   * <p>NOTE(review): only {@code setNull} is called here; the column's own position count is not
+   * changed by this method. Confirm whether callers are expected to update it.
+   */
+  public void fillTrailingNulls() {
+    for (Column valueColumn : valueColumns) {
+      if (valueColumn.getPositionCount() < this.positionCount) {
+        valueColumn.setNull(valueColumn.getPositionCount(), 
this.positionCount);
+      }
+    }
+  }
 }
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/BinaryColumn.java
 
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/BinaryColumn.java
index facd7470..05c3956d 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/BinaryColumn.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/BinaryColumn.java
@@ -39,12 +39,16 @@ public class BinaryColumn implements Column {
       (int) RamUsageEstimator.shallowSizeOfInstance(BinaryColumn.class);
 
   private final int arrayOffset;
-  private final int positionCount;
-  private final boolean[] valueIsNull;
+  private int positionCount;
+  private boolean[] valueIsNull;
   private final Binary[] values;
 
   private final long retainedSizeInBytes;
 
+  /**
+   * Creates an empty column (position count 0, no null mask yet) backed by a new value array of
+   * the given length.
+   *
+   * @param initialCapacity length of the backing value array
+   */
+  public BinaryColumn(int initialCapacity) {
+    this(0, 0, null, new Binary[initialCapacity]);
+  }
+
   public BinaryColumn(int positionCount, Optional<boolean[]> valueIsNull, 
Binary[] values) {
     this(0, positionCount, valueIsNull.orElse(null), values);
   }
@@ -169,4 +173,17 @@ public class BinaryColumn implements Column {
   public int getInstanceSize() {
     return INSTANCE_SIZE;
   }
+
+  /** Sets the position count of this column; the backing arrays are left as they are. */
+  @Override
+  public void setPositionCount(int count) {
+    positionCount = count;
+  }
+
+  /**
+   * Marks the positions in {@code [start, end)} as null, allocating the null mask on first use.
+   *
+   * <p>NOTE(review): the indexes are applied to the mask directly, without adding {@code
+   * arrayOffset}. Confirm this is intended for columns created with a non-zero offset.
+   */
+  @Override
+  public void setNull(int start, int end) {
+    if (valueIsNull == null) {
+      // the mask is created lazily with the same length as the value array
+      valueIsNull = new boolean[values.length];
+    }
+    Arrays.fill(valueIsNull, start, end, true);
+  }
 }
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/BooleanColumn.java
 
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/BooleanColumn.java
index d7d7df68..09b5ec52 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/BooleanColumn.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/BooleanColumn.java
@@ -38,12 +38,16 @@ public class BooleanColumn implements Column {
   public static final int SIZE_IN_BYTES_PER_POSITION = Byte.BYTES + Byte.BYTES;
 
   private final int arrayOffset;
-  private final int positionCount;
-  private final boolean[] valueIsNull;
+  private int positionCount;
+  private boolean[] valueIsNull;
   private final boolean[] values;
 
   private final long retainedSizeInBytes;
 
+  /**
+   * Creates an empty column (position count 0, no null mask yet) backed by a new value array of
+   * the given length.
+   *
+   * @param initialCapacity length of the backing value array
+   */
+  public BooleanColumn(int initialCapacity) {
+    this(0, 0, null, new boolean[initialCapacity]);
+  }
+
   public BooleanColumn(int positionCount, Optional<boolean[]> valueIsNull, 
boolean[] values) {
     this(0, positionCount, valueIsNull.orElse(null), values);
   }
@@ -167,4 +171,17 @@ public class BooleanColumn implements Column {
   public int getInstanceSize() {
     return INSTANCE_SIZE;
   }
+
+  /** Sets the position count of this column; the backing arrays are left as they are. */
+  @Override
+  public void setPositionCount(int count) {
+    positionCount = count;
+  }
+
+  /**
+   * Marks the positions in {@code [start, end)} as null, allocating the null mask on first use.
+   *
+   * <p>NOTE(review): the indexes are applied to the mask directly, without adding {@code
+   * arrayOffset}. Confirm this is intended for columns created with a non-zero offset.
+   */
+  @Override
+  public void setNull(int start, int end) {
+    if (valueIsNull == null) {
+      // the mask is created lazily with the same length as the value array
+      valueIsNull = new boolean[values.length];
+    }
+    Arrays.fill(valueIsNull, start, end, true);
+  }
 }
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/ColumnFactory.java
 
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/ColumnFactory.java
new file mode 100644
index 00000000..42984c3d
--- /dev/null
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/ColumnFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.read.common.block.column;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.enums.TSDataType;
+
+/** Factory for empty {@link Column} instances of a requested data type. */
+public class ColumnFactory {
+
+  /**
+   * Creates an empty column for the given data type.
+   *
+   * @param dataType data type of the column to create
+   * @param initialCapacity capacity handed to the column's constructor
+   * @return a new column matching {@code dataType}
+   * @throws IllegalArgumentException if no column implementation is mapped to {@code dataType}
+   */
+  public static Column create(TSDataType dataType, int initialCapacity) {
+    final Column column;
+    switch (dataType) {
+      case BOOLEAN:
+        column = new BooleanColumn(initialCapacity);
+        break;
+      case INT32:
+        column = new IntColumn(initialCapacity);
+        break;
+      case INT64:
+        column = new LongColumn(initialCapacity);
+        break;
+      case FLOAT:
+        column = new FloatColumn(initialCapacity);
+        break;
+      case DOUBLE:
+        column = new DoubleColumn(initialCapacity);
+        break;
+      case TEXT:
+        column = new BinaryColumn(initialCapacity);
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported data type: " + dataType);
+    }
+    return column;
+  }
+}
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/DoubleColumn.java
 
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/DoubleColumn.java
index 5047028a..e643812f 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/DoubleColumn.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/DoubleColumn.java
@@ -39,12 +39,16 @@ public class DoubleColumn implements Column {
   public static final int SIZE_IN_BYTES_PER_POSITION = Double.BYTES + 
Byte.BYTES;
 
   private final int arrayOffset;
-  private final int positionCount;
-  private final boolean[] valueIsNull;
+  private int positionCount;
+  private boolean[] valueIsNull;
   private final double[] values;
 
   private final long retainedSizeInBytes;
 
+  /**
+   * Creates an empty column (position count 0, no null mask yet) backed by a new value array of
+   * the given length.
+   *
+   * @param initialCapacity length of the backing value array
+   */
+  public DoubleColumn(int initialCapacity) {
+    this(0, 0, null, new double[initialCapacity]);
+  }
+
   public DoubleColumn(int positionCount, Optional<boolean[]> valueIsNull, 
double[] values) {
     this(0, positionCount, valueIsNull.orElse(null), values);
   }
@@ -168,4 +172,17 @@ public class DoubleColumn implements Column {
   public int getInstanceSize() {
     return INSTANCE_SIZE;
   }
+
+  /** Sets the position count of this column; the backing arrays are left as they are. */
+  @Override
+  public void setPositionCount(int count) {
+    positionCount = count;
+  }
+
+  /**
+   * Marks the positions in {@code [start, end)} as null, allocating the null mask on first use.
+   *
+   * <p>NOTE(review): the indexes are applied to the mask directly, without adding {@code
+   * arrayOffset}. Confirm this is intended for columns created with a non-zero offset.
+   */
+  @Override
+  public void setNull(int start, int end) {
+    if (valueIsNull == null) {
+      // the mask is created lazily with the same length as the value array
+      valueIsNull = new boolean[values.length];
+    }
+    Arrays.fill(valueIsNull, start, end, true);
+  }
 }
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/FloatColumn.java
 
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/FloatColumn.java
index 4050ad17..c06f190b 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/FloatColumn.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/FloatColumn.java
@@ -39,12 +39,16 @@ public class FloatColumn implements Column {
   public static final int SIZE_IN_BYTES_PER_POSITION = Float.BYTES + 
Byte.BYTES;
 
   private final int arrayOffset;
-  private final int positionCount;
-  private final boolean[] valueIsNull;
+  private int positionCount;
+  private boolean[] valueIsNull;
   private final float[] values;
 
   private final long retainedSizeInBytes;
 
+  /**
+   * Creates an empty column (position count 0, no null mask yet) backed by a new value array of
+   * the given length.
+   *
+   * @param initialCapacity length of the backing value array
+   */
+  public FloatColumn(int initialCapacity) {
+    this(0, 0, null, new float[initialCapacity]);
+  }
+
   public FloatColumn(int positionCount, Optional<boolean[]> valueIsNull, 
float[] values) {
     this(0, positionCount, valueIsNull.orElse(null), values);
   }
@@ -167,4 +171,17 @@ public class FloatColumn implements Column {
   public int getInstanceSize() {
     return INSTANCE_SIZE;
   }
+
+  /** Sets the position count of this column; the backing arrays are left as they are. */
+  @Override
+  public void setPositionCount(int count) {
+    positionCount = count;
+  }
+
+  /**
+   * Marks the positions in {@code [start, end)} as null, allocating the null mask on first use.
+   *
+   * <p>NOTE(review): the indexes are applied to the mask directly, without adding {@code
+   * arrayOffset}. Confirm this is intended for columns created with a non-zero offset.
+   */
+  @Override
+  public void setNull(int start, int end) {
+    if (valueIsNull == null) {
+      // the mask is created lazily with the same length as the value array
+      valueIsNull = new boolean[values.length];
+    }
+    Arrays.fill(valueIsNull, start, end, true);
+  }
 }
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/IntColumn.java
 
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/IntColumn.java
index 8355f28d..7fdcc93f 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/IntColumn.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/IntColumn.java
@@ -39,12 +39,16 @@ public class IntColumn implements Column {
   public static final int SIZE_IN_BYTES_PER_POSITION = Integer.BYTES + 
Byte.BYTES;
 
   private final int arrayOffset;
-  private final int positionCount;
-  private final boolean[] valueIsNull;
+  private int positionCount;
+  private boolean[] valueIsNull;
   private final int[] values;
 
   private final long retainedSizeInBytes;
 
+  /**
+   * Creates an empty column (position count 0, no null mask yet) backed by a new value array of
+   * the given length.
+   *
+   * @param initialCapacity length of the backing value array
+   */
+  public IntColumn(int initialCapacity) {
+    this(0, 0, null, new int[initialCapacity]);
+  }
+
   public IntColumn(int positionCount, Optional<boolean[]> valueIsNull, int[] 
values) {
     this(0, positionCount, valueIsNull.orElse(null), values);
   }
@@ -167,4 +171,17 @@ public class IntColumn implements Column {
   public int getInstanceSize() {
     return INSTANCE_SIZE;
   }
+
+  /** Sets the position count of this column; the backing arrays are left as they are. */
+  @Override
+  public void setPositionCount(int count) {
+    this.positionCount = count;
+  }
+
+  /**
+   * Marks the positions in {@code [start, end)} as null, allocating the null mask on first use.
+   *
+   * <p>NOTE(review): the indexes are applied to the mask directly, without adding {@code
+   * arrayOffset}. Confirm this is intended for columns created with a non-zero offset.
+   */
+  @Override
+  public void setNull(int start, int end) {
+    if (valueIsNull == null) {
+      // the mask is created lazily with the same length as the value array
+      valueIsNull = new boolean[values.length];
+    }
+    Arrays.fill(valueIsNull, start, end, true);
+  }
 }
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/LongColumn.java
 
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/LongColumn.java
index 59b8e016..3045e581 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/LongColumn.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/LongColumn.java
@@ -39,12 +39,16 @@ public class LongColumn implements Column {
   public static final int SIZE_IN_BYTES_PER_POSITION = Long.BYTES + Byte.BYTES;
 
   private final int arrayOffset;
-  private final int positionCount;
-  private final boolean[] valueIsNull;
+  private int positionCount;
+  private boolean[] valueIsNull;
   private final long[] values;
 
   private final long retainedSizeInBytes;
 
+  /**
+   * Creates an empty column (position count 0, no null mask yet) backed by a new value array of
+   * the given length.
+   *
+   * @param initialCapacity length of the backing value array
+   */
+  public LongColumn(int initialCapacity) {
+    this(0, 0, null, new long[initialCapacity]);
+  }
+
   public LongColumn(int positionCount, Optional<boolean[]> valueIsNull, long[] 
values) {
     this(0, positionCount, valueIsNull.orElse(null), values);
   }
@@ -167,4 +171,17 @@ public class LongColumn implements Column {
   public int getInstanceSize() {
     return INSTANCE_SIZE;
   }
+
+  /** Sets the position count of this column; the backing arrays are left as they are. */
+  @Override
+  public void setPositionCount(int count) {
+    this.positionCount = count;
+  }
+
+  /**
+   * Marks the positions in {@code [start, end)} as null, allocating the null mask on first use.
+   *
+   * <p>NOTE(review): the indexes are applied to the mask directly, without adding {@code
+   * arrayOffset}. Confirm this is intended for columns created with a non-zero offset.
+   */
+  @Override
+  public void setNull(int start, int end) {
+    if (valueIsNull == null) {
+      // the mask is created lazily with the same length as the value array
+      valueIsNull = new boolean[values.length];
+    }
+    Arrays.fill(valueIsNull, start, end, true);
+  }
 }
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/NullColumn.java
 
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/NullColumn.java
index be21cfa6..7e19b26b 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/NullColumn.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/NullColumn.java
@@ -36,7 +36,7 @@ public class NullColumn implements Column {
   private static final int INSTANCE_SIZE =
       (int) RamUsageEstimator.shallowSizeOfInstance(BooleanColumn.class);
 
-  private final int positionCount;
+  private int positionCount;
 
   private final long retainedSizeInBytes;
 
@@ -126,4 +126,9 @@ public class NullColumn implements Column {
   public int getInstanceSize() {
     return INSTANCE_SIZE;
   }
+
+  /** Sets the position count of this column. */
+  @Override
+  public void setPositionCount(int count) {
+    this.positionCount = count;
+  }
+
+  /**
+   * Implements the abstract {@code Column.setNull} so that this class keeps compiling.
+   *
+   * <p>Intentionally a no-op: this column keeps no per-position null mask that could be updated
+   * (presumably every position is already null; verify against {@code isNull}).
+   */
+  @Override
+  public void setNull(int start, int end) {
+    // nothing to mark
+  }
 }
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/RunLengthEncodedColumn.java
 
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/RunLengthEncodedColumn.java
index dbca629f..bdb557f1 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/RunLengthEncodedColumn.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/RunLengthEncodedColumn.java
@@ -37,7 +37,7 @@ public class RunLengthEncodedColumn implements Column {
       (int) 
RamUsageEstimator.shallowSizeOfInstance(RunLengthEncodedColumn.class);
 
   private final Column value;
-  private final int positionCount;
+  private int positionCount;
 
   public RunLengthEncodedColumn(Column value, int positionCount) {
     requireNonNull(value, "value is null");
@@ -214,4 +214,14 @@ public class RunLengthEncodedColumn implements Column {
   public int getInstanceSize() {
     return INSTANCE_SIZE;
   }
+
+  /** Sets the position count of this column; the wrapped value column is not touched. */
+  @Override
+  public void setPositionCount(int count) {
+    this.positionCount = count;
+  }
+
+  /**
+   * Forwards the range unchanged to the wrapped {@code value} column.
+   *
+   * <p>NOTE(review): confirm the wrapped column can hold positions up to {@code end}; the range
+   * is not translated in any way before it is passed on.
+   */
+  @Override
+  public void setNull(int start, int end) {
+    value.setNull(start, end);
+  }
 }
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/TimeColumn.java
 
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/TimeColumn.java
index aa629d52..2d2d364b 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/TimeColumn.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/common/block/column/TimeColumn.java
@@ -19,6 +19,7 @@
 
 package org.apache.tsfile.read.common.block.column;
 
+import java.util.Arrays;
 import org.apache.tsfile.block.column.Column;
 import org.apache.tsfile.block.column.ColumnEncoding;
 import org.apache.tsfile.enums.TSDataType;
@@ -33,12 +34,16 @@ public class TimeColumn implements Column {
   public static final int SIZE_IN_BYTES_PER_POSITION = Long.BYTES;
 
   private final int arrayOffset;
-  private final int positionCount;
+  private int positionCount;
 
   private final long[] values;
 
   private final long retainedSizeInBytes;
 
+  /**
+   * Creates an empty time column (position count 0) backed by a new array of the given length.
+   *
+   * @param initialCapacity length of the backing timestamp array
+   */
+  public TimeColumn(int initialCapacity) {
+    this(0, 0, new long[initialCapacity]);
+  }
+
   public TimeColumn(int positionCount, long[] values) {
     this(0, positionCount, values);
   }
@@ -154,4 +159,13 @@ public class TimeColumn implements Column {
   public int getInstanceSize() {
     return INSTANCE_SIZE;
   }
+
+  /**
+   * Sets the position count of this column.
+   *
+   * @param count the new position count
+   */
+  @Override
+  public void setPositionCount(int count) {
+    // assign the parameter; assigning the field to itself would leave the count unchanged
+    this.positionCount = count;
+  }
+
+  /** Does nothing: this column declares no null mask, so there is no state to update. */
+  @Override
+  public void setNull(int start, int end) {
+  }
 }
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/controller/IMetadataQuerier.java 
b/tsfile/src/main/java/org/apache/tsfile/read/controller/IMetadataQuerier.java
index 09741943..844de99a 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/read/controller/IMetadataQuerier.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/controller/IMetadataQuerier.java
@@ -20,6 +20,7 @@
 package org.apache.tsfile.read.controller;
 
 import java.util.Iterator;
+import java.util.Set;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.exception.write.NoMeasurementException;
 import org.apache.tsfile.file.metadata.IChunkMetadata;
@@ -39,6 +40,17 @@ public interface IMetadataQuerier {
 
   List<IChunkMetadata> getChunkMetaDataList(Path path) throws IOException;
 
+  /**
+   * Gets the chunk metadata lists of the given measurements of a device.
+   *
+   * @param deviceID the deviceID to be queried
+   * @param measurementNames the measurementNames to be queried
+   * @param measurementNode nullable; if provided, the search will start from this node
+   * @return one list of ChunkMetadata for each queried timeseries that exists
+   * @throws IOException if an IO error occurs
+   */
+  List<List<IChunkMetadata>> getChunkMetadataLists(IDeviceID deviceID,
+      Set<String> measurementNames, MetadataIndexNode measurementNode) throws 
IOException;
+
   Map<Path, List<IChunkMetadata>> getChunkMetaDataMap(List<Path> paths) throws 
IOException;
 
   TsFileMetadata getWholeFileMetadata();
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
 
b/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
index d7fa458f..18426390 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/controller/MetadataQuerierByFileImpl.java
@@ -30,6 +30,7 @@ import org.apache.tsfile.file.metadata.ITimeSeriesMetadata;
 import org.apache.tsfile.file.metadata.MetadataIndexNode;
 import org.apache.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.tsfile.file.metadata.TsFileMetadata;
+import org.apache.tsfile.file.metadata.enums.MetadataIndexNodeType;
 import org.apache.tsfile.read.TsFileSequenceReader;
 import org.apache.tsfile.read.TsFileSequenceReader.LocateStatus;
 import org.apache.tsfile.read.common.Path;
@@ -58,6 +59,7 @@ public class MetadataQuerierByFileImpl implements 
IMetadataQuerier {
 
   // TimeseriesPath -> List<IChunkMetadata>
   private LRUCache<Path, List<IChunkMetadata>> chunkMetaDataCache;
+  private LRUCache<Pair<IDeviceID, String>, List<IChunkMetadata>> 
deviceIdChunkMetadataCache;
 
   private TsFileSequenceReader tsFileReader;
 
@@ -72,6 +74,14 @@ public class MetadataQuerierByFileImpl implements 
IMetadataQuerier {
             return loadChunkMetadata(key);
           }
         };
+    deviceIdChunkMetadataCache =
+        new LRUCache<Pair<IDeviceID, String>, 
List<IChunkMetadata>>(CACHED_ENTRY_NUMBER) {
+          @Override
+          protected List<IChunkMetadata> loadObjectByKey(Pair<IDeviceID, 
String> key)
+              throws IOException {
+            return loadChunkMetadata(key);
+          }
+        };
   }
 
   @Override
@@ -79,6 +89,33 @@ public class MetadataQuerierByFileImpl implements 
IMetadataQuerier {
     return new ArrayList<>(chunkMetaDataCache.get(timeseriesPath));
   }
 
+  /**
+   * Collects the chunk metadata lists of the requested measurements of one device. Entries
+   * already present in the cache are served from it; the remaining measurements are read from
+   * the file and cached.
+   *
+   * <p>The given set is never modified, so immutable sets may be passed.
+   *
+   * @param deviceID the device to be queried
+   * @param measurementNames the measurements to be queried
+   * @param measurementNode nullable; handed to the file reader as the node to start from
+   * @return the non-empty chunk metadata lists found, cached ones first
+   * @throws IOException if reading the metadata fails
+   */
+  @Override
+  public List<List<IChunkMetadata>> getChunkMetadataLists(
+      IDeviceID deviceID, Set<String> measurementNames, MetadataIndexNode measurementNode)
+      throws IOException {
+    final List<List<IChunkMetadata>> results = new ArrayList<>(measurementNames.size());
+    // track the names still to be read in a private copy instead of removing from the argument
+    final Set<String> uncachedNames = new java.util.HashSet<>(measurementNames);
+    // use cache when possible
+    for (String measurementName : measurementNames) {
+      final Pair<IDeviceID, String> key = new Pair<>(deviceID, measurementName);
+      // check first to avoid loading
+      if (deviceIdChunkMetadataCache.containsKey(key)) {
+        results.add(deviceIdChunkMetadataCache.get(key));
+        uncachedNames.remove(measurementName);
+      }
+    }
+    if (uncachedNames.isEmpty()) {
+      // everything was served from the cache, no need to touch the file
+      return results;
+    }
+    // the remaining is not in the cache, search them in file
+    final List<List<IChunkMetadata>> loadedLists =
+        tsFileReader.getIChunkMetadataList(deviceID, uncachedNames, measurementNode);
+    for (List<IChunkMetadata> metadataList : loadedLists) {
+      if (metadataList.isEmpty()) {
+        // without a first element the measurement name is unknown, so the list cannot be keyed
+        continue;
+      }
+      final String measurementUid = metadataList.get(0).getMeasurementUid();
+      // cache the result
+      deviceIdChunkMetadataCache.put(new Pair<>(deviceID, measurementUid), metadataList);
+      results.add(metadataList);
+    }
+    return results;
+  }
+
   @Override
   public Map<Path, List<IChunkMetadata>> getChunkMetaDataMap(List<Path> paths) 
throws IOException {
     Map<Path, List<IChunkMetadata>> chunkMetaDatas = new HashMap<>();
@@ -122,7 +159,7 @@ public class MetadataQuerierByFileImpl implements 
IMetadataQuerier {
       }
 
       List<ITimeSeriesMetadata> timeseriesMetaDataList =
-          tsFileReader.readITimeseriesMetadata(selectedDevice, 
selectedMeasurements);
+          tsFileReader.readITimeseriesMetadata(selectedDevice, 
selectedMeasurements, null);
       for (ITimeSeriesMetadata timeseriesMetadata : timeseriesMetaDataList) {
         List<IChunkMetadata> chunkMetadataList =
             tsFileReader.readIChunkMetaDataList(timeseriesMetadata);
@@ -160,6 +197,10 @@ public class MetadataQuerierByFileImpl implements 
IMetadataQuerier {
     return tsFileReader.getIChunkMetadataList(path);
   }
 
+  private List<IChunkMetadata> loadChunkMetadata(Pair<IDeviceID, String> key) 
throws IOException {
+    return tsFileReader.getIChunkMetadataList(key.getLeft(), key.right);
+  }
+
   @Override
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
   public List<TimeRange> convertSpace2TimePartition(
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/expression/ExpressionTree.java 
b/tsfile/src/main/java/org/apache/tsfile/read/expression/ExpressionTree.java
index 7dd6e427..b22dc872 100644
--- a/tsfile/src/main/java/org/apache/tsfile/read/expression/ExpressionTree.java
+++ b/tsfile/src/main/java/org/apache/tsfile/read/expression/ExpressionTree.java
@@ -1,5 +1,8 @@
 package org.apache.tsfile.read.expression;
 
+import org.apache.tsfile.read.filter.basic.Filter;
+
 /** An expression that can be evaluated against a value or converted into a {@link Filter}. */
 public interface ExpressionTree {
   // presumably reports whether the given value matches this expression — confirm with implementers
   boolean satisfy(Object value);
   // returns a Filter form of this expression
+  Filter toFilter();
 }
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TableQueryExecutor.java
 
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TableQueryExecutor.java
index 112acf66..954719a4 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TableQueryExecutor.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/TableQueryExecutor.java
@@ -3,8 +3,10 @@ package org.apache.tsfile.read.query.executor;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.tsfile.exception.read.NoColumnException;
 import org.apache.tsfile.exception.read.ReadProcessException;
 import org.apache.tsfile.exception.read.UnsupportedOrderingException;
@@ -25,6 +27,7 @@ public class TableQueryExecutor {
   private IMetadataQuerier metadataQuerier;
   private IChunkLoader chunkLoader;
   private TableQueryOrdering tableQueryOrdering;
+  private int blockSize = 1024;
 
   public TableQueryExecutor(IMetadataQuerier metadataQuerier, IChunkLoader 
chunkLoader,
       TableQueryOrdering tableQueryOrdering) {
@@ -48,12 +51,14 @@ public class TableQueryExecutor {
       String column = columns.get(i);
       columnMapping.add(column, i, tableSchema);
     }
+    columnMapping.add(measurementFilter);
 
     DeviceTaskIterator deviceTaskIterator = new DeviceTaskIterator(columns, 
tableRoot,
-        columnMapping, metadataQuerier, idFilter);
+        columnMapping, metadataQuerier, idFilter, tableSchema);
     switch (tableQueryOrdering) {
       case DEVICE:
-        return new DeviceOrderedTsBlockReader(deviceTaskIterator, 
metadataQuerier, chunkLoader);
+        return new DeviceOrderedTsBlockReader(deviceTaskIterator, 
metadataQuerier, chunkLoader,
+            timeFilter, measurementFilter, blockSize);
       case TIME:
       default:
         throw new UnsupportedOrderingException(tableQueryOrdering.toString());
@@ -66,7 +71,8 @@ public class TableQueryExecutor {
      * This mapping is used to put data of the same series into multiple 
columns.
      */
     private Map<String, List<Integer>> columnPosMap = new HashMap<>();
-    private Map<String, Boolean> isIdMap = new HashMap<>();
+    private Set<String> idColumns = new HashSet<>();
+    private Set<String> measurementColumns = new HashSet<>();
 
     public void add(String columnName, int i, TableSchema schema) throws 
NoColumnException {
       final int columnIndex = schema.findColumnIndex(columnName);
@@ -76,7 +82,15 @@ public class TableQueryExecutor {
 
       final ColumnType columnType = schema.getColumnTypes().get(columnIndex);
       columnPosMap.computeIfAbsent(columnName, k -> new ArrayList<>()).add(i);
-      isIdMap.put(columnName, columnType.equals(ColumnType.ID));
+      if (columnType.equals(ColumnType.ID)) {
+        idColumns.add(columnName);
+      } else {
+        measurementColumns.add(columnName);
+      }
+    }
+
     /** Placeholder: currently does nothing with {@code measurementFilter} (see the TODO). */
+    public void add(ExpressionTree measurementFilter) {
+      //TODO: get measurements in the filter and add them to measurementColumns
     }
 
     public List<Integer> getColumnPos(String columnName) {
@@ -84,7 +98,19 @@ public class TableQueryExecutor {
     }
 
     /** Returns whether {@code columnName} was registered in this mapping as an ID column. */
     public boolean isId(String columnName) {
-      return isIdMap.getOrDefault(columnName, false);
+      return idColumns.contains(columnName);
+    }
+
     /** Returns whether {@code columnName} was registered in this mapping as a non-ID column. */
+    public boolean isMeasurement(String columnName) {
+      return measurementColumns.contains(columnName);
+    }
+
     /**
      * Returns the names of the registered ID columns. This is the mapping's own set, not a
      * copy, so any change made by a caller changes this mapping.
      */
+    public Set<String> getIdColumns() {
+      return idColumns;
+    }
+
     /**
      * Returns the names of the registered non-ID columns. This is the mapping's own set, not a
      * copy, so any change made by a caller changes this mapping.
      */
+    public Set<String> getMeasurementColumns() {
+      return measurementColumns;
     }
   }
 
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceQueryTask.java
 
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceQueryTask.java
index 3744fe9d..6b1510c5 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceQueryTask.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceQueryTask.java
@@ -22,20 +22,23 @@ package org.apache.tsfile.read.query.executor.task;
 import java.util.List;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.MetadataIndexNode;
-import org.apache.tsfile.read.query.executor.TsFileExecutor.ColumnMapping;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.read.query.executor.TableQueryExecutor.ColumnMapping;
 
 public class DeviceQueryTask {
-  private IDeviceID deviceID;
-  private List<String> columnNames;
-  private ColumnMapping columnMapping;
-  private MetadataIndexNode indexRoot;
+  private final IDeviceID deviceID;
+  private final List<String> columnNames;
+  private final ColumnMapping columnMapping;
+  private final MetadataIndexNode indexRoot;
+  private final TableSchema tableSchema;
 
   /**
    * Creates a task that holds the device, the requested column names, the column mapping, the
    * index root and the table schema exactly as given.
    */
   public DeviceQueryTask(IDeviceID deviceID, List<String> columnNames, 
ColumnMapping columnMapping,
-      MetadataIndexNode indexRoot) {
+      MetadataIndexNode indexRoot, TableSchema tableSchema) {
     this.deviceID = deviceID;
     this.columnNames = columnNames;
     this.columnMapping = columnMapping;
     this.indexRoot = indexRoot;
+    this.tableSchema = tableSchema;
   }
 
   public IDeviceID getDeviceID() {
@@ -53,4 +56,16 @@ public class DeviceQueryTask {
   public MetadataIndexNode getIndexRoot() {
     return indexRoot;
   }
+
   /** Returns the table schema this task was created with. */
+  public TableSchema getTableSchema() {
+    return tableSchema;
+  }
+
+  /** Renders the device id and the requested column names; the other fields are left out. */
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder("DeviceQueryTask{");
+    text.append("deviceID=").append(deviceID);
+    text.append(", columnNames=").append(columnNames);
+    return text.append('}').toString();
+  }
 }
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceTaskIterator.java
 
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceTaskIterator.java
index 9e387ff8..69bdefae 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceTaskIterator.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/query/executor/task/DeviceTaskIterator.java
@@ -4,6 +4,7 @@ import java.util.Iterator;
 import java.util.List;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.tsfile.file.metadata.TableSchema;
 import org.apache.tsfile.read.controller.IMetadataQuerier;
 import org.apache.tsfile.read.expression.ExpressionTree;
 import org.apache.tsfile.read.query.executor.TableQueryExecutor.ColumnMapping;
@@ -12,13 +13,16 @@ import org.apache.tsfile.utils.Pair;
 public class DeviceTaskIterator implements Iterator<DeviceQueryTask> {
   private List<String> columnNames;
   private ColumnMapping columnMapping;
+  private TableSchema tableSchema;
   private Iterator<Pair<IDeviceID, MetadataIndexNode>> deviceMetaIterator;
 
   /**
    * Keeps the column names, column mapping and table schema for the tasks to be produced, and
    * obtains the device iterator from {@code metadataQuerier} for {@code indexRoot} and
    * {@code idFilter}.
    */
   public DeviceTaskIterator(List<String> columnNames, MetadataIndexNode 
indexRoot,
-      ColumnMapping columnMapping, IMetadataQuerier metadataQuerier, 
ExpressionTree idFilter) {
+      ColumnMapping columnMapping, IMetadataQuerier metadataQuerier, 
ExpressionTree idFilter,
+      TableSchema tableSchema) {
     this.columnNames = columnNames;
     this.columnMapping = columnMapping;
     this.deviceMetaIterator = metadataQuerier.deviceIterator(indexRoot, 
idFilter);
+    this.tableSchema = tableSchema;
   }
 
   @Override
@@ -29,6 +33,6 @@ public class DeviceTaskIterator implements 
Iterator<DeviceQueryTask> {
   /**
    * Takes the next (device, index node) pair from the device iterator and wraps it in a
    * {@link DeviceQueryTask} together with the shared column names, column mapping and schema.
    */
   @Override
   public DeviceQueryTask next() {
     final Pair<IDeviceID, MetadataIndexNode> next = deviceMetaIterator.next();
-    return new DeviceQueryTask(next.left, columnNames, columnMapping, 
next.right);
+    return new DeviceQueryTask(next.left, columnNames, columnMapping, 
next.right, tableSchema);
   }
 }
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/reader/block/DeviceOrderedTsBlockReader.java
 
b/tsfile/src/main/java/org/apache/tsfile/read/reader/block/DeviceOrderedTsBlockReader.java
index a86a628b..8a8c981c 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/read/reader/block/DeviceOrderedTsBlockReader.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/reader/block/DeviceOrderedTsBlockReader.java
@@ -5,22 +5,33 @@ import java.util.NoSuchElementException;
 import org.apache.tsfile.read.common.block.TsBlock;
 import org.apache.tsfile.read.controller.IChunkLoader;
 import org.apache.tsfile.read.controller.IMetadataQuerier;
+import org.apache.tsfile.read.expression.ExpressionTree;
 import org.apache.tsfile.read.query.executor.task.DeviceQueryTask;
 import org.apache.tsfile.read.query.executor.task.DeviceTaskIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class DeviceOrderedTsBlockReader implements TsBlockReader {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DeviceOrderedTsBlockReader.class);
   private final DeviceTaskIterator taskIterator;
   private final IMetadataQuerier metadataQuerier;
   private final IChunkLoader chunkLoader;
+  private final int blockSize;
   private SingleDeviceTsBlockReader currentReader;
+  private ExpressionTree timeFilter;
+  private ExpressionTree measurementFilter;
 
   /**
    * Stores the task iterator, metadata querier, chunk loader, both filters and the block
    * size; nothing is read at construction time.
    */
   public DeviceOrderedTsBlockReader(DeviceTaskIterator taskIterator,
       IMetadataQuerier metadataQuerier,
-      IChunkLoader chunkLoader) {
+      IChunkLoader chunkLoader, ExpressionTree timeFilter, ExpressionTree 
measurementFilter,
+      int blockSize) {
     this.taskIterator = taskIterator;
     this.metadataQuerier = metadataQuerier;
     this.chunkLoader = chunkLoader;
+    this.blockSize = blockSize;
+    this.timeFilter = timeFilter;
+    this.measurementFilter = measurementFilter;
   }
 
   @Override
@@ -30,7 +41,12 @@ public class DeviceOrderedTsBlockReader implements 
TsBlockReader {
     }
     while (taskIterator.hasNext()) {
       final DeviceQueryTask nextTask = taskIterator.next();
-      currentReader = new SingleDeviceTsBlockReader(nextTask, metadataQuerier, 
chunkLoader);
+      try {
+        currentReader = new SingleDeviceTsBlockReader(nextTask, 
metadataQuerier, chunkLoader,
+            blockSize, timeFilter, measurementFilter);
+      } catch (IOException e) {
+        LOGGER.error("Failed to construct reader for {}", nextTask, e);
+      }
       if (currentReader.hasNext()) {
         return true;
       }
diff --git 
a/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java
 
b/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java
index f61ec47f..2f2c1783 100644
--- 
a/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java
+++ 
b/tsfile/src/main/java/org/apache/tsfile/read/reader/block/SingleDeviceTsBlockReader.java
@@ -1,38 +1,267 @@
 package org.apache.tsfile.read.reader.block;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NoSuchElementException;
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.read.common.BatchData;
 import org.apache.tsfile.read.common.block.TsBlock;
 import org.apache.tsfile.read.controller.IChunkLoader;
 import org.apache.tsfile.read.controller.IMetadataQuerier;
+import org.apache.tsfile.read.expression.ExpressionTree;
+import org.apache.tsfile.read.filter.basic.Filter;
 import org.apache.tsfile.read.query.executor.task.DeviceQueryTask;
+import org.apache.tsfile.read.reader.series.AbstractFileSeriesReader;
+import org.apache.tsfile.read.reader.series.FileSeriesReader;
+import org.apache.tsfile.utils.Binary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class SingleDeviceTsBlockReader implements TsBlockReader {
 
-  private DeviceQueryTask task;
-  private IMetadataQuerier metadataQuerier;
-  private IChunkLoader chunkLoader;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SingleDeviceTsBlockReader.class);
+  private final DeviceQueryTask task;
+  private final ExpressionTree measurementExpression;
+  private final int blockSize;
+
+  private final TsBlock currentBlock;
+  private boolean lastBlockReturned = true;
+  private final Map<String, MeasurementColumnContext> measureColumnContextMap;
+  private final Map<String, IdColumnContext> idColumnContextMap;
+
+  private long nextTime;
 
   /**
    * Prepares a reader for the device of {@code task}: builds the reusable result block,
    * opens one series reader per measurement that has chunk metadata, and records where each
    * ID column goes in the result.
    *
    * @param task device, column names, column mapping, index root and table schema to read
    * @param metadataQuerier source of the chunk metadata lists
    * @param chunkLoader passed to every series reader
    * @param blockSize size the result block is built with; also the row limit per block
    * @param timeExpression converted with {@code toFilter()} for the series readers; may be null
    * @param measurementFilter kept as the row-level measurement expression; may be null
    * @throws IOException propagated from the metadata querier or the series readers
    */
   public SingleDeviceTsBlockReader(DeviceQueryTask task, IMetadataQuerier 
metadataQuerier,
-      IChunkLoader chunkLoader) {
+      IChunkLoader chunkLoader, int blockSize, ExpressionTree timeExpression,
+      ExpressionTree measurementFilter) throws IOException {
     this.task = task;
-    this.metadataQuerier = metadataQuerier;
-    this.chunkLoader = chunkLoader;
+    this.blockSize = blockSize;
+    this.measurementExpression = measurementFilter;
+
+    this.currentBlock = TsBlock.buildTsBlock(task.getColumnNames(), 
task.getTableSchema(),
+        blockSize);
+    this.measureColumnContextMap = new HashMap<>();
+    this.idColumnContextMap = new HashMap<>();
+
     // NOTE(review): the column mapping's own measurement set is handed to the querier —
     // confirm the querier does not modify it
+    final List<List<IChunkMetadata>> chunkMetadataLists = 
metadataQuerier.getChunkMetadataLists(
+        task.getDeviceID(), task.getColumnMapping()
+            .getMeasurementColumns(), task.getIndexRoot());
 
+    Filter timeFilter = timeExpression == null ? null : 
timeExpression.toFilter();
     // a measurement with no chunk metadata, or whose reader yields no batch, gets no context
     // entry and is therefore never written by this reader
+    for (List<IChunkMetadata> chunkMetadataList : chunkMetadataLists) {
+      if (!chunkMetadataList.isEmpty()) {
+        final String measurementUid = 
chunkMetadataList.get(0).getMeasurementUid();
+        AbstractFileSeriesReader seriesReader = new 
FileSeriesReader(chunkLoader,
+            chunkMetadataList, timeFilter);
+        if (seriesReader.hasNextBatch()) {
+          measureColumnContextMap.put(measurementUid, new 
MeasurementColumnContext(measurementUid,
+              task.getColumnMapping().getColumnPos(measurementUid), 
seriesReader.nextBatch(),
+              seriesReader));
+        }
+      }
+    }
 
+    for (String idColumn : task.getColumnMapping().getIdColumns()) {
+      final List<Integer> columnPosInResult = 
task.getColumnMapping().getColumnPos(idColumn);
       // NOTE(review): the column's index in the table schema is later used as the segment
       // index of the device id — confirm the two numberings line up
+      final int columnPosInId = 
task.getTableSchema().findColumnIndex(idColumn);
+      idColumnContextMap.put(idColumn, new IdColumnContext(columnPosInResult, 
columnPosInId));
+    }
   }
 
   @Override
   public boolean hasNext() {
+    if (!lastBlockReturned) {
+      return true;
+    }
+
+    if (measureColumnContextMap.isEmpty()) {
+      return false;
+    }
+
+    currentBlock.reset();
+    nextTime = Long.MAX_VALUE;
+    List<MeasurementColumnContext> alignedColumns = new ArrayList<>();
+
+    while (currentBlock.getPositionCount() < blockSize) {
+      // find the minimum time among the batches and the associated columns
+      for (Entry<String, MeasurementColumnContext> entry : 
measureColumnContextMap.entrySet()) {
+        final BatchData batchData = entry.getValue().currentBatch;
+        final long currentTime = batchData.currentTime();
+        if (nextTime > currentTime) {
+          nextTime = currentTime;
+          alignedColumns.clear();
+        } else if (nextTime == currentTime) {
+          alignedColumns.add(entry.getValue());
+        }
+      }
+
+      try {
+        fillMeasurements(alignedColumns);
+      } catch (IOException e) {
+        LOGGER.error("Cannot fill measurements", e);
+        return false;
+      }
+
+      // all columns have exhausted
+      if (measureColumnContextMap.isEmpty()) {
+        break;
+      }
+    }
+
+    if (currentBlock.getPositionCount() > 0) {
+      fillIds();
+      currentBlock.fillTrailingNulls();
+      lastBlockReturned = false;
+      return true;
+    }
+
     return false;
   }
 
+  /**
+   * Writes, for every requested ID column, the matching segment of the device id into all
+   * rows currently held by the block.
+   */
+  private void fillIds() {
+    for (IdColumnContext idContext : idColumnContextMap.values()) {
+      for (Integer resultPos : idContext.posInResult) {
+        final Column target = currentBlock.getColumn(resultPos);
+        fillIdColumn(
+            target,
+            task.getDeviceID().segment(idContext.posInDeviceId),
+            0,
+            currentBlock.getPositionCount());
+      }
+    }
+  }
+
+  /**
+   * Handles the row at {@code nextTime}: writes it into the current block when the
+   * measurement expression is absent or accepts it, and in every case moves the aligned
+   * columns on to their next point.
+   *
+   * @param alignedColumns the columns whose current point carries {@code nextTime}
+   * @throws IOException propagated from the series readers when a new batch is fetched
+   */
+  private void fillMeasurements(List<MeasurementColumnContext> alignedColumns)
+      throws IOException {
+    // evaluated before any batch is advanced, so the expression sees the current row
+    final boolean rowAccepted =
+        measurementExpression == null || measurementExpression.satisfy(this);
+    final int positionCount = currentBlock.getPositionCount();
+    if (rowAccepted) {
+      // use the time to fill the block
+      currentBlock.getTimeColumn().getTimes()[positionCount] = nextTime;
+    }
+    for (final MeasurementColumnContext columnContext : alignedColumns) {
+      final BatchData batchData = columnContext.currentBatch;
+      if (rowAccepted) {
+        // project the value columns to the result
+        for (Integer pos : columnContext.posInResult) {
+          fillMeasurementColumn(currentBlock.getColumn(pos), batchData, positionCount);
+        }
+      }
+
+      // consume the point even when the row is rejected; otherwise the same timestamp
+      // would be selected again and the caller's loop would never make progress
+      batchData.next();
+      if (!batchData.hasCurrent()) {
+        // get next batch of the column
+        if (columnContext.seriesReader.hasNextBatch()) {
+          columnContext.currentBatch = columnContext.seriesReader.nextBatch();
+        } else {
+          // no more data in this column
+          measureColumnContextMap.remove(columnContext.columnName);
+        }
+      }
+    }
+    if (rowAccepted) {
+      currentBlock.setPositionCount(positionCount + 1);
+    }
+  }
+
+  private void fillIdColumn(Column column, Object val, int startPos, int 
endPos) {
+    switch (column.getDataType()) {
+      case TEXT:
+        if (val instanceof String) {
+          val = new Binary(((String) val), StandardCharsets.UTF_8);
+        }
+        Arrays.fill(column.getBinaries(), startPos, endPos, val);
+        break;
+      case BOOLEAN:
+        Arrays.fill(column.getBooleans(), startPos, endPos, ((boolean) val));
+        break;
+      case INT32:
+        Arrays.fill(column.getInts(), startPos, endPos, ((int) val));
+        break;
+      case INT64:
+        Arrays.fill(column.getLongs(), startPos, endPos, ((long) val));
+        break;
+      case FLOAT:
+        Arrays.fill(column.getFloats(), startPos, endPos, ((float) val));
+        break;
+      case DOUBLE:
+        Arrays.fill(column.getDoubles(), startPos, endPos, ((double) val));
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported data type: " + 
column.getDataType());
+    }
+    column.setPositionCount(endPos);
+  }
+
+  /**
+   * Copies the current value of {@code batchData} into position {@code pos} of
+   * {@code column}, choosing the typed array by the batch's data type, and sets the column's
+   * position count to {@code pos + 1}.
+   *
+   * @throws IllegalArgumentException if the batch's data type is not handled here
+   */
+  private void fillMeasurementColumn(Column column, BatchData batchData, int pos) {
+    switch (batchData.getDataType()) {
+      case INT32:
+        column.getInts()[pos] = batchData.getInt();
+        break;
+      case INT64:
+        column.getLongs()[pos] = batchData.getLong();
+        break;
+      case FLOAT:
+        column.getFloats()[pos] = batchData.getFloat();
+        break;
+      case DOUBLE:
+        column.getDoubles()[pos] = batchData.getDouble();
+        break;
+      case BOOLEAN:
+        column.getBooleans()[pos] = batchData.getBoolean();
+        break;
+      case TEXT:
+        column.getBinaries()[pos] = batchData.getBinary();
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported data type: " + batchData.getDataType());
+    }
+    column.setPositionCount(pos + 1);
+  }
+
+
   /**
    * Returns the block prepared by {@code hasNext()}. The same {@code TsBlock} instance is
    * handed out on every call and is reset by a later {@code hasNext()}, so callers should be
    * done with it before asking for more data.
    *
    * @throws NoSuchElementException if {@code hasNext()} reports that no block is available
    */
   @Override
   public TsBlock next() throws IOException {
-    return null;
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    lastBlockReturned = true;
+    return currentBlock;
   }
 
   @Override
   public void close() throws Exception {
     // nothing to be done
   }
+
+  /**
+   * Read state of one measurement column, gathered in a single object to avoid redundant map
+   * access.
+   */
+  public static class MeasurementColumnContext {
+
+    // measurement name; also the key used to remove this context once it is exhausted
+    private final String columnName;
+    // positions of the result columns that receive this measurement's values
+    private final List<Integer> posInResult;
+    // reader that supplies further batches
+    private final AbstractFileSeriesReader seriesReader;
+    // batch being consumed; replaced when it runs out
+    private BatchData currentBatch;
+
+    public MeasurementColumnContext(String columnName, List<Integer> posInResult,
+        BatchData currentBatch, AbstractFileSeriesReader seriesReader) {
+      this.seriesReader = seriesReader;
+      this.currentBatch = currentBatch;
+      this.posInResult = posInResult;
+      this.columnName = columnName;
+    }
+  }
+
+  /** Where one ID column is written in the result and which device-id segment feeds it. */
+  public static class IdColumnContext {
+
+    // index passed to the device id's segment lookup
+    private final int posInDeviceId;
+    // positions of the result columns that receive this ID value
+    private final List<Integer> posInResult;
+
+    public IdColumnContext(List<Integer> posInResult, int posInDeviceId) {
+      this.posInDeviceId = posInDeviceId;
+      this.posInResult = posInResult;
+    }
+  }
 }

Reply via email to