This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a64ec2a225 Impl visitTreeNonAlignedDeviceViewScan
1a64ec2a225 is described below

commit 1a64ec2a225e4ded26b1327841cda4f298dd044a
Author: shuwenwei <[email protected]>
AuthorDate: Fri Apr 18 11:34:41 2025 +0800

    Impl visitTreeNonAlignedDeviceViewScan
---
 .../operator/process/FilterAndProjectOperator.java |  15 +
 .../execution/operator/process/LimitOperator.java  |   6 +
 .../execution/operator/process/OffsetOperator.java |   6 +
 .../process/join/LeftOuterTimeJoinOperator.java    |  15 +-
 .../join/TableLeftOuterTimeJoinOperator.java       |  55 ++
 .../relational/AbstractTableScanOperator.java      |  56 +-
 .../relational/DeviceIteratorScanOperator.java     | 233 ++++++++
 .../MeasurementToTableViewAdaptorUtils.java        | 103 ++++
 .../TreeToTableViewAdaptorOperator.java}           |  90 +--
 .../plan/planner/TableOperatorGenerator.java       | 575 +++++++++++++++---
 .../operator/DeviceIteratorScanOperatorTest.java   | 228 ++++++++
 ...nAlignedTreeDeviceViewScanOperatorTreeTest.java | 650 +++++++++++++++++++++
 .../TreeToTableViewAdaptorOperatorTest.java        | 237 ++++++++
 13 files changed, 2106 insertions(+), 163 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/FilterAndProjectOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/FilterAndProjectOperator.java
index c759c3a0fdf..06ca6d6d2ce 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/FilterAndProjectOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/FilterAndProjectOperator.java
@@ -100,6 +100,21 @@ public class FilterAndProjectOperator implements 
ProcessOperator {
     this.hasFilter = hasFilter;
   }
 
