This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d14e3f21fd382e70c16c56326a1860b8fd590c65
Author: JackieTien97 <[email protected]>
AuthorDate: Wed Apr 17 21:41:36 2024 +0800

    partial ok
---
 .../operator/process/FilterAndProjectOperator.java |   9 +
 .../source/relational/TableScanOperator.java       |  26 +-
 .../relational/ColumnTransformerBuilder.java       | 674 +++++++++++++++++++++
 .../queryengine/plan/analyze/PredicateUtils.java   |  12 +-
 .../plan/planner/TableOperatorGenerator.java       | 192 ++++--
 .../predicate/ConvertPredicateToFilterVisitor.java | 351 ++++++++++-
 .../ConvertPredicateToTimeFilterVisitor.java       |   2 +-
 .../predicate/PredicatePushIntoScanChecker.java    |   2 +-
 .../plan/relational/planner/node/ProjectNode.java  |   4 +
 .../multi/LogicalAndMultiColumnTransformer.java    |  64 ++
 .../multi/LogicalMultiColumnTransformer.java}      |  25 +-
 .../multi/LogicalOrMultiColumnTransformer.java     |  63 ++
 .../dag/column/multi/MultiColumnTransformer.java   |  66 ++
 .../dag/column/unary/InColumnTransformer.java      |  80 +++
 14 files changed, 1510 insertions(+), 60 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/FilterAndProjectOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/FilterAndProjectOperator.java
index f62658c68de..3c1c45036c3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/FilterAndProjectOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/FilterAndProjectOperator.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.db.queryengine.transformation.dag.column.binary.BinaryCo
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.leaf.IdentityColumnTransformer;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.leaf.LeafColumnTransformer;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.multi.MappableUDFColumnTransformer;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.multi.MultiColumnTransformer;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.ternary.TernaryColumnTransformer;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.UnaryColumnTransformer;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
@@ -43,6 +44,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.OptionalInt;
 
 public class FilterAndProjectOperator implements ProcessOperator {
 
@@ -357,6 +359,13 @@ public class FilterAndProjectOperator implements 
ProcessOperator {
                   ((CaseWhenThenColumnTransformer) 
columnTransformer).getElseTransformer()));
       childMaxLevel = Math.max(childMaxLevel, childCount + 2);
       return childMaxLevel;