+  public FilterAndProjectOperator(
+      FilterAndProjectOperator filterAndProjectOperator, Operator 
inputOperator) {
+    this.operatorContext = filterAndProjectOperator.operatorContext;
+    this.filterLeafColumnTransformerList = 
filterAndProjectOperator.filterLeafColumnTransformerList;
+    this.filterOutputTransformer = 
filterAndProjectOperator.filterOutputTransformer;
+    this.commonTransformerList = 
filterAndProjectOperator.commonTransformerList;
+    this.projectLeafColumnTransformerList =
+        filterAndProjectOperator.projectLeafColumnTransformerList;
+    this.projectOutputTransformerList = 
filterAndProjectOperator.projectOutputTransformerList;
+    this.hasNonMappableUDF = filterAndProjectOperator.hasNonMappableUDF;
+    this.hasFilter = filterAndProjectOperator.hasFilter;
+    this.filterTsBlockBuilder = filterAndProjectOperator.filterTsBlockBuilder;
+    this.inputOperator = inputOperator;
+  }
+
   @Override
   public OperatorContext getOperatorContext() {
     return operatorContext;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/LimitOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/LimitOperator.java
index 6fdfdbed554..85ef22ababd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/LimitOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/LimitOperator.java
@@ -45,6 +45,12 @@ public class LimitOperator implements ProcessOperator {
     this.child = requireNonNull(child, "child operator is null");
   }
 
+  public LimitOperator(LimitOperator limitOperator, Operator child) {
+    this.operatorContext = limitOperator.operatorContext;
+    this.remainingLimit = limitOperator.remainingLimit;
+    this.child = child;
+  }
+
   @Override
   public OperatorContext getOperatorContext() {
     return operatorContext;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/OffsetOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/OffsetOperator.java
index a8581466f89..24fc72d0941 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/OffsetOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/OffsetOperator.java
@@ -45,6 +45,12 @@ public class OffsetOperator implements ProcessOperator {
     this.child = requireNonNull(child, "child operator is null");
   }
 
+  public OffsetOperator(OffsetOperator offsetOperator, Operator child) {
+    this.operatorContext = offsetOperator.operatorContext;
+    this.remainingOffset = offsetOperator.remainingOffset;
+    this.child = child;
+  }
+
   @Override
   public OperatorContext getOperatorContext() {
     return operatorContext;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperator.java
index 0607d0a1e35..8460b682e24 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/LeftOuterTimeJoinOperator.java
@@ -54,7 +54,7 @@ public class LeftOuterTimeJoinOperator implements 
ProcessOperator {
   private final TsBlockBuilder resultBuilder;
 
   private final Operator left;
-  private final int leftColumnCount;
+  protected final int leftColumnCount;
 
   private TsBlock leftTsBlock;
 
@@ -194,7 +194,7 @@ public class LeftOuterTimeJoinOperator implements 
ProcessOperator {
   private void appendLeftTableRow() {
     for (int i = 0; i < leftColumnCount; i++) {
       Column leftColumn = leftTsBlock.getColumn(i);
-      ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(i);
+      ColumnBuilder columnBuilder = 
resultBuilder.getColumnBuilder(getOutputColumnIndex(i, true));
       if (leftColumn.isNull(leftIndex)) {
         columnBuilder.appendNull();
       } else {
@@ -231,7 +231,8 @@ public class LeftOuterTimeJoinOperator implements 
ProcessOperator {
       // right table has this time, append right table's corresponding row
       for (int i = leftColumnCount; i < outputColumnCount; i++) {
         Column rightColumn = rightTsBlock.getColumn(i - leftColumnCount);
-        ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(i);
+        ColumnBuilder columnBuilder =
+            resultBuilder.getColumnBuilder(getOutputColumnIndex(i, false));
         if (rightColumn.isNull(rightIndex)) {
           columnBuilder.appendNull();
         } else {
@@ -273,7 +274,7 @@ public class LeftOuterTimeJoinOperator implements 
ProcessOperator {
 
   private void appendValueColumnForLeftTable(int rowSize) {
     for (int i = 0; i < leftColumnCount; i++) {
-      ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(i);
+      ColumnBuilder columnBuilder = 
resultBuilder.getColumnBuilder(getOutputColumnIndex(i, true));
       Column valueColumn = leftTsBlock.getColumn(i);
 
       if (valueColumn.mayHaveNull()) {
@@ -296,7 +297,7 @@ public class LeftOuterTimeJoinOperator implements 
ProcessOperator {
   private void appendNullForRightTable(int rowSize) {
     int nullCount = rowSize - leftIndex;
     for (int i = leftColumnCount; i < outputColumnCount; i++) {
-      ColumnBuilder columnBuilder = resultBuilder.getColumnBuilder(i);
+      ColumnBuilder columnBuilder = 
resultBuilder.getColumnBuilder(getOutputColumnIndex(i, false));
       columnBuilder.appendNull(nullCount);
     }
   }
@@ -321,6 +322,10 @@ public class LeftOuterTimeJoinOperator implements 
ProcessOperator {
     return !tsBlockIsNotEmpty(leftTsBlock, leftIndex) && left.isFinished();
   }
 
+  protected int getOutputColumnIndex(int inputColumnIndex, boolean 
isLeftTable) {
+    return inputColumnIndex;
+  }
+
   @Override
   public long calculateMaxPeekMemory() {
     return Math.max(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/TableLeftOuterTimeJoinOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/TableLeftOuterTimeJoinOperator.java
new file mode 100644
index 00000000000..19b9e2eb4d4
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/TableLeftOuterTimeJoinOperator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process.join;
+
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.TimeComparator;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
+
+import org.apache.tsfile.enums.TSDataType;
+
+import java.util.List;
+import java.util.Map;
+
+public class TableLeftOuterTimeJoinOperator extends LeftOuterTimeJoinOperator {
+
+  private final Map<InputLocation, Integer> outputColumnMap;
+
+  public TableLeftOuterTimeJoinOperator(
+      OperatorContext operatorContext,
+      Operator leftChild,
+      Operator rightChild,
+      int leftColumnCount,
+      Map<InputLocation, Integer> outputColumnMap,
+      List<TSDataType> dataTypes,
+      TimeComparator comparator) {
+    super(operatorContext, leftChild, leftColumnCount, rightChild, dataTypes, 
comparator);
+    this.outputColumnMap = outputColumnMap;
+  }
+
+  @Override
+  protected int getOutputColumnIndex(int inputColumnIndex, boolean 
isLeftTable) {
+    return outputColumnMap.get(
+        isLeftTable
+            ? new InputLocation(0, inputColumnIndex)
+            : new InputLocation(1, inputColumnIndex - leftColumnCount));
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java
index f8de2d57572..96de6f92e10 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractTableScanOperator.java
@@ -32,16 +32,11 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
 import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
 
-import org.apache.tsfile.block.column.Column;
-import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.common.conf.TSFileDescriptor;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.block.TsBlock;
 import org.apache.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.tsfile.read.common.block.column.BinaryColumn;
 import org.apache.tsfile.read.common.block.column.LongColumn;
-import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
-import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.RamUsageEstimator;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 
@@ -200,57 +195,18 @@ public abstract class AbstractTableScanOperator extends 
AbstractSeriesScanOperat
   }
 
   private void constructResultTsBlock() {
-    int positionCount = measurementDataBlock.getPositionCount();
     DeviceEntry currentDeviceEntry = deviceEntries.get(currentDeviceIndex);
-    Column[] valueColumns = new Column[columnsIndexArray.length];
-    for (int i = 0; i < columnsIndexArray.length; i++) {
-      switch (columnSchemas.get(i).getColumnCategory()) {
-        case TAG:
-          String idColumnValue = getNthIdColumnValue(currentDeviceEntry, 
columnsIndexArray[i]);
-
-          valueColumns[i] =
-              getIdOrAttributeValueColumn(
-                  idColumnValue == null
-                      ? null
-                      : new Binary(idColumnValue, TSFileConfig.STRING_CHARSET),
-                  positionCount);
-          break;
-        case ATTRIBUTE:
-          Binary attributeColumnValue =
-              
currentDeviceEntry.getAttributeColumnValues()[columnsIndexArray[i]];
-          valueColumns[i] = getIdOrAttributeValueColumn(attributeColumnValue, 
positionCount);
-          break;
-        case FIELD:
-          valueColumns[i] = 
measurementDataBlock.getColumn(columnsIndexArray[i]);
-          break;
-        case TIME:
-          valueColumns[i] = measurementDataBlock.getTimeColumn();
-          break;
-        default:
-          throw new IllegalArgumentException(
-              "Unexpected column category: " + 
columnSchemas.get(i).getColumnCategory());
-      }
-    }
     this.resultTsBlock =
-        new TsBlock(
-            positionCount,
-            new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, positionCount),
-            valueColumns);
+        MeasurementToTableViewAdaptorUtils.toTableBlock(
+            measurementDataBlock,
+            columnsIndexArray,
+            columnSchemas,
+            deviceEntries.get(currentDeviceIndex),
+            idColumnIndex -> getNthIdColumnValue(currentDeviceEntry, 
idColumnIndex));
   }
 
   abstract String getNthIdColumnValue(DeviceEntry deviceEntry, int 
idColumnIndex);
 
-  private RunLengthEncodedColumn getIdOrAttributeValueColumn(Binary value, int 
positionCount) {
-    if (value == null) {
-      return new RunLengthEncodedColumn(
-          new BinaryColumn(1, Optional.of(new boolean[] {true}), new Binary[] 
{null}),
-          positionCount);
-    } else {
-      return new RunLengthEncodedColumn(
-          new BinaryColumn(1, Optional.empty(), new Binary[] {value}), 
positionCount);
-    }
-  }
-
   @Override
   public boolean hasNext() throws Exception {
     return !isFinished();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DeviceIteratorScanOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DeviceIteratorScanOperator.java
new file mode 100644
index 00000000000..1689fd2fd84
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/DeviceIteratorScanOperator.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
+
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+public class DeviceIteratorScanOperator extends AbstractDataSourceOperator {
+  private static final long INSTANCE_SIZE =
+      
RamUsageEstimator.shallowSizeOfInstance(DeviceIteratorScanOperator.class);
+
+  private final OperatorContext operatorContext;
+  private final List<DeviceEntry> deviceEntries;
+  private final DeviceChildOperatorTreeGenerator 
deviceChildOperatorTreeGenerator;
+
+  private QueryDataSource queryDataSource;
+  private int currentDeviceIndex;
+  private Operator currentDeviceRootOperator;
+  private List<Operator> dataSourceOperators;
+  // For each device operator tree, isBlocked needs to be called once.
+  // Calling isBlocked will set this field to true.
+  // When isBlocked is not called for a device, hasNext will return true and 
next will return null.
+  private boolean currentDeviceInit;
+
+  public DeviceIteratorScanOperator(
+      OperatorContext operatorContext,
+      List<DeviceEntry> deviceEntries,
+      DeviceChildOperatorTreeGenerator childOperatorTreeGenerator) {
+    this.operatorContext = operatorContext;
+    this.deviceEntries = deviceEntries;
+    this.deviceChildOperatorTreeGenerator = childOperatorTreeGenerator;
+    this.currentDeviceIndex = 0;
+    this.currentDeviceInit = false;
+    this.operatorContext.recordSpecifiedInfo(
+        AbstractTableScanOperator.CURRENT_DEVICE_INDEX_STRING, 
Integer.toString(0));
+    constructCurrentDeviceOperatorTree();
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    if (currentDeviceRootOperator != null && 
currentDeviceRootOperator.hasNext()) {
+      return true;
+    } else {
+      if (!currentDeviceInit) {
+        return true;
+      }
+      if (currentDeviceIndex + 1 >= deviceEntries.size()) {
+        return false;
+      } else {
+        nextDevice();
+        return true;
+      }
+    }
+  }
+
+  @Override
+  public boolean isFinished() throws Exception {
+    return !hasNext();
+  }
+
+  private void nextDevice() throws Exception {
+    currentDeviceIndex++;
+    
deviceChildOperatorTreeGenerator.getCurrentDeviceStartCloseOperator().close();
+    if (currentDeviceIndex >= deviceEntries.size()) {
+      return;
+    }
+    constructCurrentDeviceOperatorTree();
+    queryDataSource.reset();
+    initQueryDataSource(queryDataSource);
+    this.operatorContext.recordSpecifiedInfo(
+        AbstractTableScanOperator.CURRENT_DEVICE_INDEX_STRING,
+        Integer.toString(currentDeviceIndex));
+  }
+
+  private void constructCurrentDeviceOperatorTree() {
+    if (this.deviceEntries.isEmpty()) {
+      return;
+    }
+    if (this.deviceEntries.get(this.currentDeviceIndex) == null) {
+      throw new IllegalStateException(
+          "Device entries of index " + this.currentDeviceIndex + " is empty");
+    }
+    DeviceEntry deviceEntry = this.deviceEntries.get(this.currentDeviceIndex);
+
+    
deviceChildOperatorTreeGenerator.generateCurrentDeviceOperatorTree(deviceEntry);
+    currentDeviceRootOperator = 
deviceChildOperatorTreeGenerator.getCurrentDeviceRootOperator();
+    dataSourceOperators = 
deviceChildOperatorTreeGenerator.getCurrentDeviceDataSourceOperators();
+    currentDeviceInit = false;
+  }
+
+  @Override
+  public void initQueryDataSource(IQueryDataSource dataSource) {
+    this.queryDataSource = (QueryDataSource) dataSource;
+    if (dataSourceOperators == null || dataSourceOperators.isEmpty()) {
+      return;
+    }
+    for (Operator operator : dataSourceOperators) {
+      ((AbstractDataSourceOperator) operator).initQueryDataSource(dataSource);
+    }
+  }
+
+  @Override
+  public TsBlock next() throws Exception {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    if (!currentDeviceInit) {
+      return null;
+    }
+    return currentDeviceRootOperator.next();
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    currentDeviceInit = true;
+    return currentDeviceRootOperator.isBlocked();
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (currentDeviceRootOperator != null) {
+      currentDeviceRootOperator.close();
+    }
+  }
+
+  @Override
+  protected List<TSDataType> getResultDataTypes() {
+    throw new UnsupportedOperationException(
+        "Should not call getResultDataTypes() method in 
DeviceIteratorScanOperator");
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return currentDeviceRootOperator.calculateMaxPeekMemory();
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return currentDeviceRootOperator.calculateMaxReturnSize();
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return currentDeviceRootOperator.calculateRetainedSizeAfterCallingNext();
+  }
+
+  @Override
+  public long ramBytesUsed() {
+    return INSTANCE_SIZE
+        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
+        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(currentDeviceRootOperator)
+        + RamUsageEstimator.sizeOfCollection(deviceEntries);
+  }
+
+  public static class TreeNonAlignedDeviceViewScanParameters {
+    public final OperatorContext context;
+    public final List<DeviceEntry> deviceEntries;
+    public final List<String> measurementColumnNames;
+    public final Set<String> allSensors;
+    public final List<IMeasurementSchema> measurementSchemas;
+    public final DeviceChildOperatorTreeGenerator generator;
+
+    public TreeNonAlignedDeviceViewScanParameters(
+        Set<String> allSensors,
+        OperatorContext context,
+        List<DeviceEntry> deviceEntries,
+        List<String> measurementColumnNames,
+        List<IMeasurementSchema> measurementSchemas,
+        DeviceChildOperatorTreeGenerator generator) {
+      this.allSensors = allSensors;
+      this.context = context;
+      this.deviceEntries = deviceEntries;
+      this.measurementColumnNames = measurementColumnNames;
+      this.measurementSchemas = measurementSchemas;
+      this.generator = generator;
+    }
+  }
+
+  public interface DeviceChildOperatorTreeGenerator {
+    // Do the offset and limit operator need to keep after the device iterator
+    boolean keepOffsetAndLimitOperatorAfterDeviceIterator();
+
+    // Generate the following operator subtree based on the current deviceEntry
+    void generateCurrentDeviceOperatorTree(DeviceEntry deviceEntry);
+
+    // Returns the root operator of the subtree
+    Operator getCurrentDeviceRootOperator();
+
+    // Returns all DataSourceOperators created this time for use in 
initQueryDataSource in
+    // DeviceIterator
+    List<Operator> getCurrentDeviceDataSourceOperators();
+
+    // Returns which operator to close after switching device
+    Operator getCurrentDeviceStartCloseOperator();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MeasurementToTableViewAdaptorUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MeasurementToTableViewAdaptorUtils.java
new file mode 100644
index 00000000000..0088b2a0956
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/MeasurementToTableViewAdaptorUtils.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
+
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.utils.Binary;
+
+import java.util.List;
+import java.util.Optional;
+
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE;
+
+public class MeasurementToTableViewAdaptorUtils {
+  private MeasurementToTableViewAdaptorUtils() {}
+
+  public static TsBlock toTableBlock(
+      TsBlock measurementDataBlock,
+      int[] columnsIndexArray,
+      List<ColumnSchema> columnSchemas,
+      DeviceEntry deviceEntry,
+      GetNthIdColumnValueFunc func) {
+    if (measurementDataBlock == null) {
+      return null;
+    }
+    int positionCount = measurementDataBlock.getPositionCount();
+    Column[] valueColumns = new Column[columnsIndexArray.length];
+    for (int i = 0; i < columnsIndexArray.length; i++) {
+      switch (columnSchemas.get(i).getColumnCategory()) {
+        case TAG:
+          String idColumnValue = 
func.getNthIdColumnValue(columnsIndexArray[i]);
+
+          valueColumns[i] =
+              getIdOrAttributeValueColumn(
+                  idColumnValue == null
+                      ? null
+                      : new Binary(idColumnValue, TSFileConfig.STRING_CHARSET),
+                  positionCount);
+          break;
+        case ATTRIBUTE:
+          Binary attributeColumnValue =
+              deviceEntry.getAttributeColumnValues()[columnsIndexArray[i]];
+          valueColumns[i] = getIdOrAttributeValueColumn(attributeColumnValue, 
positionCount);
+          break;
+        case FIELD:
+          valueColumns[i] = 
measurementDataBlock.getColumn(columnsIndexArray[i]);
+          break;
+        case TIME:
+          valueColumns[i] = measurementDataBlock.getTimeColumn();
+          break;
+        default:
+          throw new IllegalArgumentException(
+              "Unexpected column category: " + 
columnSchemas.get(i).getColumnCategory());
+      }
+    }
+    return new TsBlock(
+        positionCount,
+        new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, positionCount),
+        valueColumns);
+  }
+
+  private static RunLengthEncodedColumn getIdOrAttributeValueColumn(
+      Binary value, int positionCount) {
+    if (value == null) {
+      return new RunLengthEncodedColumn(
+          new BinaryColumn(
+              1, Optional.of(new boolean[] {true}), new 
org.apache.tsfile.utils.Binary[] {null}),
+          positionCount);
+    } else {
+      return new RunLengthEncodedColumn(
+          new BinaryColumn(1, Optional.empty(), new 
org.apache.tsfile.utils.Binary[] {value}),
+          positionCount);
+    }
+  }
+
+  @FunctionalInterface
+  public interface GetNthIdColumnValueFunc {
+    String getNthIdColumnValue(int idColumnIndex);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/OffsetOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeToTableViewAdaptorOperator.java
similarity index 52%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/OffsetOperator.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeToTableViewAdaptorOperator.java
index a8581466f89..ff631af4072 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/OffsetOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TreeToTableViewAdaptorOperator.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -17,69 +17,80 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.execution.operator.process;
+package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
 
 import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.read.common.block.TsBlock;
 import org.apache.tsfile.utils.RamUsageEstimator;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.Objects.requireNonNull;
-
-public class OffsetOperator implements ProcessOperator {
+import java.util.List;
 
+public class TreeToTableViewAdaptorOperator implements ProcessOperator {
   private static final long INSTANCE_SIZE =
-      RamUsageEstimator.shallowSizeOfInstance(OffsetOperator.class);
+      
RamUsageEstimator.shallowSizeOfInstance(TreeToTableViewAdaptorOperator.class);
   private final OperatorContext operatorContext;
-  private long remainingOffset;
+  private final DeviceEntry currentDeviceEntry;
+  private final int[] columnsIndexArray;
+  private final List<ColumnSchema> columnSchemas;
   private final Operator child;
-
-  public OffsetOperator(OperatorContext operatorContext, long offset, Operator 
child) {
-    this.operatorContext = requireNonNull(operatorContext, "operatorContext is 
null");
-    checkArgument(offset >= 0, "offset must be at least zero");
-    this.remainingOffset = offset;
-    this.child = requireNonNull(child, "child operator is null");
+  private final IDeviceID.TreeDeviceIdColumnValueExtractor extractor;
+
+  public TreeToTableViewAdaptorOperator(
+      OperatorContext operatorContext,
+      DeviceEntry deviceEntry,
+      int[] columnIndexArray,
+      List<ColumnSchema> columnSchemas,
+      Operator child,
+      IDeviceID.TreeDeviceIdColumnValueExtractor extractor) {
+    this.operatorContext = operatorContext;
+    this.currentDeviceEntry = deviceEntry;
+    this.columnsIndexArray = columnIndexArray;
+    this.columnSchemas = columnSchemas;
+    this.child = child;
+    this.extractor = extractor;
   }
 
   @Override
-  public OperatorContext getOperatorContext() {
-    return operatorContext;
+  public boolean hasNext() throws Exception {
+    return child.hasNext();
   }
 
   @Override
-  public ListenableFuture<?> isBlocked() {
-    return child.isBlocked();
+  public TsBlock next() throws Exception {
+    TsBlock measurementDataBlock = child.next();
+    return MeasurementToTableViewAdaptorUtils.toTableBlock(
+        measurementDataBlock,
+        columnsIndexArray,
+        columnSchemas,
+        currentDeviceEntry,
+        idColumnIndex -> getNthIdColumnValue(currentDeviceEntry, 
idColumnIndex));
+  }
+
+  private String getNthIdColumnValue(DeviceEntry deviceEntry, int 
idColumnIndex) {
+    return (String) extractor.extract(deviceEntry.getDeviceID(), 
idColumnIndex);
   }
 
   @Override
-  public TsBlock next() throws Exception {
-    TsBlock block = child.nextWithTimer();
-    if (block == null) {
-      return null;
-    }
-    if (remainingOffset > 0) {
-      // It's safe to narrow long to int here, because 
block.getPositionCount() will always be less
-      // than Integer.MAX_VALUE
-      int offset = (int) Math.min(remainingOffset, block.getPositionCount());
-      remainingOffset -= offset;
-      return block.getRegion(offset, block.getPositionCount() - offset);
-    } else {
-      return block;
-    }
+  public void close() throws Exception {
+    child.close();
   }
 
   @Override
-  public boolean hasNext() throws Exception {
-    return child.hasNextWithTimer();
+  public ListenableFuture<?> isBlocked() {
+    return child.isBlocked();
   }
 
   @Override
-  public void close() throws Exception {
-    child.close();
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
   }
 
   @Override
@@ -89,7 +100,7 @@ public class OffsetOperator implements ProcessOperator {
 
   @Override
   public long calculateMaxPeekMemory() {
-    return child.calculateMaxPeekMemoryWithCounter();
+    return child.calculateMaxPeekMemory();
   }
 
   @Override
@@ -105,7 +116,10 @@ public class OffsetOperator implements ProcessOperator {
   @Override
   public long ramBytesUsed() {
     return INSTANCE_SIZE
+        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
         + MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(child)
-        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext);
+        + 
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(currentDeviceEntry)
+        + RamUsageEstimator.sizeOf(columnsIndexArray)
+        + RamUsageEstimator.sizeOfCollection(columnSchemas);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index fd54b3135a1..ae0956a42d3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.AlignedFullPath;
+import org.apache.iotdb.commons.path.NonAlignedFullPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.schema.table.TsTable;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
@@ -70,7 +71,14 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.process.gapfill.GapFil
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.gapfill.GapFillWGroupWoMoOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.gapfill.GapFillWoGroupWMoOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.gapfill.GapFillWoGroupWoMoOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.FullOuterTimeJoinOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.InnerTimeJoinOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.SimpleNestedLoopCrossJoinOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.TableLeftOuterTimeJoinOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.ColumnMerger;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.DescTimeComparator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.comparator.JoinKeyComparatorFactory;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.last.LastQueryUtil;
 import 
org.apache.iotdb.db.queryengine.execution.operator.schema.CountMergeOperator;
@@ -81,9 +89,11 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.schema.source.SchemaSo
 import 
org.apache.iotdb.db.queryengine.execution.operator.sink.IdentitySinkOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractAggTableScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.DefaultAggTableScanOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.DeviceIteratorScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.InformationSchemaTableScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.LastQueryAggTableScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.MarkDistinctOperator;
@@ -94,6 +104,7 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.Merg
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeAlignedDeviceViewAggregationScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeAlignedDeviceViewScanOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeToTableViewAdaptorOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AggregationOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastByDescAccumulator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.LastDescAccumulator;
@@ -132,6 +143,8 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.CastToTimestampLi
 import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolsExtractor;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode;
@@ -365,8 +378,489 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
   @Override
   public Operator visitTreeNonAlignedDeviceViewScan(
       TreeNonAlignedDeviceViewScanNode node, LocalExecutionPlanContext 
context) {
-    throw new UnsupportedOperationException(
-        "view for non aligned devices in tree is not supported");
+
+    DeviceIteratorScanOperator.TreeNonAlignedDeviceViewScanParameters 
parameter =
+        constructTreeNonAlignedDeviceViewScanOperatorParameter(
+            node,
+            context,
+            TreeNonAlignedDeviceViewScanNode.class.getSimpleName(),
+            node.getMeasurementColumnNameMap());
+
+    DeviceIteratorScanOperator treeNonAlignedDeviceIteratorScanOperator =
+        new DeviceIteratorScanOperator(
+            parameter.context, parameter.deviceEntries, parameter.generator);
+    addSource(
+        treeNonAlignedDeviceIteratorScanOperator,
+        context,
+        node,
+        parameter.measurementColumnNames,
+        parameter.measurementSchemas,
+        parameter.allSensors,
+        TreeNonAlignedDeviceViewScanNode.class.getSimpleName());
+
+    if (!parameter.generator.keepOffsetAndLimitOperatorAfterDeviceIterator()) {
+      return treeNonAlignedDeviceIteratorScanOperator;
+    }
+    Operator operator = treeNonAlignedDeviceIteratorScanOperator;
+    if (node.getPushDownOffset() > 0) {
+      operator = new OffsetOperator(parameter.context, 
node.getPushDownOffset(), operator);
+    }
+    if (node.getPushDownLimit() > 0) {
+      operator = new LimitOperator(parameter.context, node.getPushDownLimit(), 
operator);
+    }
+    return operator;
+  }
+
+  private DeviceIteratorScanOperator.TreeNonAlignedDeviceViewScanParameters
+      constructTreeNonAlignedDeviceViewScanOperatorParameter(
+          TreeNonAlignedDeviceViewScanNode node,
+          LocalExecutionPlanContext context,
+          String className,
+          Map<String, String> fieldColumnsRenameMap) {
+    if (node.isPushLimitToEachDevice() && node.getPushDownOffset() > 0) {
+      throw new IllegalArgumentException(
+          "PushDownOffset should not be set when isPushLimitToEachDevice is 
true.");
+    }
+    CommonTableScanOperatorParameters commonParameter =
+        new CommonTableScanOperatorParameters(node, fieldColumnsRenameMap, 
true);
+    List<IMeasurementSchema> measurementSchemas = 
commonParameter.measurementSchemas;
+    List<String> measurementColumnNames = 
commonParameter.measurementColumnNames;
+    List<ColumnSchema> fullColumnSchemas = commonParameter.columnSchemas;
+    int[] columnsIndexArray = commonParameter.columnsIndexArray;
+
+    boolean isSingleColumn = measurementSchemas.size() == 1;
+
+    OperatorContext operatorContext =
+        context
+            .getDriverContext()
+            .addOperatorContext(context.getNextOperatorId(), 
node.getPlanNodeId(), className);
+
+    Set<String> allSensors = new HashSet<>(measurementColumnNames);
+
+    DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator 
deviceChildOperatorTreeGenerator =
+        new DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator() {
+
+          private Operator operator;
+          private List<SeriesScanOptions> seriesScanOptionsList;
+          private List<Operator> seriesScanOperators;
+          private FilterAndProjectOperator filterAndProjectOperator;
+          private OffsetOperator reuseOffsetOperator;
+          private LimitOperator reuseLimitOperator;
+          private Operator startCloseInternalOperator;
+
+          private List<Expression> cannotPushDownConjuncts;
+          private boolean removeUpperOffsetAndLimitOperator;
+
+          @Override
+          public boolean keepOffsetAndLimitOperatorAfterDeviceIterator() {
+            calculateSeriesScanOptionsList();
+            return !removeUpperOffsetAndLimitOperator && 
!node.isPushLimitToEachDevice();
+          }
+
+          @Override
+          public void generateCurrentDeviceOperatorTree(DeviceEntry 
deviceEntry) {
+            calculateSeriesScanOptionsList();
+            operator = constructTreeToTableViewAdaptorOperator(deviceEntry);
+            if (isSingleColumn) {
+              return;
+            }
+            if (!cannotPushDownConjuncts.isEmpty()
+                || node.getAssignments().size() != 
node.getOutputSymbols().size()) {
+              operator = getFilterAndProjectOperator(operator);
+            }
+            if (!node.isPushLimitToEachDevice() || 
removeUpperOffsetAndLimitOperator) {
+              return;
+            }
+            if (node.getPushDownLimit() > 0) {
+              operator = new LimitOperator(operatorContext, 
node.getPushDownLimit(), operator);
+            }
+          }
+
+          private void calculateSeriesScanOptionsList() {
+            if (seriesScanOptionsList != null) {
+              return;
+            }
+            seriesScanOptionsList = new ArrayList<>(measurementSchemas.size());
+            cannotPushDownConjuncts = new ArrayList<>();
+            Map<String, List<Expression>> pushDownConjunctsForEachMeasurement 
= new HashMap<>();
+            if (node.getPushDownPredicate() != null) {
+              List<Expression> conjuncts = 
IrUtils.extractConjuncts(node.getPushDownPredicate());
+              for (Expression conjunct : conjuncts) {
+                Set<Symbol> symbols = SymbolsExtractor.extractUnique(conjunct);
+                boolean containsMultiDataSource = symbols.size() > 1;
+                if (containsMultiDataSource) {
+                  cannotPushDownConjuncts.add(conjunct);
+                  continue;
+                }
+                String symbolName = symbols.iterator().next().getName();
+                pushDownConjunctsForEachMeasurement
+                    .computeIfAbsent(symbolName, k -> new ArrayList<>())
+                    .add(conjunct);
+              }
+            }
+
+            boolean canPushDownLimit = cannotPushDownConjuncts.isEmpty();
+            // only use full outer time join
+            boolean canPushDownLimitToAllSeriesScanOptions =
+                canPushDownLimit && 
pushDownConjunctsForEachMeasurement.isEmpty();
+            // the left child of LeftOuterTimeJoinOperator is 
SeriesScanOperator
+            boolean pushDownLimitToLeftChildSeriesScanOperator =
+                canPushDownLimit && pushDownConjunctsForEachMeasurement.size() 
== 1;
+            // the left child of LeftOuterTimeJoinOperator is 
InnerTimeJoinOperator
+            boolean pushDownOffsetAndLimitAfterInnerJoinOperator =
+                canPushDownLimit && pushDownConjunctsForEachMeasurement.size() 
> 1;
+            removeUpperOffsetAndLimitOperator =
+                pushDownLimitToLeftChildSeriesScanOperator
+                    || pushDownOffsetAndLimitAfterInnerJoinOperator
+                    || isSingleColumn;
+            for (int i = 0; i < measurementSchemas.size(); i++) {
+              IMeasurementSchema measurementSchema = measurementSchemas.get(i);
+              List<Expression> pushDownPredicatesForCurrentMeasurement =
+                  
pushDownConjunctsForEachMeasurement.get(measurementSchema.getMeasurementName());
+              Expression pushDownPredicateForCurrentMeasurement =
+                  isSingleColumn
+                      ? node.getPushDownPredicate()
+                      : (pushDownPredicatesForCurrentMeasurement == null
+                          ? null
+                          : 
IrUtils.combineConjuncts(pushDownPredicatesForCurrentMeasurement));
+              SeriesScanOptions.Builder builder =
+                  node.getTimePredicate()
+                      .map(expression -> getSeriesScanOptionsBuilder(context, 
expression))
+                      .orElseGet(SeriesScanOptions.Builder::new);
+              builder.withAllSensors(new HashSet<>(measurementColumnNames));
+              if (pushDownPredicateForCurrentMeasurement != null) {
+                builder.withPushDownFilter(
+                    convertPredicateToFilter(
+                        pushDownPredicateForCurrentMeasurement,
+                        
Collections.singletonMap(measurementSchema.getMeasurementName(), 0),
+                        commonParameter.columnSchemaMap,
+                        commonParameter.timeColumnName));
+              }
+              if (isSingleColumn
+                  || (pushDownLimitToLeftChildSeriesScanOperator
+                      && pushDownPredicateForCurrentMeasurement != null)) {
+                builder.withPushDownLimit(node.getPushDownLimit());
+                
builder.withPushLimitToEachDevice(node.isPushLimitToEachDevice());
+              }
+              if (canPushDownLimitToAllSeriesScanOptions) {
+                builder.withPushDownLimit(node.getPushDownLimit() + 
node.getPushDownOffset());
+              }
+              if (isSingleColumn
+                  || (pushDownLimitToLeftChildSeriesScanOperator
+                      && pushDownPredicateForCurrentMeasurement != null)) {
+                builder.withPushDownOffset(
+                    node.isPushLimitToEachDevice() ? 0 : 
node.getPushDownOffset());
+              }
+              seriesScanOptionsList.add(builder.build());
+            }
+          }
+
+          private Operator constructTreeToTableViewAdaptorOperator(DeviceEntry 
deviceEntry) {
+            seriesScanOperators = new ArrayList<>(measurementSchemas.size());
+            operator = constructAndJoinScanOperators(deviceEntry);
+            return new TreeToTableViewAdaptorOperator(
+                operatorContext,
+                deviceEntry,
+                columnsIndexArray,
+                fullColumnSchemas,
+                operator,
+                TableOperatorGenerator.createTreeDeviceIdColumnValueExtractor(
+                    node.getTreeDBName()));
+          }
+
+          private Operator constructAndJoinScanOperators(DeviceEntry 
deviceEntry) {
+            List<Operator> childrenWithPushDownPredicate = new ArrayList<>();
+            List<TSDataType> innerJoinDataTypeList = new ArrayList<>();
+            List<Operator> childrenWithoutPushDownPredicate = new 
ArrayList<>();
+            List<TSDataType> fullOuterTimeJoinDataTypeList = new ArrayList<>();
+            Map<InputLocation, Integer> leftOuterJoinColumnIndexMap = new 
HashMap<>();
+            for (int i = 0; i < measurementSchemas.size(); i++) {
+              IMeasurementSchema measurementSchema = measurementSchemas.get(i);
+              NonAlignedFullPath path =
+                  new NonAlignedFullPath(deviceEntry.getDeviceID(), 
measurementSchema);
+              SeriesScanOptions seriesScanOptions = 
seriesScanOptionsList.get(i);
+              Operator seriesScanOperator =
+                  new SeriesScanOperator(
+                      operatorContext,
+                      node.getPlanNodeId(),
+                      path,
+                      node.getScanOrder(),
+                      seriesScanOptions);
+              seriesScanOperators.add(seriesScanOperator);
+              if (seriesScanOptions.getPushDownFilter() != null) {
+                childrenWithPushDownPredicate.add(seriesScanOperator);
+                innerJoinDataTypeList.add(measurementSchema.getType());
+                leftOuterJoinColumnIndexMap.put(
+                    new InputLocation(0, childrenWithPushDownPredicate.size() 
- 1), i);
+              } else {
+                childrenWithoutPushDownPredicate.add(seriesScanOperator);
+                fullOuterTimeJoinDataTypeList.add(measurementSchema.getType());
+                leftOuterJoinColumnIndexMap.put(
+                    new InputLocation(1, 
childrenWithoutPushDownPredicate.size() - 1), i);
+              }
+            }
+            Operator leftChild =
+                generateInnerTimeJoinOperator(childrenWithPushDownPredicate, 
innerJoinDataTypeList);
+            Operator rightChild =
+                generateFullOuterTimeJoinOperator(
+                    childrenWithoutPushDownPredicate, 
fullOuterTimeJoinDataTypeList);
+            return generateLeftOuterTimeJoinOperator(
+                leftChild,
+                rightChild,
+                childrenWithPushDownPredicate.size(),
+                leftOuterJoinColumnIndexMap,
+                IMeasurementSchema.getDataTypeList(measurementSchemas));
+          }
+
+          private Operator generateInnerTimeJoinOperator(
+              List<Operator> operators, List<TSDataType> dataTypes) {
+            if (operators.isEmpty()) {
+              return null;
+            }
+            if (operators.size() == 1) {
+              return operators.get(0);
+            }
+            Map<InputLocation, Integer> outputColumnMap = new HashMap<>();
+            for (int i = 0; i < operators.size(); i++) {
+              outputColumnMap.put(new InputLocation(i, 0), i);
+            }
+            Operator currentOperator =
+                new InnerTimeJoinOperator(
+                    operatorContext,
+                    operators,
+                    dataTypes,
+                    node.getScanOrder() == Ordering.ASC
+                        ? new AscTimeComparator()
+                        : new DescTimeComparator(),
+                    outputColumnMap);
+            boolean addOffsetAndLimitOperatorAfterLeftChild =
+                operators.size() > 1 && cannotPushDownConjuncts.isEmpty();
+            if (addOffsetAndLimitOperatorAfterLeftChild) {
+              if (node.getPushDownOffset() > 0) {
+                currentOperator = getReuseOffsetOperator(currentOperator);
+              }
+              if (node.getPushDownLimit() > 0) {
+                currentOperator = getReuseLimitOperator(currentOperator);
+              }
+            }
+            return currentOperator;
+          }
+
+          private Operator generateFullOuterTimeJoinOperator(
+              List<Operator> operators, List<TSDataType> dataTypes) {
+            if (operators.isEmpty()) {
+              return null;
+            }
+            if (operators.size() == 1) {
+              return operators.get(0);
+            }
+            List<ColumnMerger> columnMergers = new 
ArrayList<>(operators.size());
+            for (int i = 0; i < operators.size(); i++) {
+              columnMergers.add(
+                  new SingleColumnMerger(
+                      new InputLocation(i, 0),
+                      node.getScanOrder() == Ordering.ASC
+                          ? new AscTimeComparator()
+                          : new DescTimeComparator()));
+            }
+            return new FullOuterTimeJoinOperator(
+                operatorContext,
+                operators,
+                node.getScanOrder(),
+                dataTypes,
+                columnMergers,
+                node.getScanOrder() == Ordering.ASC
+                    ? new AscTimeComparator()
+                    : new DescTimeComparator());
+          }
+
+          private Operator generateLeftOuterTimeJoinOperator(
+              Operator left,
+              Operator right,
+              int leftColumnCount,
+              Map<InputLocation, Integer> outputColumnMap,
+              List<TSDataType> dataTypes) {
+            if (left == null) {
+              return right;
+            } else if (right == null) {
+              return left;
+            } else {
+              return new TableLeftOuterTimeJoinOperator(
+                  operatorContext,
+                  left,
+                  right,
+                  leftColumnCount,
+                  outputColumnMap,
+                  dataTypes,
+                  node.getScanOrder() == Ordering.ASC
+                      ? new AscTimeComparator()
+                      : new DescTimeComparator());
+            }
+          }
+
+          private Operator getReuseOffsetOperator(Operator child) {
+            this.reuseOffsetOperator =
+                reuseOffsetOperator == null
+                    ? new OffsetOperator(operatorContext, 
node.getPushDownOffset(), child)
+                    : new OffsetOperator(reuseOffsetOperator, child);
+            return this.reuseOffsetOperator;
+          }
+
+          private Operator getReuseLimitOperator(Operator child) {
+            this.reuseLimitOperator =
+                reuseLimitOperator == null
+                    ? new LimitOperator(operatorContext, 
node.getPushDownLimit(), child)
+                    : new LimitOperator(reuseLimitOperator, child);
+            return this.reuseLimitOperator;
+          }
+
+          private Operator getFilterAndProjectOperator(Operator childOperator) 
{
+            startCloseInternalOperator = childOperator;
+            if (filterAndProjectOperator != null) {
+              return new FilterAndProjectOperator(filterAndProjectOperator, 
childOperator);
+            }
+            List<TSDataType> inputDataTypeList = new 
ArrayList<>(fullColumnSchemas.size());
+            Map<Symbol, List<InputLocation>> symbolInputLocationMap =
+                new HashMap<>(fullColumnSchemas.size());
+            for (int i = 0; i < fullColumnSchemas.size(); i++) {
+              ColumnSchema columnSchema = fullColumnSchemas.get(i);
+              symbolInputLocationMap
+                  .computeIfAbsent(new Symbol(columnSchema.getName()), key -> 
new ArrayList<>())
+                  .add(new InputLocation(0, i));
+              inputDataTypeList.add(getTSDataType(columnSchema.getType()));
+            }
+            Expression combinedCannotPushDownPredicates =
+                cannotPushDownConjuncts.isEmpty()
+                    ? null
+                    : IrUtils.combineConjuncts(cannotPushDownConjuncts);
+            filterAndProjectOperator =
+                (FilterAndProjectOperator)
+                    
TableOperatorGenerator.this.constructFilterAndProjectOperator(
+                        Optional.ofNullable(combinedCannotPushDownPredicates),
+                        childOperator,
+                        node.getOutputSymbols().stream()
+                            .map(Symbol::toSymbolReference)
+                            .toArray(Expression[]::new),
+                        inputDataTypeList,
+                        symbolInputLocationMap,
+                        node.getPlanNodeId(),
+                        context);
+            return filterAndProjectOperator;
+          }
+
+          @Override
+          public Operator getCurrentDeviceRootOperator() {
+            return operator;
+          }
+
+          @Override
+          public List<Operator> getCurrentDeviceDataSourceOperators() {
+            return seriesScanOperators;
+          }
+
+          @Override
+          public Operator getCurrentDeviceStartCloseOperator() {
+            return startCloseInternalOperator == null ? operator : 
startCloseInternalOperator;
+          }
+        };
+
+    return new 
DeviceIteratorScanOperator.TreeNonAlignedDeviceViewScanParameters(
+        allSensors,
+        operatorContext,
+        node.getDeviceEntries(),
+        measurementColumnNames,
+        measurementSchemas,
+        deviceChildOperatorTreeGenerator);
+  }
+
+  private static class CommonTableScanOperatorParameters {
+
+    List<Symbol> outputColumnNames;
+    List<ColumnSchema> columnSchemas;
+    int[] columnsIndexArray;
+    Map<Symbol, ColumnSchema> columnSchemaMap;
+    Map<Symbol, Integer> idAndAttributeColumnsIndexMap;
+    List<String> measurementColumnNames;
+    Map<String, Integer> measurementColumnsIndexMap;
+    String timeColumnName;
+    List<IMeasurementSchema> measurementSchemas;
+    int measurementColumnCount;
+    int idx;
+
+    private CommonTableScanOperatorParameters(
+        DeviceTableScanNode node,
+        Map<String, String> fieldColumnsRenameMap,
+        boolean keepNonOutputMeasurementColumns) {
+      outputColumnNames = node.getOutputSymbols();
+      int outputColumnCount =
+          keepNonOutputMeasurementColumns ? node.getAssignments().size() : 
outputColumnNames.size();
+      columnSchemas = new ArrayList<>(outputColumnCount);
+      columnsIndexArray = new int[outputColumnCount];
+      columnSchemaMap = node.getAssignments();
+      idAndAttributeColumnsIndexMap = node.getIdAndAttributeIndexMap();
+      measurementColumnNames = new ArrayList<>();
+      measurementColumnsIndexMap = new HashMap<>();
+      measurementSchemas = new ArrayList<>();
+      measurementColumnCount = 0;
+      idx = 0;
+
+      for (Symbol columnName : outputColumnNames) {
+        ColumnSchema schema =
+            requireNonNull(columnSchemaMap.get(columnName), columnName + " is 
null");
+
+        switch (schema.getColumnCategory()) {
+          case TAG:
+          case ATTRIBUTE:
+            columnsIndexArray[idx++] =
+                requireNonNull(
+                    idAndAttributeColumnsIndexMap.get(columnName), columnName 
+ " is null");
+            columnSchemas.add(schema);
+            break;
+          case FIELD:
+            columnsIndexArray[idx++] = measurementColumnCount;
+            measurementColumnCount++;
+
+            String realMeasurementName =
+                fieldColumnsRenameMap.getOrDefault(schema.getName(), 
schema.getName());
+
+            measurementColumnNames.add(realMeasurementName);
+            measurementSchemas.add(
+                new MeasurementSchema(realMeasurementName, 
getTSDataType(schema.getType())));
+            columnSchemas.add(schema);
+            measurementColumnsIndexMap.put(columnName.getName(), 
measurementColumnCount - 1);
+            break;
+          case TIME:
+            columnsIndexArray[idx++] = -1;
+            columnSchemas.add(schema);
+            timeColumnName = columnName.getName();
+            break;
+          default:
+            throw new IllegalArgumentException(
+                "Unexpected column category: " + schema.getColumnCategory());
+        }
+      }
+      Set<Symbol> outputSet = new HashSet<>(outputColumnNames);
+      for (Map.Entry<Symbol, ColumnSchema> entry : 
node.getAssignments().entrySet()) {
+        if (!outputSet.contains(entry.getKey()) && 
entry.getValue().getColumnCategory() == FIELD) {
+          if (keepNonOutputMeasurementColumns) {
+            columnSchemas.add(entry.getValue());
+            columnsIndexArray[idx++] = measurementColumnCount;
+          }
+          measurementColumnCount++;
+          String realMeasurementName =
+              fieldColumnsRenameMap.getOrDefault(
+                  entry.getValue().getName(), entry.getValue().getName());
+
+          measurementColumnNames.add(realMeasurementName);
+          measurementSchemas.add(
+              new MeasurementSchema(
+                  realMeasurementName, 
getTSDataType(entry.getValue().getType())));
+          measurementColumnsIndexMap.put(entry.getKey().getName(), 
measurementColumnCount - 1);
+        } else if (entry.getValue().getColumnCategory() == TIME) {
+          timeColumnName = entry.getKey().getName();
+        }
+      }
+    }
   }
 
   public static IDeviceID.TreeDeviceIdColumnValueExtractor 
createTreeDeviceIdColumnValueExtractor(
@@ -453,78 +947,19 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
           String className,
           Map<String, String> fieldColumnsRenameMap) {
 
-    List<Symbol> outputColumnNames = node.getOutputSymbols();
-    int outputColumnCount = outputColumnNames.size();
-    List<ColumnSchema> columnSchemas = new ArrayList<>(outputColumnCount);
-    int[] columnsIndexArray = new int[outputColumnCount];
-    Map<Symbol, ColumnSchema> columnSchemaMap = node.getAssignments();
-    Map<Symbol, Integer> idAndAttributeColumnsIndexMap = 
node.getIdAndAttributeIndexMap();
-    List<String> measurementColumnNames = new ArrayList<>();
-    Map<String, Integer> measurementColumnsIndexMap = new HashMap<>();
-    String timeColumnName = null;
-    List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
-    int measurementColumnCount = 0;
-    int idx = 0;
-    for (Symbol columnName : outputColumnNames) {
-      ColumnSchema schema =
-          requireNonNull(columnSchemaMap.get(columnName), columnName + " is 
null");
-
-      switch (schema.getColumnCategory()) {
-        case TAG:
-        case ATTRIBUTE:
-          columnsIndexArray[idx++] =
-              requireNonNull(
-                  idAndAttributeColumnsIndexMap.get(columnName), columnName + 
" is null");
-          columnSchemas.add(schema);
-          break;
-        case FIELD:
-          columnsIndexArray[idx++] = measurementColumnCount;
-          measurementColumnCount++;
-
-          String realMeasurementName =
-              fieldColumnsRenameMap.getOrDefault(schema.getName(), 
schema.getName());
-
-          measurementColumnNames.add(realMeasurementName);
-          measurementSchemas.add(
-              new MeasurementSchema(realMeasurementName, 
getTSDataType(schema.getType())));
-          columnSchemas.add(schema);
-          measurementColumnsIndexMap.put(columnName.getName(), 
measurementColumnCount - 1);
-          break;
-        case TIME:
-          columnsIndexArray[idx++] = -1;
-          columnSchemas.add(schema);
-          timeColumnName = columnName.getName();
-          break;
-        default:
-          throw new IllegalArgumentException(
-              "Unexpected column category: " + schema.getColumnCategory());
-      }
-    }
-
-    Set<Symbol> outputSet = new HashSet<>(outputColumnNames);
-    for (Map.Entry<Symbol, ColumnSchema> entry : 
node.getAssignments().entrySet()) {
-      if (!outputSet.contains(entry.getKey()) && 
entry.getValue().getColumnCategory() == FIELD) {
-        measurementColumnCount++;
-        String realMeasurementName =
-            fieldColumnsRenameMap.getOrDefault(
-                entry.getValue().getName(), entry.getValue().getName());
-
-        measurementColumnNames.add(realMeasurementName);
-        measurementSchemas.add(
-            new MeasurementSchema(realMeasurementName, 
getTSDataType(entry.getValue().getType())));
-        measurementColumnsIndexMap.put(entry.getKey().getName(), 
measurementColumnCount - 1);
-      } else if (entry.getValue().getColumnCategory() == TIME) {
-        timeColumnName = entry.getKey().getName();
-      }
-    }
-
+    CommonTableScanOperatorParameters commonParameter =
+        new CommonTableScanOperatorParameters(node, fieldColumnsRenameMap, 
false);
+    List<IMeasurementSchema> measurementSchemas = 
commonParameter.measurementSchemas;
+    List<String> measurementColumnNames = 
commonParameter.measurementColumnNames;
+    List<ColumnSchema> columnSchemas = commonParameter.columnSchemas;
+    int[] columnsIndexArray = commonParameter.columnsIndexArray;
     SeriesScanOptions seriesScanOptions =
         buildSeriesScanOptions(
             context,
-            columnSchemaMap,
+            commonParameter.columnSchemaMap,
             measurementColumnNames,
-            measurementColumnsIndexMap,
-            timeColumnName,
+            commonParameter.measurementColumnsIndexMap,
+            commonParameter.timeColumnName,
             node.getTimePredicate(),
             node.getPushDownLimit(),
             node.getPushDownOffset(),
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceIteratorScanOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceIteratorScanOperatorTest.java
new file mode 100644
index 00000000000..951c2b895a1
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/DeviceIteratorScanOperatorTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.AlignedFullPath;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.DeviceIteratorScanOperator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.tsfile.read.common.block.column.BooleanColumn;
+import org.apache.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.tsfile.read.common.block.column.FloatColumn;
+import org.apache.tsfile.read.common.block.column.IntColumn;
+import org.apache.tsfile.read.common.block.column.LongColumn;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions.getDefaultSeriesScanOptions;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class DeviceIteratorScanOperatorTest {
+
+  private static final String DEVICE_ITERATOR_SCAN_OPERATOR_TEST =
+      "root.DeviceIteratorScanOperatorTest";
+  private static final List<IMeasurementSchema> measurementSchemas = new 
ArrayList<>();
+
+  private static final List<TsFileResource> seqResources = new ArrayList<>();
+  private static final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+  private static final double DELTA = 0.000001;
+
+  @BeforeClass
+  public static void setUp() throws MetadataException, IOException, 
WriteProcessException {
+    AlignedSeriesTestUtil.setUp(
+        measurementSchemas, seqResources, unSeqResources, 
DEVICE_ITERATOR_SCAN_OPERATOR_TEST);
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    AlignedSeriesTestUtil.tearDown(seqResources, unSeqResources);
+  }
+
+  @Test
+  public void test1() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    DeviceIteratorScanOperator operator = null;
+    try {
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      DriverContext driverContext = new DriverContext(fragmentInstanceContext, 
0);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      driverContext.addOperatorContext(
+          1, planNodeId, DeviceIteratorScanOperator.class.getSimpleName());
+
+      List<DeviceEntry> deviceEntries =
+          Arrays.asList(
+              new AlignedDeviceEntry(
+                  IDeviceID.Factory.DEFAULT_FACTORY.create(
+                      DEVICE_ITERATOR_SCAN_OPERATOR_TEST + ".device0"),
+                  new Binary[0]),
+              new AlignedDeviceEntry(
+                  IDeviceID.Factory.DEFAULT_FACTORY.create(
+                      DEVICE_ITERATOR_SCAN_OPERATOR_TEST + ".device0"),
+                  new Binary[0]),
+              new AlignedDeviceEntry(
+                  IDeviceID.Factory.DEFAULT_FACTORY.create(
+                      DEVICE_ITERATOR_SCAN_OPERATOR_TEST + ".device0"),
+                  new Binary[0]));
+
+      DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator generator =
+          new DeviceIteratorScanOperator.DeviceChildOperatorTreeGenerator() {
+
+            private Operator currentDeviceRootOperator;
+
+            @Override
+            public boolean keepOffsetAndLimitOperatorAfterDeviceIterator() {
+              return true;
+            }
+
+            @Override
+            public void generateCurrentDeviceOperatorTree(DeviceEntry 
deviceEntry) {
+              AlignedFullPath alignedPath =
+                  new AlignedFullPath(
+                      deviceEntry.getDeviceID(),
+                      measurementSchemas.stream()
+                          .map(IMeasurementSchema::getMeasurementName)
+                          .collect(Collectors.toList()),
+                      measurementSchemas.stream()
+                          .map(m -> (IMeasurementSchema) m)
+                          .collect(Collectors.toList()));
+              currentDeviceRootOperator =
+                  new AlignedSeriesScanOperator(
+                      driverContext.getOperatorContexts().get(0),
+                      planNodeId,
+                      alignedPath,
+                      Ordering.ASC,
+                      getDefaultSeriesScanOptions(alignedPath),
+                      false,
+                      null,
+                      -1);
+            }
+
+            @Override
+            public Operator getCurrentDeviceRootOperator() {
+              return currentDeviceRootOperator;
+            }
+
+            @Override
+            public List<Operator> getCurrentDeviceDataSourceOperators() {
+              return Collections.singletonList(currentDeviceRootOperator);
+            }
+
+            @Override
+            public Operator getCurrentDeviceStartCloseOperator() {
+              return currentDeviceRootOperator;
+            }
+          };
+      operator =
+          new DeviceIteratorScanOperator(
+              driverContext.getOperatorContexts().get(0), deviceEntries, 
generator);
+      operator.initQueryDataSource(new QueryDataSource(seqResources, 
unSeqResources));
+
+      int count = 0;
+      while (operator.isBlocked().isDone() && operator.hasNext()) {
+        if (count % 500 == 0) {
+          count = 0;
+        }
+        TsBlock tsBlock = operator.next();
+        if (tsBlock == null) {
+          continue;
+        }
+        assertEquals(measurementSchemas.size(), tsBlock.getValueColumnCount());
+        assertTrue(tsBlock.getColumn(0) instanceof BooleanColumn);
+        assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
+        assertTrue(tsBlock.getColumn(2) instanceof LongColumn);
+        assertTrue(tsBlock.getColumn(3) instanceof FloatColumn);
+        assertTrue(tsBlock.getColumn(4) instanceof DoubleColumn);
+        assertTrue(tsBlock.getColumn(5) instanceof BinaryColumn);
+
+        for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) {
+          assertEquals(count, tsBlock.getTimeByIndex(i));
+          int delta = 0;
+          if ((long) count < 200) {
+            delta = 20000;
+          } else if ((long) count < 260
+              || ((long) count >= 300 && (long) count < 380)
+              || (long) count >= 400) {
+            delta = 10000;
+          }
+          assertEquals((delta + (long) count) % 2 == 0, 
tsBlock.getColumn(0).getBoolean(i));
+          assertEquals(delta + (long) count, tsBlock.getColumn(1).getInt(i));
+          assertEquals(delta + (long) count, tsBlock.getColumn(2).getLong(i));
+          assertEquals(delta + (long) count, tsBlock.getColumn(3).getFloat(i), 
DELTA);
+          assertEquals(delta + (long) count, 
tsBlock.getColumn(4).getDouble(i), DELTA);
+          assertEquals(
+              String.valueOf(delta + (long) count), 
tsBlock.getColumn(5).getBinary(i).toString());
+        }
+      }
+      assertEquals(500, count);
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    } finally {
+      if (operator != null) {
+        try {
+          operator.close();
+        } catch (Exception ignored) {
+        }
+      }
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/NonAlignedTreeDeviceViewScanOperatorTreeTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/NonAlignedTreeDeviceViewScanOperatorTreeTest.java
new file mode 100644
index 00000000000..69f7305e39d
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/NonAlignedTreeDeviceViewScanOperatorTreeTest.java
@@ -0,0 +1,650 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.execution.driver.DataDriverContext;
+import org.apache.iotdb.db.queryengine.execution.fragment.DataNodeQueryContext;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.LimitOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.OffsetOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.DeviceIteratorScanOperator;
+import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanContext;
+import org.apache.iotdb.db.queryengine.plan.planner.TableOperatorGenerator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.relational.analyzer.TestMatadata;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.NonAlignedAlignedDeviceEntry;
+import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolsExtractor;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeNonAlignedDeviceViewScanNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LogicalExpression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LongLiteral;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
+import 
org.apache.iotdb.db.storageengine.dataregion.read.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.read.common.type.TypeEnum;
+import org.apache.tsfile.read.common.type.TypeFactory;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static 
org.apache.iotdb.db.queryengine.execution.operator.Operator.NOT_BLOCKED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class NonAlignedTreeDeviceViewScanOperatorTreeTest {
+
+  private static final String 
NON_ALIGNED_TREE_DEVICE_VIEW_SCAN_OPERATOR_TREE_TEST =
+      "root.NonAlignedTreeDeviceViewScanOperatorTreeTest";
+  private final TableOperatorGenerator tableOperatorGenerator =
+      new TableOperatorGenerator(new TestMatadata());
+  private final List<String> deviceIds = new ArrayList<>();
+  private final List<IMeasurementSchema> measurementSchemas = new 
ArrayList<>();
+
+  private final List<TsFileResource> seqResources = new ArrayList<>();
+  private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+  private final Map<Symbol, ColumnSchema> columnSchemaMap = new HashMap<>();
+  private TypeProvider typeProvider;
+
+  @Before
+  public void setUp() throws MetadataException, IOException, 
WriteProcessException {
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas,
+        deviceIds,
+        seqResources,
+        unSeqResources,
+        NON_ALIGNED_TREE_DEVICE_VIEW_SCAN_OPERATOR_TREE_TEST);
+
+    columnSchemaMap.put(
+        new Symbol("tag1"),
+        new ColumnSchema(
+            "tag1", TypeFactory.getType(TSDataType.TEXT), false, 
TsTableColumnCategory.TAG));
+    columnSchemaMap.put(
+        new Symbol("time"),
+        new ColumnSchema(
+            "time", TypeFactory.getType(TSDataType.INT64), false, 
TsTableColumnCategory.TIME));
+    columnSchemaMap.put(
+        new Symbol("sensor0"),
+        new ColumnSchema(
+            "sensor0", TypeFactory.getType(TSDataType.INT32), false, 
TsTableColumnCategory.FIELD));
+    columnSchemaMap.put(
+        new Symbol("sensor1"),
+        new ColumnSchema(
+            "sensor1", TypeFactory.getType(TSDataType.INT32), false, 
TsTableColumnCategory.FIELD));
+    columnSchemaMap.put(
+        new Symbol("sensor2"),
+        new ColumnSchema(
+            "sensor2", TypeFactory.getType(TSDataType.INT32), false, 
TsTableColumnCategory.FIELD));
+    columnSchemaMap.put(
+        new Symbol("sensor3"),
+        new ColumnSchema(
+            "sensor3", TypeFactory.getType(TSDataType.INT32), false, 
TsTableColumnCategory.FIELD));
+
+    Map<Symbol, Type> symbolTSDataTypeMap = new HashMap<>();
+    symbolTSDataTypeMap.put(new Symbol("sensor0"), 
TypeFactory.getType(TSDataType.INT32));
+    symbolTSDataTypeMap.put(new Symbol("sensor1"), 
TypeFactory.getType(TSDataType.INT32));
+    symbolTSDataTypeMap.put(new Symbol("sensor2"), 
TypeFactory.getType(TSDataType.INT32));
+    symbolTSDataTypeMap.put(new Symbol("sensor3"), 
TypeFactory.getType(TSDataType.INT32));
+    symbolTSDataTypeMap.put(new Symbol("time"), 
TypeFactory.getType(TypeEnum.INT64));
+    symbolTSDataTypeMap.put(new Symbol("tag1"), 
TypeFactory.getType(TSDataType.TEXT));
+    typeProvider = new TypeProvider(symbolTSDataTypeMap);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+  }
+
+  @Test
+  public void testScanWithPushDownPredicateAndLimitAndOffset() throws 
Exception {
+    List<String> outputColumnList = Arrays.asList("sensor0", "sensor1", 
"sensor2", "time", "tag1");
+    TreeNonAlignedDeviceViewScanNode node = 
getTreeNonAlignedDeviceViewScanNode(outputColumnList);
+    node.setPushDownOffset(500);
+    node.setPushDownLimit(500);
+    node.setPushDownPredicate(
+        new ComparisonExpression(
+            ComparisonExpression.Operator.GREATER_THAN,
+            new Symbol("sensor1").toSymbolReference(),
+            new LongLiteral("1000")));
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    Operator operator = getOperator(node, instanceNotificationExecutor);
+    assertTrue(operator instanceof DeviceIteratorScanOperator);
+    try {
+      checkResult(operator, outputColumnList, 500);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      operator.close();
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void testScanWithPushDownPredicateAndPushLimitToEachDevice() throws 
Exception {
+    List<String> outputColumnList = Arrays.asList("sensor0", "sensor1", 
"sensor2", "time", "tag1");
+    TreeNonAlignedDeviceViewScanNode node = 
getTreeNonAlignedDeviceViewScanNode(outputColumnList);
+    node.setPushLimitToEachDevice(true);
+    node.setPushDownLimit(500);
+    node.setPushDownPredicate(
+        new ComparisonExpression(
+            ComparisonExpression.Operator.GREATER_THAN,
+            new Symbol("sensor1").toSymbolReference(),
+            new LongLiteral("1000")));
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    Operator operator = getOperator(node, instanceNotificationExecutor);
+    assertTrue(operator instanceof DeviceIteratorScanOperator);
+    try {
+      checkResult(operator, outputColumnList, 1320);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      operator.close();
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void 
testScanWithCanPushDownPredicateAndCannotPushDownPredicateAndPushLimitToEachDevice()
+      throws Exception {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    List<String> outputColumnList = Arrays.asList("sensor0", "sensor1", 
"sensor2", "time", "tag1");
+    TreeNonAlignedDeviceViewScanNode node =
+        getTreeNonAlignedDeviceViewScanNode(
+            outputColumnList,
+            Arrays.asList("sensor0", "sensor1", "sensor2", "time", "tag1", 
"sensor3"));
+    node.setPushDownLimit(100);
+    node.setPushLimitToEachDevice(true);
+    node.setPushDownPredicate(
+        new LogicalExpression(
+            LogicalExpression.Operator.AND,
+            Arrays.asList(
+                new LogicalExpression(
+                    LogicalExpression.Operator.OR,
+                    Arrays.asList(
+                        new ComparisonExpression(
+                            ComparisonExpression.Operator.GREATER_THAN,
+                            new Symbol("sensor0").toSymbolReference(),
+                            new LongLiteral("1000")),
+                        new ComparisonExpression(
+                            ComparisonExpression.Operator.GREATER_THAN,
+                            new Symbol("sensor1").toSymbolReference(),
+                            new LongLiteral("1000")))),
+                new ComparisonExpression(
+                    ComparisonExpression.Operator.GREATER_THAN,
+                    new Symbol("sensor2").toSymbolReference(),
+                    new LongLiteral("1000")),
+                new ComparisonExpression(
+                    ComparisonExpression.Operator.GREATER_THAN,
+                    new Symbol("sensor3").toSymbolReference(),
+                    new LongLiteral("1000")))));
+    Operator operator = getOperator(node, instanceNotificationExecutor);
+    assertTrue(operator instanceof DeviceIteratorScanOperator);
+    try {
+      checkResult(operator, outputColumnList, 300);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      operator.close();
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void testScanWithPushDownPredicateAndPushLimitToEachDevice1() throws 
Exception {
+    List<String> outputColumnList = Arrays.asList("sensor0", "sensor1", 
"sensor2", "time", "tag1");
+    TreeNonAlignedDeviceViewScanNode node =
+        getTreeNonAlignedDeviceViewScanNode(
+            outputColumnList,
+            Arrays.asList("sensor0", "sensor1", "sensor2", "time", "tag1", 
"sensor3"));
+    node.setPushDownPredicate(
+        new ComparisonExpression(
+            ComparisonExpression.Operator.GREATER_THAN,
+            new Symbol("sensor3").toSymbolReference(),
+            new LongLiteral("1000")));
+    node.setPushLimitToEachDevice(true);
+    node.setPushDownLimit(10);
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    Operator operator = getOperator(node, instanceNotificationExecutor);
+    assertTrue(operator instanceof DeviceIteratorScanOperator);
+    try {
+      checkResult(operator, outputColumnList, 30);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      operator.close();
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void testScanWithCannotPushDownPredicateAndLimitAndOffset2() throws 
Exception {
+    List<String> outputColumnList = Arrays.asList("sensor0", "sensor1", 
"sensor2", "time", "tag1");
+    TreeNonAlignedDeviceViewScanNode node = 
getTreeNonAlignedDeviceViewScanNode(outputColumnList);
+    node.setPushDownOffset(500);
+    node.setPushDownLimit(500);
+    node.setPushDownPredicate(
+        new LogicalExpression(
+            LogicalExpression.Operator.OR,
+            Arrays.asList(
+                new ComparisonExpression(
+                    ComparisonExpression.Operator.GREATER_THAN,
+                    new Symbol("sensor1").toSymbolReference(),
+                    new LongLiteral("1000")),
+                new ComparisonExpression(
+                    ComparisonExpression.Operator.GREATER_THAN,
+                    new Symbol("sensor2").toSymbolReference(),
+                    new LongLiteral("1000")))));
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    Operator operator = getOperator(node, instanceNotificationExecutor);
+    assertTrue(operator instanceof LimitOperator);
+    try {
+      checkResult(operator, outputColumnList, 500);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      operator.close();
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void testScanWithLimit() throws Exception {
+    List<String> outputColumnList = Arrays.asList("sensor0", "sensor1", 
"sensor2", "time", "tag1");
+    TreeNonAlignedDeviceViewScanNode node = 
getTreeNonAlignedDeviceViewScanNode(outputColumnList);
+    node.setPushDownLimit(500);
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    Operator operator = getOperator(node, instanceNotificationExecutor);
+    assertTrue(operator instanceof LimitOperator);
+    try {
+      checkResult(operator, outputColumnList, 500);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      operator.close();
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void testScanWithPushLimitToEachDevice() throws Exception {
+    List<String> outputColumnList = Arrays.asList("sensor0", "sensor1", 
"sensor2", "time", "tag1");
+    TreeNonAlignedDeviceViewScanNode node = 
getTreeNonAlignedDeviceViewScanNode(outputColumnList);
+    node.setPushDownLimit(500);
+    node.setPushLimitToEachDevice(true);
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    Operator operator = getOperator(node, instanceNotificationExecutor);
+    assertTrue(operator instanceof DeviceIteratorScanOperator);
+    try {
+      checkResult(operator, outputColumnList, 1500);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      operator.close();
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void testScanWithOneFieldColumn() throws Exception {
+    List<String> outputColumnList = Arrays.asList("sensor0", "time", "tag1");
+    TreeNonAlignedDeviceViewScanNode node = 
getTreeNonAlignedDeviceViewScanNode(outputColumnList);
+    node.setPushDownLimit(500);
+    node.setPushDownPredicate(
+        new ComparisonExpression(
+            ComparisonExpression.Operator.GREATER_THAN,
+            new Symbol("sensor0").toSymbolReference(),
+            new LongLiteral("1000")));
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    Operator operator = getOperator(node, instanceNotificationExecutor);
+    assertTrue(operator instanceof DeviceIteratorScanOperator);
+    try {
+      checkResult(operator, outputColumnList, 500);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      operator.close();
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void testScanWithOneFieldColumnAndPushLimitToEachDevice() throws 
Exception {
+    List<String> outputColumnList = Arrays.asList("sensor0", "time", "tag1");
+    TreeNonAlignedDeviceViewScanNode node = 
getTreeNonAlignedDeviceViewScanNode(outputColumnList);
+    node.setPushDownLimit(500);
+    node.setPushLimitToEachDevice(true);
+    node.setPushDownPredicate(
+        new ComparisonExpression(
+            ComparisonExpression.Operator.GREATER_THAN,
+            new Symbol("sensor0").toSymbolReference(),
+            new LongLiteral("1000")));
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    Operator operator = getOperator(node, instanceNotificationExecutor);
+    assertTrue(operator instanceof DeviceIteratorScanOperator);
+    try {
+      checkResult(operator, outputColumnList, 1320);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      operator.close();
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void testScanWithOffset() throws Exception {
+    List<String> outputColumnList = Arrays.asList("sensor0", "sensor1", 
"sensor2", "time", "tag1");
+    TreeNonAlignedDeviceViewScanNode node = 
getTreeNonAlignedDeviceViewScanNode(outputColumnList);
+    node.setPushDownOffset(1200);
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    Operator operator = getOperator(node, instanceNotificationExecutor);
+    assertTrue(operator instanceof OffsetOperator);
+    try {
+      checkResult(operator, outputColumnList, 300);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      operator.close();
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void testScanWithPushDownPredicateAndOffset1() throws Exception {
+    List<String> outputColumnList = Arrays.asList("sensor0", "sensor1", 
"sensor2", "time", "tag1");
+    TreeNonAlignedDeviceViewScanNode node = 
getTreeNonAlignedDeviceViewScanNode(outputColumnList);
+    node.setPushDownOffset(1200);
+    node.setPushDownPredicate(
+        new ComparisonExpression(
+            ComparisonExpression.Operator.GREATER_THAN,
+            new Symbol("sensor1").toSymbolReference(),
+            new LongLiteral("0")));
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    Operator operator = getOperator(node, instanceNotificationExecutor);
+    assertTrue(operator instanceof DeviceIteratorScanOperator);
+    try {
+      checkResult(operator, outputColumnList, 300);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      operator.close();
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void testScanWithPushDownPredicateAndOffset2() throws Exception {
+    List<String> outputColumnList = Arrays.asList("sensor0", "sensor1", 
"sensor2", "time", "tag1");
+    TreeNonAlignedDeviceViewScanNode node = 
getTreeNonAlignedDeviceViewScanNode(outputColumnList);
+    node.setPushDownOffset(1200);
+    node.setPushDownPredicate(
+        new LogicalExpression(
+            LogicalExpression.Operator.AND,
+            Arrays.asList(
+                new ComparisonExpression(
+                    ComparisonExpression.Operator.GREATER_THAN,
+                    new Symbol("sensor1").toSymbolReference(),
+                    new LongLiteral("0")),
+                new ComparisonExpression(
+                    ComparisonExpression.Operator.GREATER_THAN,
+                    new Symbol("sensor2").toSymbolReference(),
+                    new LongLiteral("0")))));
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    Operator operator = getOperator(node, instanceNotificationExecutor);
+    assertTrue(operator instanceof DeviceIteratorScanOperator);
+    try {
+      checkResult(operator, outputColumnList, 300);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      operator.close();
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void testScanWithPushDownPredicate() throws Exception {
+    List<String> outputColumnList = Arrays.asList("sensor0", "sensor1", 
"sensor2", "time", "tag1");
+    TreeNonAlignedDeviceViewScanNode node =
+        getTreeNonAlignedDeviceViewScanNode(
+            outputColumnList,
+            Arrays.asList("sensor0", "sensor1", "sensor2", "time", "tag1", 
"sensor3"));
+    node.setPushDownPredicate(
+        new ComparisonExpression(
+            ComparisonExpression.Operator.GREATER_THAN,
+            new Symbol("sensor3").toSymbolReference(),
+            new LongLiteral("1000")));
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    Operator operator = getOperator(node, instanceNotificationExecutor);
+    try {
+      checkResult(operator, outputColumnList, 1320);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      operator.close();
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+
+  @Test
+  public void testUtils() {
+    Expression expression =
+        new LogicalExpression(
+            LogicalExpression.Operator.AND,
+            Arrays.asList(
+                new LogicalExpression(
+                    LogicalExpression.Operator.OR,
+                    Arrays.asList(
+                        new ComparisonExpression(
+                            ComparisonExpression.Operator.GREATER_THAN,
+                            new Symbol("sensor0").toSymbolReference(),
+                            new LongLiteral("1000")),
+                        new ComparisonExpression(
+                            ComparisonExpression.Operator.GREATER_THAN,
+                            new Symbol("sensor1").toSymbolReference(),
+                            new LongLiteral("1000")))),
+                new ComparisonExpression(
+                    ComparisonExpression.Operator.GREATER_THAN,
+                    new Symbol("sensor2").toSymbolReference(),
+                    new LongLiteral("1000")),
+                new ComparisonExpression(
+                    ComparisonExpression.Operator.GREATER_THAN,
+                    new Symbol("sensor3").toSymbolReference(),
+                    new LongLiteral("1000"))));
+    List<Expression> conjuncts = IrUtils.extractConjuncts(expression);
+    assertEquals(3, conjuncts.size());
+    Set<Symbol> symbols = SymbolsExtractor.extractUnique(expression);
+    assertEquals(4, symbols.size());
+    assertTrue(symbols.contains(new Symbol("sensor0")));
+    assertTrue(symbols.contains(new Symbol("sensor1")));
+    assertTrue(symbols.contains(new Symbol("sensor2")));
+    assertTrue(symbols.contains(new Symbol("sensor3")));
+  }
+
+  private Operator getOperator(
+      TreeNonAlignedDeviceViewScanNode node, ExecutorService 
instanceNotificationExecutor) {
+    QueryId queryId = new QueryId("stub_query");
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(instanceId, stateMachine);
+    DataDriverContext driverContext = new 
DataDriverContext(fragmentInstanceContext, 0);
+    PlanNodeId planNodeId = new PlanNodeId("1");
+    driverContext.addOperatorContext(
+        1, planNodeId, DeviceIteratorScanOperator.class.getSimpleName());
+
+    LocalExecutionPlanContext localExecutionPlanContext =
+        new LocalExecutionPlanContext(
+            typeProvider, fragmentInstanceContext, new 
DataNodeQueryContext(1));
+    Operator operator =
+        tableOperatorGenerator.visitTreeNonAlignedDeviceViewScan(node, 
localExecutionPlanContext);
+    ((DataDriverContext) localExecutionPlanContext.getDriverContext())
+        .getSourceOperators()
+        .forEach(
+            sourceOperator ->
+                sourceOperator.initQueryDataSource(
+                    new QueryDataSource(seqResources, unSeqResources)));
+    return operator;
+  }
+
+  private void checkResult(Operator operator, List<String> outputColumnList, 
int expectedCount)
+      throws Exception {
+    int count = 0;
+    while (operator.isBlocked().get() != NOT_BLOCKED && operator.hasNext()) {
+      TsBlock tsBlock = operator.next();
+      if (tsBlock == null) {
+        continue;
+      }
+      assertEquals(outputColumnList.size(), tsBlock.getValueColumnCount());
+      for (int i = 0; i < outputColumnList.size(); i++) {
+        Symbol symbol = new Symbol(outputColumnList.get(i));
+        assertEquals(
+            columnSchemaMap.get(symbol).getType(),
+            TypeFactory.getType(tsBlock.getColumn(i).getDataType()));
+      }
+      count += tsBlock.getPositionCount();
+    }
+    assertEquals(expectedCount, count);
+  }
+
+  private TreeNonAlignedDeviceViewScanNode getTreeNonAlignedDeviceViewScanNode(
+      List<String> outputColumns) {
+    return getTreeNonAlignedDeviceViewScanNode(outputColumns, outputColumns);
+  }
+
+  private TreeNonAlignedDeviceViewScanNode getTreeNonAlignedDeviceViewScanNode(
+      List<String> outputColumns, List<String> assignmentColumns) {
+    List<Symbol> outputSymbols =
+        outputColumns.stream().map(Symbol::new).collect(Collectors.toList());
+
+    Map<Symbol, ColumnSchema> assignments = new HashMap<>();
+    for (String assignmentColumn : assignmentColumns) {
+      Symbol symbol = new Symbol(assignmentColumn);
+      assignments.put(symbol, columnSchemaMap.get(symbol));
+    }
+
+    Map<Symbol, Integer> idAndAttributeIndexMap = new HashMap<>();
+    idAndAttributeIndexMap.put(new Symbol("tag1"), 0);
+
+    List<DeviceEntry> deviceEntries =
+        Arrays.asList(
+            new NonAlignedAlignedDeviceEntry(
+                IDeviceID.Factory.DEFAULT_FACTORY.create(
+                    NON_ALIGNED_TREE_DEVICE_VIEW_SCAN_OPERATOR_TREE_TEST + 
".device0"),
+                new Binary[0]),
+            new NonAlignedAlignedDeviceEntry(
+                IDeviceID.Factory.DEFAULT_FACTORY.create(
+                    NON_ALIGNED_TREE_DEVICE_VIEW_SCAN_OPERATOR_TREE_TEST + 
".device1"),
+                new Binary[0]),
+            new NonAlignedAlignedDeviceEntry(
+                IDeviceID.Factory.DEFAULT_FACTORY.create(
+                    NON_ALIGNED_TREE_DEVICE_VIEW_SCAN_OPERATOR_TREE_TEST + 
".device1"),
+                new Binary[0]));
+    Expression timePredicate = null;
+    Expression pushDownPredicate = null;
+    long pushDownLimit = 0;
+    long pushDownOffset = 0;
+    boolean pushLimitToEachDevice = false;
+    boolean containsNonAlignedDevice = true;
+    String treeDBName = NON_ALIGNED_TREE_DEVICE_VIEW_SCAN_OPERATOR_TREE_TEST;
+
+    Map<String, String> measurementColumnNameMap = new HashMap<>();
+    return new TreeNonAlignedDeviceViewScanNode(
+        new PlanNodeId("1"),
+        new QualifiedObjectName(
+            
NON_ALIGNED_TREE_DEVICE_VIEW_SCAN_OPERATOR_TREE_TEST.toLowerCase(), "table1"),
+        outputSymbols,
+        assignments,
+        deviceEntries,
+        idAndAttributeIndexMap,
+        Ordering.ASC,
+        timePredicate,
+        pushDownPredicate,
+        pushDownLimit,
+        pushDownOffset,
+        pushLimitToEachDevice,
+        containsNonAlignedDevice,
+        treeDBName,
+        measurementColumnNameMap);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeToTableViewAdaptorOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeToTableViewAdaptorOperatorTest.java
new file mode 100644
index 00000000000..8ef662e9db9
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TreeToTableViewAdaptorOperatorTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.AlignedFullPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TreeToTableViewAdaptorOperator;
+import org.apache.iotdb.db.queryengine.plan.planner.TableOperatorGenerator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import io.airlift.units.Duration;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.tsfile.read.common.block.column.BooleanColumn;
+import org.apache.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.tsfile.read.common.block.column.FloatColumn;
+import org.apache.tsfile.read.common.block.column.IntColumn;
+import org.apache.tsfile.read.common.block.column.LongColumn;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.read.common.block.column.TimeColumn;
+import org.apache.tsfile.read.common.type.TypeFactory;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions.getDefaultSeriesScanOptions;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TreeToTableViewAdaptorOperatorTest {
+  private static final String TREE_TO_TABLE_VIEW_ADAPTOR_OPERATOR_TEST =
+      "root.TreeToTableViewAdaptorOperatorTest";
+  private static final List<IMeasurementSchema> measurementSchemas = new 
ArrayList<>();
+
+  private static final List<TsFileResource> seqResources = new ArrayList<>();
+  private static final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+  private static final double DELTA = 0.000001;
+
+  @BeforeClass
+  public static void setUp() throws MetadataException, IOException, 
WriteProcessException {
+    AlignedSeriesTestUtil.setUp(
+        measurementSchemas, seqResources, unSeqResources, 
TREE_TO_TABLE_VIEW_ADAPTOR_OPERATOR_TEST);
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    AlignedSeriesTestUtil.tearDown(seqResources, unSeqResources);
+  }
+
+  @Test
+  public void test1() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    Operator operator = null;
+    try {
+      AlignedFullPath alignedPath =
+          new AlignedFullPath(
+              IDeviceID.Factory.DEFAULT_FACTORY.create(
+                  TREE_TO_TABLE_VIEW_ADAPTOR_OPERATOR_TEST + ".device0"),
+              measurementSchemas.stream()
+                  .map(IMeasurementSchema::getMeasurementName)
+                  .collect(Collectors.toList()),
+              measurementSchemas.stream()
+                  .map(m -> (IMeasurementSchema) m)
+                  .collect(Collectors.toList()));
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      DriverContext driverContext = new DriverContext(fragmentInstanceContext, 
0);
+      PlanNodeId planNodeId = new PlanNodeId("1");
+      driverContext.addOperatorContext(
+          1, planNodeId, AlignedSeriesScanOperator.class.getSimpleName());
+
+      AlignedSeriesScanOperator seriesScanOperator =
+          new AlignedSeriesScanOperator(
+              driverContext.getOperatorContexts().get(0),
+              planNodeId,
+              alignedPath,
+              Ordering.ASC,
+              getDefaultSeriesScanOptions(alignedPath),
+              false,
+              null,
+              -1);
+      seriesScanOperator.initQueryDataSource(new QueryDataSource(seqResources, 
unSeqResources));
+      seriesScanOperator
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+      List<ColumnSchema> columnSchemas = new 
ArrayList<>(measurementSchemas.size() + 3);
+      int[] columnIndexArray = new int[measurementSchemas.size() + 3];
+
+      for (int i = 0; i < measurementSchemas.size(); i++) {
+        IMeasurementSchema measurementSchema = measurementSchemas.get(i);
+        columnSchemas.add(
+            new ColumnSchema(
+                measurementSchema.getMeasurementName(),
+                TypeFactory.getType(measurementSchema.getType()),
+                false,
+                TsTableColumnCategory.FIELD));
+        columnIndexArray[i] = i;
+      }
+      columnSchemas.add(
+          new ColumnSchema(
+              "tag", TypeFactory.getType(TSDataType.STRING), false, 
TsTableColumnCategory.TAG));
+      columnIndexArray[measurementSchemas.size()] = 0;
+      columnSchemas.add(
+          new ColumnSchema(
+              "attr",
+              TypeFactory.getType(TSDataType.STRING),
+              false,
+              TsTableColumnCategory.ATTRIBUTE));
+      columnIndexArray[measurementSchemas.size() + 1] = 0;
+      columnSchemas.add(
+          new ColumnSchema(
+              "time",
+              TypeFactory.getType(TSDataType.TIMESTAMP),
+              false,
+              TsTableColumnCategory.TIME));
+      columnIndexArray[measurementSchemas.size() + 2] = -1;
+
+      operator =
+          new TreeToTableViewAdaptorOperator(
+              driverContext.addOperatorContext(
+                  2, planNodeId, 
TreeToTableViewAdaptorOperator.class.getSimpleName()),
+              new AlignedDeviceEntry(
+                  alignedPath.getDeviceId(),
+                  new Binary[] {new Binary("attr1", 
TSFileConfig.STRING_CHARSET)}),
+              columnIndexArray,
+              columnSchemas,
+              seriesScanOperator,
+              TableOperatorGenerator.createTreeDeviceIdColumnValueExtractor(
+                  TREE_TO_TABLE_VIEW_ADAPTOR_OPERATOR_TEST));
+      int count = 0;
+      while (operator.hasNext()) {
+        TsBlock tsBlock = operator.next();
+        if (tsBlock == null) {
+          continue;
+        }
+
+        assertEquals(columnSchemas.size(), tsBlock.getValueColumnCount());
+        assertTrue(tsBlock.getColumn(0) instanceof BooleanColumn);
+        assertTrue(tsBlock.getColumn(1) instanceof IntColumn);
+        assertTrue(tsBlock.getColumn(2) instanceof LongColumn);
+        assertTrue(tsBlock.getColumn(3) instanceof FloatColumn);
+        assertTrue(tsBlock.getColumn(4) instanceof DoubleColumn);
+        assertTrue(tsBlock.getColumn(5) instanceof BinaryColumn);
+        assertTrue(tsBlock.getColumn(6) instanceof RunLengthEncodedColumn);
+        assertTrue(tsBlock.getColumn(7) instanceof RunLengthEncodedColumn);
+        assertTrue(tsBlock.getColumn(8) instanceof TimeColumn);
+
+        for (int i = 0; i < tsBlock.getPositionCount(); i++, count++) {
+          assertEquals(count, tsBlock.getColumn(8).getLong(i));
+          int delta = 0;
+          if ((long) count < 200) {
+            delta = 20000;
+          } else if ((long) count < 260
+              || ((long) count >= 300 && (long) count < 380)
+              || (long) count >= 400) {
+            delta = 10000;
+          }
+          assertEquals((delta + (long) count) % 2 == 0, 
tsBlock.getColumn(0).getBoolean(i));
+          assertEquals(delta + (long) count, tsBlock.getColumn(1).getInt(i));
+          assertEquals(delta + (long) count, tsBlock.getColumn(2).getLong(i));
+          assertEquals(delta + (long) count, tsBlock.getColumn(3).getFloat(i), 
DELTA);
+          assertEquals(delta + (long) count, 
tsBlock.getColumn(4).getDouble(i), DELTA);
+          assertEquals(
+              String.valueOf(delta + (long) count), 
tsBlock.getColumn(5).getBinary(i).toString());
+          assertEquals("device0", 
tsBlock.getColumn(6).getBinary(i).toString());
+          assertEquals("attr1", tsBlock.getColumn(7).getBinary(i).toString());
+        }
+      }
+      Assert.assertEquals(500, count);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    } finally {
+      if (operator != null) {
+        try {
+          operator.close();
+        } catch (Exception ignored) {
+        }
+      }
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+}

Reply via email to