+    } else if (columnTransformer instanceof MultiColumnTransformer) {
+      int childrenCount = ((MultiColumnTransformer) 
columnTransformer).getChildren().size();
+      OptionalInt childMaxLevel =
+          ((MultiColumnTransformer) columnTransformer)
+              
.getChildren().stream().mapToInt(this::getMaxLevelOfColumnTransformerTree).max();
+
+      return Math.max(childrenCount + 1, childMaxLevel.orElse(childrenCount + 
1));
     } else {
       throw new UnsupportedOperationException("Unsupported ColumnTransformer");
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
index 1e26fa5d636..a8f3a6da567 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableScanOperator.java
@@ -315,7 +315,7 @@ public class TableScanOperator extends 
AbstractDataSourceOperator {
   }
 
   @Override
-  protected List<TSDataType> getResultDataTypes() {
+  public List<TSDataType> getResultDataTypes() {
     List<TSDataType> resultDataTypes = new ArrayList<>(columnSchemas.size());
     for (ColumnSchema columnSchema : columnSchemas) {
       resultDataTypes.add(getTSDataType(columnSchema.getType()));
@@ -343,7 +343,22 @@ public class TableScanOperator extends 
AbstractDataSourceOperator {
   }
 
   private AlignedSeriesScanUtil constructAlignedSeriesScanUtil(DeviceEntry 
deviceEntry) {
+    AlignedPath alignedPath =
+        constructAlignedPath(deviceEntry, measurementColumnNames, 
measurementSchemas);
 
+    return new AlignedSeriesScanUtil(
+        alignedPath,
+        scanOrder,
+        seriesScanOptions,
+        operatorContext.getInstanceContext(),
+        true,
+        measurementColumnTSDataTypes);
+  }
+
+  public static AlignedPath constructAlignedPath(
+      DeviceEntry deviceEntry,
+      List<String> measurementColumnNames,
+      List<IMeasurementSchema> measurementSchemas) {
     String[] devicePath = new String[1 + 
deviceEntry.getDeviceID().segmentNum()];
     devicePath[0] = "root";
     for (int i = 1; i < devicePath.length; i++) {
@@ -353,13 +368,6 @@ public class TableScanOperator extends 
AbstractDataSourceOperator {
 
     alignedPath.setMeasurementList(measurementColumnNames);
     alignedPath.setSchemaList(measurementSchemas);
-
-    return new AlignedSeriesScanUtil(
-        alignedPath,
-        scanOrder,
-        seriesScanOptions,
-        operatorContext.getInstanceContext(),
-        true,
-        measurementColumnTSDataTypes);
+    return alignedPath;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java
new file mode 100644
index 00000000000..2685376f87a
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/relational/ColumnTransformerBuilder.java
@@ -0,0 +1,674 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.relational;
+
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.binary.ArithmeticAdditionColumnTransformer;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.binary.ArithmeticDivisionColumnTransformer;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.binary.ArithmeticModuloColumnTransformer;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.binary.ArithmeticMultiplicationColumnTransformer;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.binary.ArithmeticSubtractionColumnTransformer;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.binary.CompareEqualToColumnTransformer;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.binary.CompareGreaterEqualColumnTransformer;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.binary.CompareGreaterThanColumnTransformer;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.binary.CompareLessEqualColumnTransformer;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.binary.CompareLessThanColumnTransformer;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.binary.CompareNonEqualColumnTransformer;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.leaf.ConstantColumnTransformer;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.leaf.IdentityColumnTransformer;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.leaf.LeafColumnTransformer;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.leaf.NullColumnTransformer;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.multi.LogicalAndMultiColumnTransformer;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.multi.LogicalOrMultiColumnTransformer;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.ArithmeticNegationColumnTransformer;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.InColumnTransformer;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.IsNullColumnTransformer;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.LogicNotColumnTransformer;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary.RegularColumnTransformer;
+import org.apache.iotdb.db.relational.sql.tree.ArithmeticBinaryExpression;
+import org.apache.iotdb.db.relational.sql.tree.ArithmeticUnaryExpression;
+import org.apache.iotdb.db.relational.sql.tree.AstVisitor;
+import org.apache.iotdb.db.relational.sql.tree.BetweenPredicate;
+import org.apache.iotdb.db.relational.sql.tree.BinaryLiteral;
+import org.apache.iotdb.db.relational.sql.tree.BooleanLiteral;
+import org.apache.iotdb.db.relational.sql.tree.Cast;
+import org.apache.iotdb.db.relational.sql.tree.CoalesceExpression;
+import org.apache.iotdb.db.relational.sql.tree.ComparisonExpression;
+import org.apache.iotdb.db.relational.sql.tree.CurrentDatabase;
+import org.apache.iotdb.db.relational.sql.tree.CurrentTime;
+import org.apache.iotdb.db.relational.sql.tree.CurrentUser;
+import org.apache.iotdb.db.relational.sql.tree.DecimalLiteral;
+import org.apache.iotdb.db.relational.sql.tree.DoubleLiteral;
+import org.apache.iotdb.db.relational.sql.tree.Expression;
+import org.apache.iotdb.db.relational.sql.tree.FunctionCall;
+import org.apache.iotdb.db.relational.sql.tree.IfExpression;
+import org.apache.iotdb.db.relational.sql.tree.InListExpression;
+import org.apache.iotdb.db.relational.sql.tree.InPredicate;
+import org.apache.iotdb.db.relational.sql.tree.IsNotNullPredicate;
+import org.apache.iotdb.db.relational.sql.tree.IsNullPredicate;
+import org.apache.iotdb.db.relational.sql.tree.LikePredicate;
+import org.apache.iotdb.db.relational.sql.tree.Literal;
+import org.apache.iotdb.db.relational.sql.tree.LogicalExpression;
+import org.apache.iotdb.db.relational.sql.tree.LongLiteral;
+import org.apache.iotdb.db.relational.sql.tree.NotExpression;
+import org.apache.iotdb.db.relational.sql.tree.NullIfExpression;
+import org.apache.iotdb.db.relational.sql.tree.NullLiteral;
+import org.apache.iotdb.db.relational.sql.tree.SearchedCaseExpression;
+import org.apache.iotdb.db.relational.sql.tree.SimpleCaseExpression;
+import org.apache.iotdb.db.relational.sql.tree.StringLiteral;
+import org.apache.iotdb.db.relational.sql.tree.SymbolReference;
+import org.apache.iotdb.db.relational.sql.tree.Trim;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.DoubleColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.LongColumn;
+import org.apache.iotdb.tsfile.read.common.type.Type;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.iotdb.tsfile.read.common.type.BinaryType.TEXT;
+import static org.apache.iotdb.tsfile.read.common.type.BooleanType.BOOLEAN;
+import static org.apache.iotdb.tsfile.read.common.type.DoubleType.DOUBLE;
+import static org.apache.iotdb.tsfile.read.common.type.LongType.INT64;
+import static org.apache.iotdb.tsfile.utils.RegexUtils.compileRegex;
+import static org.apache.iotdb.tsfile.utils.RegexUtils.parseLikePatternToRegex;
+
+public class ColumnTransformerBuilder
+    extends AstVisitor<ColumnTransformer, ColumnTransformerBuilder.Context> {
+
+  private static final String UNSUPPORTED_EXPRESSION = "Unsupported 
expression: %s";
+
+  @Override
+  public ColumnTransformer visitExpression(Expression expression, Context 
context) {
+    throw new IllegalArgumentException(
+        String.format(UNSUPPORTED_EXPRESSION, 
expression.getClass().getSimpleName()));
+  }
+
+  @Override
+  protected ColumnTransformer visitArithmeticBinary(
+      ArithmeticBinaryExpression node, Context context) {
+    ColumnTransformer res =
+        context.cache.computeIfAbsent(
+            node,
+            n -> {
+              if (context.hasSeen.containsKey(node)) {
+                IdentityColumnTransformer identity =
+                    new IdentityColumnTransformer(
+                        DOUBLE, context.originSize + 
context.commonTransformerList.size());
+                ColumnTransformer columnTransformer = 
context.hasSeen.get(node);
+                columnTransformer.addReferenceCount();
+                context.commonTransformerList.add(columnTransformer);
+                context.leafList.add(identity);
+                context.inputDataTypes.add(TSDataType.DOUBLE);
+                return identity;
+              } else {
+                ColumnTransformer left = process(node.getLeft(), context);
+                ColumnTransformer right = process(node.getRight(), context);
+                switch (node.getOperator()) {
+                  case ADD:
+                    return new ArithmeticAdditionColumnTransformer(DOUBLE, 
left, right);
+                  case SUBTRACT:
+                    return new ArithmeticSubtractionColumnTransformer(DOUBLE, 
left, right);
+                  case MULTIPLY:
+                    return new 
ArithmeticMultiplicationColumnTransformer(DOUBLE, left, right);
+                  case DIVIDE:
+                    return new ArithmeticDivisionColumnTransformer(DOUBLE, 
left, right);
+                  case MODULUS:
+                    return new ArithmeticModuloColumnTransformer(DOUBLE, left, 
right);
+                  default:
+                    throw new UnsupportedOperationException(
+                        String.format(UNSUPPORTED_EXPRESSION, 
node.getOperator()));
+                }
+              }
+            });
+    res.addReferenceCount();
+    return res;
+  }
+
+  @Override
+  protected ColumnTransformer visitArithmeticUnary(
+      ArithmeticUnaryExpression node, Context context) {
+    switch (node.getSign()) {
+      case PLUS:
+        return process(node.getValue(), context);
+      case MINUS:
+        ColumnTransformer res =
+            context.cache.computeIfAbsent(
+                node,
+                n -> {
+                  if (context.hasSeen.containsKey(node)) {
+                    IdentityColumnTransformer identity =
+                        new IdentityColumnTransformer(
+                            DOUBLE, context.originSize + 
context.commonTransformerList.size());
+                    ColumnTransformer columnTransformer = 
context.hasSeen.get(node);
+                    columnTransformer.addReferenceCount();
+                    context.commonTransformerList.add(columnTransformer);
+                    context.leafList.add(identity);
+                    context.inputDataTypes.add(TSDataType.DOUBLE);
+                    return identity;
+                  } else {
+                    ColumnTransformer childColumnTransformer = 
process(node.getValue(), context);
+                    return new ArithmeticNegationColumnTransformer(DOUBLE, 
childColumnTransformer);
+                  }
+                });
+        res.addReferenceCount();
+        return res;
+      default:
+        throw new UnsupportedOperationException("Unknown sign: " + 
node.getSign());
+    }
+  }
+
+  @Override
+  protected ColumnTransformer visitBetweenPredicate(BetweenPredicate node, 
Context context) {
+    throw new 
UnsupportedOperationException(String.format(UNSUPPORTED_EXPRESSION, node));
+  }
+
+  @Override
+  protected ColumnTransformer visitCast(Cast node, Context context) {
+    throw new 
UnsupportedOperationException(String.format(UNSUPPORTED_EXPRESSION, node));
+  }
+
+  @Override
+  protected ColumnTransformer visitBooleanLiteral(BooleanLiteral node, Context 
context) {
+    ColumnTransformer res =
+        context.cache.computeIfAbsent(
+            node,
+            e -> {
+              ConstantColumnTransformer columnTransformer =
+                  new ConstantColumnTransformer(
+                      BOOLEAN,
+                      new BooleanColumn(1, Optional.empty(), new boolean[] 
{node.getValue()}));
+              context.leafList.add(columnTransformer);
+              return columnTransformer;
+            });
+    res.addReferenceCount();
+    return res;
+  }
+
+  @Override
+  protected ColumnTransformer visitBinaryLiteral(BinaryLiteral node, Context 
context) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected ColumnTransformer visitStringLiteral(StringLiteral node, Context 
context) {
+    ColumnTransformer res =
+        context.cache.computeIfAbsent(
+            node,
+            e -> {
+              ConstantColumnTransformer columnTransformer =
+                  new ConstantColumnTransformer(
+                      TEXT,
+                      new BinaryColumn(
+                          1,
+                          Optional.empty(),
+                          new Binary[] {new Binary(node.getValue(), 
TSFileConfig.STRING_CHARSET)}));
+              context.leafList.add(columnTransformer);
+              return columnTransformer;
+            });
+    res.addReferenceCount();
+    return res;
+  }
+
+  @Override
+  protected ColumnTransformer visitLongLiteral(LongLiteral node, Context 
context) {
+    ColumnTransformer res =
+        context.cache.computeIfAbsent(
+            node,
+            e -> {
+              ConstantColumnTransformer columnTransformer =
+                  new ConstantColumnTransformer(
+                      INT64,
+                      new LongColumn(1, Optional.empty(), new long[] 
{node.getParsedValue()}));
+              context.leafList.add(columnTransformer);
+              return columnTransformer;
+            });
+    res.addReferenceCount();
+    return res;
+  }
+
+  @Override
+  protected ColumnTransformer visitDoubleLiteral(DoubleLiteral node, Context 
context) {
+    ColumnTransformer res =
+        context.cache.computeIfAbsent(
+            node,
+            e -> {
+              ConstantColumnTransformer columnTransformer =
+                  new ConstantColumnTransformer(
+                      DOUBLE,
+                      new DoubleColumn(1, Optional.empty(), new double[] 
{node.getValue()}));
+              context.leafList.add(columnTransformer);
+              return columnTransformer;
+            });
+    res.addReferenceCount();
+    return res;
+  }
+
+  @Override
+  protected ColumnTransformer visitDecimalLiteral(DecimalLiteral node, Context 
context) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected ColumnTransformer visitNullLiteral(NullLiteral node, Context 
context) {
+    ColumnTransformer res =
+        context.cache.computeIfAbsent(
+            node,
+            e -> {
+              NullColumnTransformer columnTransformer = new 
NullColumnTransformer();
+              context.leafList.add(columnTransformer);
+              return columnTransformer;
+            });
+    res.addReferenceCount();
+    return res;
+  }
+
+  @Override
+  protected ColumnTransformer visitComparisonExpression(
+      ComparisonExpression node, Context context) {
+    ColumnTransformer res =
+        context.cache.computeIfAbsent(
+            node,
+            n -> {
+              if (context.hasSeen.containsKey(node)) {
+                IdentityColumnTransformer identity =
+                    new IdentityColumnTransformer(
+                        BOOLEAN, context.originSize + 
context.commonTransformerList.size());
+                ColumnTransformer columnTransformer = 
context.hasSeen.get(node);
+                columnTransformer.addReferenceCount();
+                context.commonTransformerList.add(columnTransformer);
+                context.leafList.add(identity);
+                context.inputDataTypes.add(TSDataType.BOOLEAN);
+                return identity;
+              } else {
+                ColumnTransformer left = process(node.getLeft(), context);
+                ColumnTransformer right = process(node.getRight(), context);
+                switch (node.getOperator()) {
+                  case EQUAL:
+                    return new CompareEqualToColumnTransformer(BOOLEAN, left, 
right);
+                  case NOT_EQUAL:
+                    return new CompareNonEqualColumnTransformer(BOOLEAN, left, 
right);
+                  case GREATER_THAN:
+                    return new CompareGreaterThanColumnTransformer(BOOLEAN, 
left, right);
+                  case GREATER_THAN_OR_EQUAL:
+                    return new CompareGreaterEqualColumnTransformer(BOOLEAN, 
left, right);
+                  case LESS_THAN:
+                    return new CompareLessThanColumnTransformer(BOOLEAN, left, 
right);
+                  case LESS_THAN_OR_EQUAL:
+                    return new CompareLessEqualColumnTransformer(BOOLEAN, 
left, right);
+                  default:
+                    throw new UnsupportedOperationException(
+                        String.format(UNSUPPORTED_EXPRESSION, 
node.getOperator()));
+                }
+              }
+            });
+    res.addReferenceCount();
+    return res;
+  }
+
+  @Override
+  protected ColumnTransformer visitCurrentDatabase(CurrentDatabase node, 
Context context) {
+    Optional<String> currentDatabase = context.sessionInfo.getDatabaseName();
+    ColumnTransformer res;
+    res =
+        currentDatabase
+            .map(
+                s ->
+                    context.cache.computeIfAbsent(
+                        node,
+                        e -> {
+                          ConstantColumnTransformer columnTransformer =
+                              new ConstantColumnTransformer(
+                                  TEXT,
+                                  new BinaryColumn(
+                                      1,
+                                      Optional.empty(),
+                                      new Binary[] {new Binary(s, 
TSFileConfig.STRING_CHARSET)}));
+                          context.leafList.add(columnTransformer);
+                          return columnTransformer;
+                        }))
+            .orElseGet(
+                () ->
+                    context.cache.computeIfAbsent(
+                        node,
+                        e -> {
+                          NullColumnTransformer columnTransformer = new 
NullColumnTransformer();
+                          context.leafList.add(columnTransformer);
+                          return columnTransformer;
+                        }));
+    res.addReferenceCount();
+    return res;
+  }
+
+  @Override
+  protected ColumnTransformer visitCurrentTime(CurrentTime node, Context 
context) {
+    throw new 
UnsupportedOperationException(String.format(UNSUPPORTED_EXPRESSION, node));
+  }
+
+  @Override
+  protected ColumnTransformer visitCurrentUser(CurrentUser node, Context 
context) {
+    String currentUser = context.sessionInfo.getUserName();
+    ColumnTransformer res =
+        context.cache.computeIfAbsent(
+            node,
+            e -> {
+              ConstantColumnTransformer columnTransformer =
+                  new ConstantColumnTransformer(
+                      TEXT,
+                      new BinaryColumn(
+                          1,
+                          Optional.empty(),
+                          new Binary[] {new Binary(currentUser, 
TSFileConfig.STRING_CHARSET)}));
+              context.leafList.add(columnTransformer);
+              return columnTransformer;
+            });
+    res.addReferenceCount();
+    return res;
+  }
+
+  @Override
+  protected ColumnTransformer visitFunctionCall(FunctionCall node, Context 
context) {
+    throw new 
UnsupportedOperationException(String.format(UNSUPPORTED_EXPRESSION, node));
+  }
+
+  @Override
+  protected ColumnTransformer visitInPredicate(InPredicate node, Context 
context) {
+    ColumnTransformer res =
+        context.cache.computeIfAbsent(
+            node,
+            n -> {
+              if (context.hasSeen.containsKey(node)) {
+                IdentityColumnTransformer identity =
+                    new IdentityColumnTransformer(
+                        BOOLEAN, context.originSize + 
context.commonTransformerList.size());
+                ColumnTransformer columnTransformer = 
context.hasSeen.get(node);
+                columnTransformer.addReferenceCount();
+                context.commonTransformerList.add(columnTransformer);
+                context.leafList.add(identity);
+                context.inputDataTypes.add(TSDataType.BOOLEAN);
+                return identity;
+              } else {
+                ColumnTransformer childColumnTransformer = 
process(node.getValue(), context);
+                InListExpression inListExpression = (InListExpression) 
node.getValueList();
+                List<Expression> expressionList = inListExpression.getValues();
+                List<Literal> values = new ArrayList<>();
+                for (Expression expression : expressionList) {
+                  checkArgument(expression instanceof Literal);
+                  values.add((Literal) expression);
+                }
+                return new InColumnTransformer(BOOLEAN, 
childColumnTransformer, values);
+              }
+            });
+    res.addReferenceCount();
+    return res;
+  }
+
+  @Override
+  protected ColumnTransformer visitNotExpression(NotExpression node, Context 
context) {
+    ColumnTransformer res =
+        context.cache.computeIfAbsent(
+            node,
+            n -> {
+              if (context.hasSeen.containsKey(node)) {
+                IdentityColumnTransformer identity =
+                    new IdentityColumnTransformer(
+                        BOOLEAN, context.originSize + 
context.commonTransformerList.size());
+                ColumnTransformer columnTransformer = 
context.hasSeen.get(node);
+                columnTransformer.addReferenceCount();
+                context.commonTransformerList.add(columnTransformer);
+                context.leafList.add(identity);
+                context.inputDataTypes.add(TSDataType.BOOLEAN);
+                return identity;
+              } else {
+                ColumnTransformer childColumnTransformer = 
process(node.getValue(), context);
+                return new LogicNotColumnTransformer(BOOLEAN, 
childColumnTransformer);
+              }
+            });
+    res.addReferenceCount();
+    return res;
+  }
+
+  @Override
+  protected ColumnTransformer visitLikePredicate(LikePredicate node, Context 
context) {
+    ColumnTransformer res =
+        context.cache.computeIfAbsent(
+            node,
+            n -> {
+              if (context.hasSeen.containsKey(node)) {
+                IdentityColumnTransformer identity =
+                    new IdentityColumnTransformer(
+                        BOOLEAN, context.originSize + 
context.commonTransformerList.size());
+                ColumnTransformer columnTransformer = 
context.hasSeen.get(node);
+                columnTransformer.addReferenceCount();
+                context.commonTransformerList.add(columnTransformer);
+                context.leafList.add(identity);
+                context.inputDataTypes.add(TSDataType.BOOLEAN);
+                return identity;
+              } else {
+                ColumnTransformer childColumnTransformer = 
process(node.getValue(), context);
+                return new RegularColumnTransformer(
+                    BOOLEAN,
+                    childColumnTransformer,
+                    compileRegex(
+                        parseLikePatternToRegex(((StringLiteral) 
node.getPattern()).getValue())));
+              }
+            });
+    res.addReferenceCount();
+    return res;
+  }
+
+  @Override
+  protected ColumnTransformer visitIsNotNullPredicate(IsNotNullPredicate node, 
Context context) {
+    ColumnTransformer res =
+        context.cache.computeIfAbsent(
+            node,
+            n -> {
+              if (context.hasSeen.containsKey(node)) {
+                IdentityColumnTransformer identity =
+                    new IdentityColumnTransformer(
+                        BOOLEAN, context.originSize + 
context.commonTransformerList.size());
+                ColumnTransformer columnTransformer = 
context.hasSeen.get(node);
+                columnTransformer.addReferenceCount();
+                context.commonTransformerList.add(columnTransformer);
+                context.leafList.add(identity);
+                context.inputDataTypes.add(TSDataType.BOOLEAN);
+                return identity;
+              } else {
+                ColumnTransformer childColumnTransformer = 
process(node.getValue(), context);
+                return new IsNullColumnTransformer(BOOLEAN, 
childColumnTransformer, true);
+              }
+            });
+    res.addReferenceCount();
+    return res;
+  }
+
+  @Override
+  protected ColumnTransformer visitIsNullPredicate(IsNullPredicate node, 
Context context) {
+    ColumnTransformer res =
+        context.cache.computeIfAbsent(
+            node,
+            n -> {
+              if (context.hasSeen.containsKey(node)) {
+                IdentityColumnTransformer identity =
+                    new IdentityColumnTransformer(
+                        BOOLEAN, context.originSize + 
context.commonTransformerList.size());
+                ColumnTransformer columnTransformer = 
context.hasSeen.get(node);
+                columnTransformer.addReferenceCount();
+                context.commonTransformerList.add(columnTransformer);
+                context.leafList.add(identity);
+                context.inputDataTypes.add(TSDataType.BOOLEAN);
+                return identity;
+              } else {
+                ColumnTransformer childColumnTransformer = 
process(node.getValue(), context);
+                return new IsNullColumnTransformer(BOOLEAN, 
childColumnTransformer, false);
+              }
+            });
+    res.addReferenceCount();
+    return res;
+  }
+
+  @Override
+  protected ColumnTransformer visitLogicalExpression(LogicalExpression node, 
Context context) {
+    ColumnTransformer res =
+        context.cache.computeIfAbsent(
+            node,
+            n -> {
+              if (context.hasSeen.containsKey(node)) {
+                IdentityColumnTransformer identity =
+                    new IdentityColumnTransformer(
+                        BOOLEAN, context.originSize + 
context.commonTransformerList.size());
+                ColumnTransformer columnTransformer = 
context.hasSeen.get(node);
+                columnTransformer.addReferenceCount();
+                context.commonTransformerList.add(columnTransformer);
+                context.leafList.add(identity);
+                context.inputDataTypes.add(TSDataType.BOOLEAN);
+                return identity;
+              } else {
+                List<ColumnTransformer> children =
+                    node.getChildren().stream()
+                        .map(c -> process(c, context))
+                        .collect(Collectors.toList());
+                switch (node.getOperator()) {
+                  case OR:
+                    return new LogicalOrMultiColumnTransformer(BOOLEAN, 
children);
+                  case AND:
+                    return new LogicalAndMultiColumnTransformer(BOOLEAN, 
children);
+                  default:
+                    throw new UnsupportedOperationException(
+                        String.format(UNSUPPORTED_EXPRESSION, 
node.getOperator()));
+                }
+              }
+            });
+    res.addReferenceCount();
+    return res;
+  }
+
+  @Override
+  protected ColumnTransformer visitSymbolReference(SymbolReference node, 
Context context) {
+    ColumnTransformer res =
+        context.cache.computeIfAbsent(
+            node,
+            e -> {
+              IdentityColumnTransformer identity =
+                  new IdentityColumnTransformer(
+                      context.getType(node),
+                      
context.inputLocations.get(Symbol.from(node)).get(0).getValueColumnIndex());
+              context.leafList.add(identity);
+              return identity;
+            });
+    res.addReferenceCount();
+    return res;
+  }
+
+  @Override
+  protected ColumnTransformer visitSimpleCaseExpression(
+      SimpleCaseExpression node, Context context) {
+    throw new 
UnsupportedOperationException(String.format(UNSUPPORTED_EXPRESSION, node));
+  }
+
+  @Override
+  protected ColumnTransformer visitSearchedCaseExpression(
+      SearchedCaseExpression node, Context context) {
+    throw new 
UnsupportedOperationException(String.format(UNSUPPORTED_EXPRESSION, node));
+  }
+
+  @Override
+  protected ColumnTransformer visitTrim(Trim node, Context context) {
+    throw new 
UnsupportedOperationException(String.format(UNSUPPORTED_EXPRESSION, node));
+  }
+
+  @Override
+  protected ColumnTransformer visitIfExpression(IfExpression node, Context 
context) {
+    throw new 
UnsupportedOperationException(String.format(UNSUPPORTED_EXPRESSION, node));
+  }
+
+  @Override
+  protected ColumnTransformer visitNullIfExpression(NullIfExpression node, 
Context context) {
+    throw new 
UnsupportedOperationException(String.format(UNSUPPORTED_EXPRESSION, node));
+  }
+
+  @Override
+  protected ColumnTransformer visitCoalesceExpression(CoalesceExpression node, 
Context context) {
+    throw new 
UnsupportedOperationException(String.format(UNSUPPORTED_EXPRESSION, node));
+  }
+
+  public static class Context {
+
+    private final SessionInfo sessionInfo;
+
+    // LeafColumnTransformer for LeafOperand
+    private final List<LeafColumnTransformer> leafList;
+
+    // Index of input column
+    private final Map<Symbol, List<InputLocation>> inputLocations;
+
+    // cache for constructing ColumnTransformer tree
+    private final Map<Expression, ColumnTransformer> cache;
+
+    // Sub expressions that has been seen in filter
+    private final Map<Expression, ColumnTransformer> hasSeen;
+
+    // Common Transformer between filter and project
+    private final List<ColumnTransformer> commonTransformerList;
+
+    private final List<TSDataType> inputDataTypes;
+
+    private final int originSize;
+
+    private final TypeProvider typeProvider;
+
+    public Context(
+        SessionInfo sessionInfo,
+        List<LeafColumnTransformer> leafList,
+        Map<Symbol, List<InputLocation>> inputLocations,
+        Map<Expression, ColumnTransformer> cache,
+        Map<Expression, ColumnTransformer> hasSeen,
+        List<ColumnTransformer> commonTransformerList,
+        List<TSDataType> inputDataTypes,
+        int originSize,
+        TypeProvider typeProvider) {
+      this.sessionInfo = sessionInfo;
+      this.leafList = leafList;
+      this.inputLocations = inputLocations;
+      this.cache = cache;
+      this.hasSeen = hasSeen;
+      this.commonTransformerList = commonTransformerList;
+      this.inputDataTypes = inputDataTypes;
+      this.originSize = originSize;
+      this.typeProvider = typeProvider;
+    }
+
+    public Type getType(SymbolReference symbolReference) {
+      return typeProvider.get(Symbol.from(symbolReference));
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/PredicateUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/PredicateUtils.java
index d635f074789..dc2d8653b2c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/PredicateUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/PredicateUtils.java
@@ -40,6 +40,8 @@ import 
org.apache.iotdb.db.queryengine.plan.expression.visitor.predicate.Convert
 import 
org.apache.iotdb.db.queryengine.plan.expression.visitor.predicate.PredicatePushIntoScanChecker;
 import 
org.apache.iotdb.db.queryengine.plan.expression.visitor.predicate.PredicateSimplifier;
 import 
org.apache.iotdb.db.queryengine.plan.expression.visitor.predicate.ReversePredicateVisitor;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -48,6 +50,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -293,14 +296,15 @@ public class PredicateUtils {
   public static Filter convertPredicateToFilter(
       org.apache.iotdb.db.relational.sql.tree.Expression predicate,
       List<String> allMeasurements,
-      TypeProvider typeProvider) {
+      Map<Symbol, ColumnSchema> schemaMap) {
     if (predicate == null) {
       return null;
     }
     return predicate.accept(
-        new ConvertPredicateToFilterVisitor(),
-        new ConvertPredicateToFilterVisitor.Context(
-            allMeasurements, isBuildPlanUseTemplate, typeProvider));
+        new org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate
+            .ConvertPredicateToFilterVisitor(),
+        new org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate
+            .ConvertPredicateToFilterVisitor.Context(allMeasurements, 
schemaMap));
   }
 
   /**
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index f5e22768ce5..273e949b37f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -23,11 +23,15 @@ import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.db.queryengine.execution.driver.DataDriverContext;
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.FilterAndProjectOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator;
+import 
org.apache.iotdb.db.queryengine.execution.relational.ColumnTransformerBuilder;
 import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
@@ -40,6 +44,8 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.leaf.LeafColumnTransformer;
 import org.apache.iotdb.db.relational.sql.tree.Expression;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -47,14 +53,21 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
 import static java.util.Objects.requireNonNull;
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils.convertPredicateToFilter;
+import static 
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand.TIMESTAMP_EXPRESSION_STRING;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType;
 
 /** This Visitor is responsible for transferring Table PlanNode Tree to Table 
Operator Tree. */
@@ -102,7 +115,7 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
       }
     }
 
-    SeriesScanOptions.Builder scanOptionsBuilder = 
getSeriesScanOptionsBuilder(node, context);
+    SeriesScanOptions.Builder scanOptionsBuilder = 
getSeriesScanOptionsBuilder(context);
     scanOptionsBuilder.withPushDownLimit(node.getPushDownLimit());
     scanOptionsBuilder.withPushDownOffset(node.getPushDownOffset());
     scanOptionsBuilder.withAllSensors(new HashSet<>(measurementColumnNames));
@@ -111,11 +124,7 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
     boolean predicateCanPushIntoScan = canPushIntoScan(pushDownPredicate);
     if (pushDownPredicate != null && predicateCanPushIntoScan) {
       scanOptionsBuilder.withPushDownFilter(
-          convertPredicateToFilter(
-              pushDownPredicate,
-              node.getAlignedPath().getMeasurementList(),
-              context.getTypeProvider().getTemplatedInfo() != null,
-              context.getTypeProvider()));
+          convertPredicateToFilter(pushDownPredicate, measurementColumnNames, 
columnSchemaMap));
     }
 
     OperatorContext operatorContext =
@@ -138,58 +147,63 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
         new TableScanOperator(
             operatorContext,
             node.getPlanNodeId(),
-            seriesPath,
+            columnSchemas,
+            columnsIndexArray,
+            measurementColumnCount,
+            node.getDeviceEntries(),
             node.getScanOrder(),
             scanOptionsBuilder.build(),
-            node.isQueryAllSensors(),
-            context.getTypeProvider().getTemplatedInfo() != null
-                ? context.getTypeProvider().getTemplatedInfo().getDataTypes()
-                : null,
+            measurementColumnNames,
+            measurementSchemas,
             maxTsBlockLineNum);
 
-    ((DataDriverContext) 
context.getDriverContext()).addSourceOperator(seriesScanOperator);
-    ((DataDriverContext) context.getDriverContext()).addPath(seriesPath);
+    ((DataDriverContext) 
context.getDriverContext()).addSourceOperator(tableScanOperator);
+
+    for (int i = 0, size = node.getDeviceEntries().size(); i < size; i++) {
+      AlignedPath alignedPath =
+          constructAlignedPath(
+              node.getDeviceEntries().get(i), measurementColumnNames, 
measurementSchemas);
+      ((DataDriverContext) context.getDriverContext()).addPath(alignedPath);
+    }
+
     context.getDriverContext().setInputDriver(true);
 
     if (!predicateCanPushIntoScan) {
-      if (context.isBuildPlanUseTemplate()) {
-        TemplatedInfo templatedInfo = context.getTemplatedInfo();
-        return constructFilterOperator(
-            pushDownPredicate,
-            seriesScanOperator,
-            templatedInfo.getProjectExpressions(),
-            templatedInfo.getDataTypes(),
-            templatedInfo.getLayoutMap(),
-            templatedInfo.isKeepNull(),
-            node.getPlanNodeId(),
-            templatedInfo.getScanOrder(),
-            context);
-      }
-
-      AlignedPath alignedPath = node.getAlignedPath();
-      List<Expression> expressions = new ArrayList<>();
-      List<TSDataType> dataTypes = new ArrayList<>();
-      for (int i = 0; i < alignedPath.getMeasurementList().size(); i++) {
-        
expressions.add(ExpressionFactory.timeSeries(alignedPath.getSubMeasurementPath(i)));
-        dataTypes.add(alignedPath.getSubMeasurementDataType(i));
-      }
 
       return constructFilterOperator(
           pushDownPredicate,
-          seriesScanOperator,
-          expressions.toArray(new Expression[0]),
-          dataTypes,
+          tableScanOperator,
+          node.getOutputSymbols().stream()
+              .map(Symbol::toSymbolReference)
+              .toArray(Expression[]::new),
+          tableScanOperator.getResultDataTypes(),
           makeLayout(Collections.singletonList(node)),
-          false,
           node.getPlanNodeId(),
-          node.getScanOrder(),
           context);
     }
-    return seriesScanOperator;
+    return tableScanOperator;
   }
 
-  private SeriesScanOptions.Builder getSeriesScanOptionsBuilder(
-      TableScanNode node, LocalExecutionPlanContext context) {
+  private Map<Symbol, List<InputLocation>> makeLayout(List<PlanNode> children) 
{
+    Map<Symbol, List<InputLocation>> outputMappings = new LinkedHashMap<>();
+    int tsBlockIndex = 0;
+    for (PlanNode childNode : children) {
+      outputMappings
+          .computeIfAbsent(new Symbol(TIMESTAMP_EXPRESSION_STRING), key -> new 
ArrayList<>())
+          .add(new InputLocation(tsBlockIndex, -1));
+      int valueColumnIndex = 0;
+      for (Symbol columnName : childNode.getOutputSymbols()) {
+        outputMappings
+            .computeIfAbsent(columnName, key -> new ArrayList<>())
+            .add(new InputLocation(tsBlockIndex, valueColumnIndex));
+        valueColumnIndex++;
+      }
+      tsBlockIndex++;
+    }
+    return outputMappings;
+  }
+
+  private SeriesScanOptions.Builder 
getSeriesScanOptionsBuilder(LocalExecutionPlanContext context) {
     SeriesScanOptions.Builder scanOptionsBuilder = new 
SeriesScanOptions.Builder();
 
     Filter globalTimeFilter = context.getGlobalTimeFilter();
@@ -210,8 +224,102 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
     return super.visitFilter(node, context);
   }
 
+  private Operator constructFilterOperator(
+      Expression predicate,
+      Operator inputOperator,
+      Expression[] projectExpressions,
+      List<TSDataType> inputDataTypes,
+      Map<Symbol, List<InputLocation>> inputLocations,
+      PlanNodeId planNodeId,
+      LocalExecutionPlanContext context) {
+
+    final List<TSDataType> filterOutputDataTypes = new 
ArrayList<>(inputDataTypes);
+
+    // records LeafColumnTransformer of filter
+    List<LeafColumnTransformer> filterLeafColumnTransformerList = new 
ArrayList<>();
+
+    // records common ColumnTransformer between filter and project expressions
+    List<ColumnTransformer> commonTransformerList = new ArrayList<>();
+
+    // records LeafColumnTransformer of project expressions
+    List<LeafColumnTransformer> projectLeafColumnTransformerList = new 
ArrayList<>();
+
+    // records subexpression -> ColumnTransformer for filter
+    Map<Expression, ColumnTransformer> filterExpressionColumnTransformerMap = 
new HashMap<>();
+
+    ColumnTransformerBuilder visitor = new ColumnTransformerBuilder();
+
+    ColumnTransformerBuilder.Context filterColumnTransformerContext =
+        new ColumnTransformerBuilder.Context(
+            
context.getDriverContext().getFragmentInstanceContext().getSessionInfo(),
+            filterLeafColumnTransformerList,
+            inputLocations,
+            filterExpressionColumnTransformerMap,
+            ImmutableMap.of(),
+            ImmutableList.of(),
+            ImmutableList.of(),
+            0,
+            context.getTypeProvider());
+
+    ColumnTransformer filterOutputTransformer =
+        visitor.process(predicate, filterColumnTransformerContext);
+
+    List<ColumnTransformer> projectOutputTransformerList = new ArrayList<>();
+
+    Map<Expression, ColumnTransformer> projectExpressionColumnTransformerMap = 
new HashMap<>();
+
+    ColumnTransformerBuilder.Context projectColumnTransformerContext =
+        new ColumnTransformerBuilder.Context(
+            
context.getDriverContext().getFragmentInstanceContext().getSessionInfo(),
+            projectLeafColumnTransformerList,
+            inputLocations,
+            projectExpressionColumnTransformerMap,
+            filterExpressionColumnTransformerMap,
+            commonTransformerList,
+            filterOutputDataTypes,
+            inputLocations.size() - 1,
+            context.getTypeProvider());
+
+    for (Expression expression : projectExpressions) {
+      projectOutputTransformerList.add(
+          visitor.process(expression, projectColumnTransformerContext));
+    }
+
+    final OperatorContext operatorContext =
+        context
+            .getDriverContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                planNodeId,
+                FilterAndProjectOperator.class.getSimpleName());
+
+    // Project expressions don't contain Non-Mappable UDF, TransformOperator 
is not needed
+    return new FilterAndProjectOperator(
+        operatorContext,
+        inputOperator,
+        filterOutputDataTypes,
+        filterLeafColumnTransformerList,
+        filterOutputTransformer,
+        commonTransformerList,
+        projectLeafColumnTransformerList,
+        projectOutputTransformerList,
+        false,
+        true);
+  }
+
   @Override
   public Operator visitProject(ProjectNode node, LocalExecutionPlanContext 
context) {
+    if (node.getChild() instanceof FilterNode) {
+      FilterNode filterNode = (FilterNode) node.getChild();
+      return constructFilterOperator(
+          filterNode.getPredicate(),
+          filterNode.getChild().accept(this, context),
+          node.getAssignments().getExpressions().toArray(new Expression[0]),
+          tableScanOperator.getResultDataTypes(),
+          makeLayout(Collections.singletonList(filterNode.getChild())),
+          node.getPlanNodeId(),
+          context);
+    }
     return super.visitProject(node, context);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToFilterVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToFilterVisitor.java
index 7ab2448ded1..32cc28b9637 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToFilterVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToFilterVisitor.java
@@ -19,4 +19,353 @@
 
 package org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate;
 
-public class ConvertPredicateToFilterVisitor {}
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.relational.sql.tree.BetweenPredicate;
+import org.apache.iotdb.db.relational.sql.tree.BooleanLiteral;
+import org.apache.iotdb.db.relational.sql.tree.ComparisonExpression;
+import org.apache.iotdb.db.relational.sql.tree.DoubleLiteral;
+import org.apache.iotdb.db.relational.sql.tree.Expression;
+import org.apache.iotdb.db.relational.sql.tree.IfExpression;
+import org.apache.iotdb.db.relational.sql.tree.InListExpression;
+import org.apache.iotdb.db.relational.sql.tree.InPredicate;
+import org.apache.iotdb.db.relational.sql.tree.IsNotNullPredicate;
+import org.apache.iotdb.db.relational.sql.tree.IsNullPredicate;
+import org.apache.iotdb.db.relational.sql.tree.LikePredicate;
+import org.apache.iotdb.db.relational.sql.tree.Literal;
+import org.apache.iotdb.db.relational.sql.tree.LogicalExpression;
+import org.apache.iotdb.db.relational.sql.tree.NotExpression;
+import org.apache.iotdb.db.relational.sql.tree.NullIfExpression;
+import org.apache.iotdb.db.relational.sql.tree.SearchedCaseExpression;
+import org.apache.iotdb.db.relational.sql.tree.SimpleCaseExpression;
+import org.apache.iotdb.db.relational.sql.tree.StringLiteral;
+import org.apache.iotdb.db.relational.sql.tree.SymbolReference;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.read.common.type.Type;
+import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+import org.apache.iotdb.tsfile.read.filter.factory.ValueFilterApi;
+import org.apache.iotdb.tsfile.read.filter.operator.ValueFilterOperators;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.ConvertPredicateToTimeFilterVisitor.getLongValue;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.ConvertPredicateToTimeFilterVisitor.isTimeColumn;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.PredicatePushIntoScanChecker.isLiteral;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate.PredicatePushIntoScanChecker.isSymbolReference;
+
+public class ConvertPredicateToFilterVisitor
+    extends PredicateVisitor<Filter, ConvertPredicateToFilterVisitor.Context> {
+
+  private static final ConvertPredicateToTimeFilterVisitor TIME_FILTER_VISITOR 
=
+      new ConvertPredicateToTimeFilterVisitor();
+
+  @Override
+  protected Filter visitInPredicate(InPredicate node, Context context) {
+    Expression operand = node.getValue();
+    if (isTimeColumn(operand)) {
+      return TIME_FILTER_VISITOR.process(node, null);
+    }
+
+    checkArgument(isSymbolReference(operand));
+
+    Expression valueList = node.getValueList();
+    checkArgument(valueList instanceof InListExpression);
+    List<Expression> values = ((InListExpression) valueList).getValues();
+    for (Expression value : values) {
+      checkArgument(value instanceof Literal);
+    }
+
+    if (values.size() == 1) {
+      return constructCompareFilter(
+          ComparisonExpression.Operator.EQUAL,
+          (SymbolReference) operand,
+          (Literal) values.get(0),
+          context);
+    }
+
+    return constructInFilter(
+        (SymbolReference) operand,
+        values.stream().map(v -> (Literal) v).collect(Collectors.toList()),
+        context);
+  }
+
+  private <T extends Comparable<T>> ValueFilterOperators.ValueIn<T> 
constructInFilter(
+      SymbolReference operand, List<Literal> values, Context context) {
+    int measurementIndex = context.getMeasurementIndex((operand).getName());
+    Set<T> inSet = constructInSet(values, 
context.getType(Symbol.from(operand)));
+    return ValueFilterApi.in(measurementIndex, inSet);
+  }
+
+  private <T extends Comparable<T>> Set<T> constructInSet(List<Literal> 
literals, Type dataType) {
+    Set<T> values = new HashSet<>();
+    for (Literal literal : literals) {
+      values.add(getValue(literal, dataType));
+    }
+    return values;
+  }
+
+  public static <T extends Comparable<T>> Filter constructCompareFilter(
+      ComparisonExpression.Operator operator,
+      SymbolReference symbolReference,
+      Literal literal,
+      Context context) {
+
+    if (!context.isMeasurementColumn(symbolReference)) {
+      throw new IllegalStateException(
+          String.format("Only support measurement column in filter: %s", 
symbolReference));
+    }
+
+    int measurementIndex = 
context.getMeasurementIndex(symbolReference.getName());
+    Type type = context.getType(Symbol.from(symbolReference));
+
+    T value = getValue(literal, type);
+
+    switch (operator) {
+      case EQUAL:
+        return ValueFilterApi.eq(measurementIndex, value);
+      case NOT_EQUAL:
+        return ValueFilterApi.notEq(measurementIndex, value);
+      case GREATER_THAN:
+        return ValueFilterApi.gt(measurementIndex, value);
+      case GREATER_THAN_OR_EQUAL:
+        return ValueFilterApi.gtEq(measurementIndex, value);
+      case LESS_THAN:
+        return ValueFilterApi.lt(measurementIndex, value);
+      case LESS_THAN_OR_EQUAL:
+        return ValueFilterApi.ltEq(measurementIndex, value);
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unsupported comparison operator %s", operator));
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <T extends Comparable<T>> T getValue(Literal value, Type 
dataType) {
+    try {
+      switch (dataType.getTypeEnum()) {
+        case INT32:
+          return (T) 
Integer.valueOf(Long.valueOf(getLongValue(value)).intValue());
+        case INT64:
+          return (T) Long.valueOf(getLongValue(value));
+        case FLOAT:
+          return (T) Float.valueOf((float) getDoubleValue(value));
+        case DOUBLE:
+          return (T) Double.valueOf(getDoubleValue(value));
+        case BOOLEAN:
+          return (T) Boolean.valueOf(getBooleanValue(value));
+        case TEXT:
+          return (T) new Binary(getStringValue(value), 
TSFileConfig.STRING_CHARSET);
+        default:
+          throw new UnsupportedOperationException(
+              String.format("Unsupported data type %s", dataType));
+      }
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException(
+          String.format("\"%s\" cannot be cast to [%s]", value, dataType));
+    }
+  }
+
+  @Override
+  protected Filter visitIsNullPredicate(IsNullPredicate node, Context context) 
{
+    throw new IllegalArgumentException("IS NULL cannot be pushed down");
+  }
+
+  @Override
+  protected Filter visitIsNotNullPredicate(IsNotNullPredicate node, Context 
context) {
+    checkArgument(isSymbolReference(node.getValue()));
+    SymbolReference operand = (SymbolReference) node.getValue();
+    checkArgument(context.isMeasurementColumn(operand));
+    int measurementIndex = context.getMeasurementIndex(operand.getName());
+    return ValueFilterApi.isNotNull(measurementIndex);
+  }
+
+  @Override
+  protected Filter visitLikePredicate(LikePredicate node, Context context) {
+    checkArgument(isSymbolReference(node.getValue()));
+    SymbolReference operand = (SymbolReference) node.getValue();
+    checkArgument(context.isMeasurementColumn(operand));
+    int measurementIndex = context.getMeasurementIndex(operand.getName());
+    Expression pattern = node.getPattern();
+    return ValueFilterApi.like(measurementIndex, getStringValue(pattern));
+  }
+
+  @Override
+  protected Filter visitLogicalExpression(LogicalExpression node, Context 
context) {
+    switch (node.getOperator()) {
+      case OR:
+        return FilterFactory.or(
+            node.getTerms().stream().map(n -> process(n, 
context)).collect(Collectors.toList()));
+      case AND:
+        return FilterFactory.and(
+            node.getTerms().stream().map(n -> process(n, 
context)).collect(Collectors.toList()));
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unsupported logical operator %s", 
node.getOperator()));
+    }
+  }
+
+  @Override
+  protected Filter visitNotExpression(NotExpression node, Context context) {
+    return FilterFactory.not(process(node.getValue(), context));
+  }
+
+  @Override
+  protected Filter visitComparisonExpression(ComparisonExpression node, 
Context context) {
+    if (isTimeColumn(node.getLeft()) || isTimeColumn(node.getRight())) {
+      return TIME_FILTER_VISITOR.process(node, null);
+    }
+
+    Expression left = node.getLeft();
+    Expression right = node.getRight();
+
+    if (isSymbolReference(left)
+        && context.isMeasurementColumn((SymbolReference) left)
+        && isLiteral(right)) {
+      return constructCompareFilter(
+          node.getOperator(), (SymbolReference) left, (Literal) right, 
context);
+    } else if (isLiteral(left)
+        && isSymbolReference(right)
+        && context.isMeasurementColumn((SymbolReference) right)) {
+      return constructCompareFilter(
+          node.getOperator(), (SymbolReference) right, (Literal) left, 
context);
+    } else {
+      throw new IllegalStateException(
+          String.format("%s is not supported in value push down", node));
+    }
+  }
+
+  @Override
+  protected Filter visitSimpleCaseExpression(SimpleCaseExpression node, 
Context context) {
+    throw new UnsupportedOperationException("Filter push down does not support 
CASE WHEN");
+  }
+
+  @Override
+  protected Filter visitSearchedCaseExpression(SearchedCaseExpression node, 
Context context) {
+    throw new UnsupportedOperationException("Filter push down does not support 
CASE WHEN");
+  }
+
+  @Override
+  protected Filter visitIfExpression(IfExpression node, Context context) {
+    throw new UnsupportedOperationException("Filter push down does not support 
IF");
+  }
+
+  @Override
+  protected Filter visitNullIfExpression(NullIfExpression node, Context 
context) {
+    throw new UnsupportedOperationException("Filter push down does not support 
NULLIF");
+  }
+
+  @Override
+  protected Filter visitBetweenPredicate(BetweenPredicate node, Context 
context) {
+    Expression firstExpression = node.getValue();
+    Expression secondExpression = node.getMin();
+    Expression thirdExpression = node.getMax();
+
+    if (isTimeColumn(firstExpression)
+        || isTimeColumn(secondExpression)
+        || isTimeColumn(thirdExpression)) {
+      return TIME_FILTER_VISITOR.process(node, null);
+    }
+
+    if (isSymbolReference(firstExpression)
+        && context.isMeasurementColumn((SymbolReference) firstExpression)) {
+      return constructBetweenFilter(
+          (SymbolReference) firstExpression, secondExpression, 
thirdExpression, context);
+    } else if (isSymbolReference(secondExpression)
+        && context.isMeasurementColumn((SymbolReference) secondExpression)) {
+      checkArgument(isLiteral(firstExpression));
+      return constructCompareFilter(
+          ComparisonExpression.Operator.LESS_THAN_OR_EQUAL,
+          (SymbolReference) secondExpression,
+          (Literal) firstExpression,
+          context);
+    } else if (isSymbolReference(thirdExpression)
+        && context.isMeasurementColumn((SymbolReference) thirdExpression)) {
+      checkArgument(isLiteral(firstExpression));
+      return constructCompareFilter(
+          ComparisonExpression.Operator.GREATER_THAN_OR_EQUAL,
+          (SymbolReference) thirdExpression,
+          (Literal) firstExpression,
+          context);
+    } else {
+      throw new IllegalStateException(
+          String.format("%s is not supported in value push down", node));
+    }
+  }
+
+  private <T extends Comparable<T>> Filter constructBetweenFilter(
+      SymbolReference measurementReference,
+      Expression minValue,
+      Expression maxValue,
+      ConvertPredicateToFilterVisitor.Context context) {
+    int measurementIndex = 
context.getMeasurementIndex(measurementReference.getName());
+    Type dataType = context.getType(Symbol.from(measurementReference));
+
+    checkArgument(isLiteral(minValue) && isLiteral(maxValue));
+
+    T min = getValue((Literal) minValue, dataType);
+    T max = getValue((Literal) maxValue, dataType);
+
+    if (min.compareTo(max) == 0) {
+      return ValueFilterApi.eq(measurementIndex, min);
+    }
+    return ValueFilterApi.between(measurementIndex, min, max);
+  }
+
+  public static double getDoubleValue(Expression expression) {
+    return ((DoubleLiteral) expression).getValue();
+  }
+
+  public static boolean getBooleanValue(Expression expression) {
+    return ((BooleanLiteral) expression).getValue();
+  }
+
+  public static String getStringValue(Expression expression) {
+    return ((StringLiteral) expression).getValue();
+  }
+
+  public static class Context {
+
+    private final Map<String, Integer> measuremrntsMap;
+    private final Map<Symbol, ColumnSchema> schemaMap;
+
+    public Context(List<String> allMeasurements, Map<Symbol, ColumnSchema> 
schemaMap) {
+      this.measuremrntsMap = new HashMap<>(allMeasurements.size());
+      for (int i = 0, size = allMeasurements.size(); i < size; i++) {
+        measuremrntsMap.put(allMeasurements.get(i), i);
+      }
+      this.schemaMap = schemaMap;
+    }
+
+    public int getMeasurementIndex(String measurement) {
+      Integer index = measuremrntsMap.get(measurement);
+      if (index == null) {
+        throw new IllegalArgumentException(
+            String.format("Measurement %s does not exist", measurement));
+      }
+      return index;
+    }
+
+    public Type getType(Symbol symbol) {
+      Type type = schemaMap.get(symbol).getType();
+      if (type == null) {
+        throw new IllegalArgumentException(
+            String.format("ColumnSchema of Symbol %s isn't saved in 
schemaMap", symbol));
+      }
+      return type;
+    }
+
+    public boolean isMeasurementColumn(SymbolReference symbolReference) {
+      ColumnSchema schema = schemaMap.get(Symbol.from(symbolReference));
+      return schema != null && schema.getColumnCategory() == 
TsTableColumnCategory.MEASUREMENT;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToTimeFilterVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToTimeFilterVisitor.java
index f48a78151df..0deedfcefe4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToTimeFilterVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToTimeFilterVisitor.java
@@ -59,7 +59,7 @@ public class ConvertPredicateToTimeFilterVisitor extends 
PredicateVisitor<Filter
       checkArgument(value instanceof LongLiteral);
     }
     if (values.size() == 1) {
-      TimeFilterApi.eq(((LongLiteral) values.get(0)).getParsedValue());
+      TimeFilterApi.eq(getLongValue(values.get(0)));
     }
     Set<Long> longValues = new HashSet<>();
     for (Expression value : values) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/PredicatePushIntoScanChecker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/PredicatePushIntoScanChecker.java
index 519027bf832..7885deb71dd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/PredicatePushIntoScanChecker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/PredicatePushIntoScanChecker.java
@@ -63,7 +63,7 @@ public class PredicatePushIntoScanChecker extends 
PredicateVisitor<Boolean, Void
   protected Boolean visitLikePredicate(LikePredicate node, Void context) {
     return isSymbolReference(node.getValue())
         && isLiteral(node.getPattern())
-        && node.getEscape().map(this::isLiteral).orElse(true);
+        && 
node.getEscape().map(PredicatePushIntoScanChecker::isLiteral).orElse(true);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java
index ed65602cf9e..96f5da07379 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java
@@ -35,6 +35,10 @@ public class ProjectNode extends SingleChildProcessNode {
     return null;
   }
 
+  public Assignments getAssignments() {
+    return assignments;
+  }
+
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {}
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/multi/LogicalAndMultiColumnTransformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/multi/LogicalAndMultiColumnTransformer.java
new file mode 100644
index 00000000000..af14cb2204e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/multi/LogicalAndMultiColumnTransformer.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.transformation.dag.column.multi;
+
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.type.Type;
+
+import java.util.List;
+
+public class LogicalAndMultiColumnTransformer extends 
LogicalMultiColumnTransformer {
+  public LogicalAndMultiColumnTransformer(
+      Type returnType, List<ColumnTransformer> columnTransformerList) {
+    super(returnType, columnTransformerList);
+  }
+
+  @Override
+  protected void doTransform(
+      List<Column> childrenColumns, ColumnBuilder builder, int positionCount) {
+    for (int i = 0; i < positionCount; i++) {
+      boolean result = true;
+      boolean hasNull = false;
+      for (Column column : childrenColumns) {
+        if (column.isNull(i)) {
+          hasNull = true;
+        } else if (!column.getBoolean(i)) {
+          result = false;
+          break;
+        }
+      }
+      // have no null, all is true, result will be true
+      // have no null, and also have false, result will be false
+      // have null, and others are all true, result will be null
+      // have null, and also have false, result will be false
+      if (!result) {
+        returnType.writeBoolean(builder, false);
+      } else {
+        if (hasNull) {
+          builder.appendNull();
+        } else {
+          returnType.writeBoolean(builder, true);
+        }
+      }
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToFilterVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/multi/LogicalMultiColumnTransformer.java
similarity index 50%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToFilterVisitor.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/multi/LogicalMultiColumnTransformer.java
index 7ab2448ded1..6824af8ce97 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/predicate/ConvertPredicateToFilterVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/multi/LogicalMultiColumnTransformer.java
@@ -17,6 +17,27 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.plan.relational.analyzer.predicate;
+package org.apache.iotdb.db.queryengine.transformation.dag.column.multi;
 
-public class ConvertPredicateToFilterVisitor {}
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.type.Type;
+import org.apache.iotdb.tsfile.read.common.type.TypeEnum;
+
+import java.util.List;
+
+public abstract class LogicalMultiColumnTransformer extends 
MultiColumnTransformer {
+
+  protected LogicalMultiColumnTransformer(
+      Type returnType, List<ColumnTransformer> columnTransformerList) {
+    super(returnType, columnTransformerList);
+  }
+
+  @Override
+  protected void checkType() {
+    for (ColumnTransformer child : columnTransformerList) {
+      if (!child.typeEquals(TypeEnum.BOOLEAN)) {
+        throw new UnsupportedOperationException("Unsupported Type");
+      }
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/multi/LogicalOrMultiColumnTransformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/multi/LogicalOrMultiColumnTransformer.java
new file mode 100644
index 00000000000..0d49f0bce91
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/multi/LogicalOrMultiColumnTransformer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.transformation.dag.column.multi;
+
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.type.Type;
+
+import java.util.List;
+
+public class LogicalOrMultiColumnTransformer extends 
LogicalMultiColumnTransformer {
+  public LogicalOrMultiColumnTransformer(
+      Type returnType, List<ColumnTransformer> columnTransformerList) {
+    super(returnType, columnTransformerList);
+  }
+
+  @Override
+  protected void doTransform(
+      List<Column> childrenColumns, ColumnBuilder builder, int positionCount) {
+    for (int i = 0; i < positionCount; i++) {
+      boolean result = false;
+      boolean hasNull = false;
+      for (Column column : childrenColumns) {
+        if (column.isNull(i)) {
+          hasNull = true;
+        } else if (column.getBoolean(i)) {
+          result = true;
+          break;
+        }
+      }
+      // have any true, result will be true
+      // have no true, and have both false and null, result will be null
+      // have all false, result will be false
+      if (result) {
+        returnType.writeBoolean(builder, true);
+      } else {
+        if (hasNull) {
+          builder.appendNull();
+        } else {
+          returnType.writeBoolean(builder, false);
+        }
+      }
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/multi/MultiColumnTransformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/multi/MultiColumnTransformer.java
new file mode 100644
index 00000000000..4bb874c6c07
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/multi/MultiColumnTransformer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.transformation.dag.column.multi;
+
+import 
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.type.Type;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public abstract class MultiColumnTransformer extends ColumnTransformer {
+
+  protected final List<ColumnTransformer> columnTransformerList;
+
+  protected MultiColumnTransformer(Type returnType, List<ColumnTransformer> 
columnTransformerList) {
+    super(returnType);
+    this.columnTransformerList = columnTransformerList;
+    checkType();
+  }
+
+  @Override
+  public void evaluate() {
+
+    for (ColumnTransformer child : columnTransformerList) {
+      child.tryEvaluate();
+    }
+
+    // attention: get positionCount before calling getColumn
+    int positionCount = 
columnTransformerList.get(0).getColumnCachePositionCount();
+
+    ColumnBuilder builder = returnType.createColumnBuilder(positionCount);
+    doTransform(
+        columnTransformerList.stream()
+            .map(ColumnTransformer::getColumn)
+            .collect(Collectors.toList()),
+        builder,
+        positionCount);
+    initializeColumnCache(builder.build());
+  }
+
+  protected abstract void doTransform(
+      List<Column> childrenColumns, ColumnBuilder builder, int positionCount);
+
+  public List<ColumnTransformer> getChildren() {
+    return columnTransformerList;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/InColumnTransformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/InColumnTransformer.java
index 6aa7e0797c8..7f197802f09 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/InColumnTransformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/column/unary/InColumnTransformer.java
@@ -21,6 +21,11 @@ package 
org.apache.iotdb.db.queryengine.transformation.dag.column.unary;
 
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import 
org.apache.iotdb.db.queryengine.transformation.dag.column.ColumnTransformer;
+import org.apache.iotdb.db.relational.sql.tree.BooleanLiteral;
+import org.apache.iotdb.db.relational.sql.tree.DoubleLiteral;
+import org.apache.iotdb.db.relational.sql.tree.Literal;
+import org.apache.iotdb.db.relational.sql.tree.LongLiteral;
+import org.apache.iotdb.db.relational.sql.tree.StringLiteral;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
@@ -28,6 +33,7 @@ import org.apache.iotdb.tsfile.read.common.type.Type;
 import org.apache.iotdb.tsfile.read.common.type.TypeEnum;
 
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 public class InColumnTransformer extends UnaryColumnTransformer {
@@ -56,6 +62,17 @@ public class InColumnTransformer extends 
UnaryColumnTransformer {
     initTypedSet(values);
   }
 
+  public InColumnTransformer(
+      Type returnType, ColumnTransformer childColumnTransformer, List<Literal> 
values) {
+    super(returnType, childColumnTransformer);
+    satisfy = new InSatisfy();
+    this.childType =
+        childColumnTransformer.getType() == null
+            ? null
+            : childColumnTransformer.getType().getTypeEnum();
+    initTypedSet(values);
+  }
+
   @Override
   protected void doTransform(Column column, ColumnBuilder columnBuilder) {
     for (int i = 0, n = column.getPositionCount(); i < n; i++) {
@@ -150,6 +167,69 @@ public class InColumnTransformer extends 
UnaryColumnTransformer {
     }
   }
 
+  private void initTypedSet(List<Literal> values) {
+    if (childType == null) {
+      return;
+    }
+    String errorMsg = "\"%s\" cannot be cast to [%s]";
+    switch (childType) {
+      case INT32:
+        intSet = new HashSet<>();
+        for (Literal value : values) {
+          try {
+            intSet.add((int) ((LongLiteral) value).getParsedValue());
+          } catch (IllegalArgumentException e) {
+            throw new SemanticException(String.format(errorMsg, value, 
childType));
+          }
+        }
+        break;
+      case INT64:
+        longSet = new HashSet<>();
+        for (Literal value : values) {
+          try {
+            longSet.add((((LongLiteral) value).getParsedValue()));
+          } catch (IllegalArgumentException e) {
+            throw new SemanticException(String.format(errorMsg, value, 
childType));
+          }
+        }
+        break;
+      case FLOAT:
+        floatSet = new HashSet<>();
+        for (Literal value : values) {
+          try {
+            floatSet.add((float) ((DoubleLiteral) value).getValue());
+          } catch (IllegalArgumentException e) {
+            throw new SemanticException(String.format(errorMsg, value, 
childType));
+          }
+        }
+        break;
+      case DOUBLE:
+        doubleSet = new HashSet<>();
+        for (Literal value : values) {
+          try {
+            doubleSet.add(((DoubleLiteral) value).getValue());
+          } catch (IllegalArgumentException e) {
+            throw new SemanticException(String.format(errorMsg, value, 
childType));
+          }
+        }
+        break;
+      case BOOLEAN:
+        booleanSet = new HashSet<>();
+        for (Literal value : values) {
+          booleanSet.add(((BooleanLiteral) value).getValue());
+        }
+        break;
+      case TEXT:
+        stringSet = new HashSet<>();
+        for (Literal value : values) {
+          stringSet.add(((StringLiteral) value).getValue());
+        }
+        break;
+      default:
+        throw new UnsupportedOperationException("unsupported data type: " + 
childType);
+    }
+  }
+
   private boolean strictCastToBool(String s) {
     if ("true".equalsIgnoreCase(s)) {
       return true;

Reply via email to