This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch transform-filter-planner
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 35e02aedb6b6713a7a2ce0e172b6cf2a2726f413
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sun May 8 19:02:23 2022 +0800

    TransformNode & FilterNode
---
 .../execution/operator/process/FilterOperator.java |  12 +-
 .../operator/process/TransformOperator.java        |  24 +++-
 .../db/mpp/plan/planner/LocalExecutionPlanner.java |  56 ++++++++-
 .../mpp/plan/planner/plan/node/PlanNodeType.java   |   6 +-
 .../plan/planner/plan/node/process/FilterNode.java |  86 +++++++-------
 .../planner/plan/node/process/TransformNode.java   | 125 +++++++++++++++++++--
 .../db/query/udf/core/executor/UDTFContext.java    |   7 ++
 .../db/mpp/plan/plan/QueryLogicalPlanUtil.java     |  85 +++++++++++---
 .../plan/node/process/FilterNodeSerdeTest.java     |   7 +-
 9 files changed, 328 insertions(+), 80 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
index f7a37068f2..79e5e2fd2f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java
@@ -22,8 +22,8 @@ package org.apache.iotdb.db.mpp.execution.operator.process;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.query.expression.Expression;
-import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
 import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -33,6 +33,7 @@ import 
org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import java.io.IOException;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -46,15 +47,18 @@ public class FilterOperator extends TransformOperator {
       List<TSDataType> inputDataTypes,
       Expression filterExpression,
       Expression[] outputExpressions,
-      UDTFContext udtfContext)
+      boolean keepNull,
+      ZoneId zoneId,
+      TypeProvider typeProvider)
       throws QueryProcessException, IOException {
     super(
         operatorContext,
         inputOperator,
         inputDataTypes,
         bindExpressions(filterExpression, outputExpressions),
-        udtfContext,
-        false);
+        keepNull,
+        zoneId,
+        typeProvider);
   }
 
   private static Expression[] bindExpressions(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
index 76d1a4a0fb..7d47611f6d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.udf.core.executor.UDTFContext;
 import org.apache.iotdb.db.query.udf.core.layer.EvaluationDAGBuilder;
@@ -42,11 +43,13 @@ import 
org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 import com.google.common.util.concurrent.ListenableFuture;
 
 import java.io.IOException;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
 
 public class TransformOperator implements ProcessOperator {
 
+  // TODO: make it configurable
   protected static final int FETCH_SIZE = 10000;
 
   protected final float udfReaderMemoryBudgetInMB =
@@ -60,10 +63,10 @@ public class TransformOperator implements ProcessOperator {
   protected final Operator inputOperator;
   protected final List<TSDataType> inputDataTypes;
   protected final Expression[] outputExpressions;
-  protected final UDTFContext udtfContext;
   protected final boolean keepNull;
 
   protected RawQueryInputLayer inputLayer;
+  protected UDTFContext udtfContext;
   protected LayerPointReader[] transformers;
   protected TimeSelector timeHeap;
   protected List<TSDataType> outputDataTypes;
@@ -73,19 +76,21 @@ public class TransformOperator implements ProcessOperator {
       Operator inputOperator,
       List<TSDataType> inputDataTypes,
       Expression[] outputExpressions,
-      UDTFContext udtfContext,
-      boolean keepNull)
+      boolean keepNull,
+      ZoneId zoneId,
+      TypeProvider typeProvider)
       throws QueryProcessException, IOException {
     this.operatorContext = operatorContext;
     this.inputOperator = inputOperator;
     this.inputDataTypes = inputDataTypes;
     this.outputExpressions = outputExpressions;
-    this.udtfContext = udtfContext;
     this.keepNull = keepNull;
 
     initInputLayer(inputDataTypes);
+    initUdtfContext(zoneId);
     initTransformers();
     readyForFirstIteration();
+    updateTypeProvider(typeProvider);
   }
 
   private void initInputLayer(List<TSDataType> inputDataTypes) throws 
QueryProcessException {
@@ -96,6 +101,11 @@ public class TransformOperator implements ProcessOperator {
             new TsBlockInputDataSet(inputOperator, inputDataTypes));
   }
 
+  private void initUdtfContext(ZoneId zoneId) {
+    udtfContext = new UDTFContext(zoneId);
+    udtfContext.constructUdfExecutors(outputExpressions);
+  }
+
   protected void initTransformers() throws QueryProcessException, IOException {
     UDFRegistrationService.getInstance().acquireRegistrationLock();
     try {
@@ -139,6 +149,12 @@ public class TransformOperator implements ProcessOperator {
     }
   }
 
+  private void updateTypeProvider(TypeProvider typeProvider) {
+    for (int i = 0; i < transformers.length; ++i) {
+      typeProvider.setType(outputExpressions[i].toString(), 
transformers[i].getDataType());
+    }
+  }
+
   @Override
   public boolean hasNext() {
     return !timeHeap.isEmpty();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
index 9fe53fb9e1..3a8919af44 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.plan.planner;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
@@ -38,9 +39,11 @@ import org.apache.iotdb.db.mpp.execution.operator.Operator;
 import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.FilterOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
 import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator;
 import org.apache.iotdb.db.mpp.execution.operator.process.merge.ColumnMerger;
 import 
org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator;
@@ -99,6 +102,9 @@ import org.apache.iotdb.db.utils.datastructure.TimeSelector;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
+import org.apache.commons.lang3.Validate;
+
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -433,12 +439,49 @@ public class LocalExecutionPlanner {
 
     @Override
     public Operator visitTransform(TransformNode node, 
LocalExecutionPlanContext context) {
-      return super.visitTransform(node, context);
+      final OperatorContext operatorContext =
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(),
+              node.getPlanNodeId(),
+              TransformNode.class.getSimpleName());
+      final Operator inputOperator = generateOnlyChildOperator(node, context);
+      final List<TSDataType> inputDataTypes = getOutputColumnTypes(node, 
context.getTypeProvider());
+
+      try {
+        return new TransformOperator(
+            operatorContext,
+            inputOperator,
+            inputDataTypes,
+            node.getOutputExpressions(),
+            node.isKeepNull(),
+            node.getZoneId(),
+            context.getTypeProvider());
+      } catch (QueryProcessException | IOException e) {
+        throw new RuntimeException(e);
+      }
     }
 
     @Override
     public Operator visitFilter(FilterNode node, LocalExecutionPlanContext 
context) {
-      return super.visitFilter(node, context);
+      final OperatorContext operatorContext =
+          context.instanceContext.addOperatorContext(
+              context.getNextOperatorId(), node.getPlanNodeId(), 
FilterNode.class.getSimpleName());
+      final Operator inputOperator = generateOnlyChildOperator(node, context);
+      final List<TSDataType> inputDataTypes = getOutputColumnTypes(node, 
context.getTypeProvider());
+
+      try {
+        return new FilterOperator(
+            operatorContext,
+            inputOperator,
+            inputDataTypes,
+            node.getPredicate(),
+            node.getOutputExpressions(),
+            node.isKeepNull(),
+            node.getZoneId(),
+            context.getTypeProvider());
+      } catch (QueryProcessException | IOException e) {
+        throw new RuntimeException(e);
+      }
     }
 
     @Override
@@ -629,6 +672,15 @@ public class LocalExecutionPlanner {
           .map(typeProvider::getType)
           .collect(Collectors.toList());
     }
+
+    private Operator generateOnlyChildOperator(PlanNode node, 
LocalExecutionPlanContext context) {
+      List<Operator> children =
+          node.getChildren().stream()
+              .map(child -> child.accept(this, context))
+              .collect(Collectors.toList());
+      Validate.isTrue(children.size() == 1);
+      return children.get(0);
+    }
   }
 
   private static class InstanceHolder {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index 4d3df73e2f..5662d0ca54 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -44,6 +44,7 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ProjectNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
@@ -96,7 +97,8 @@ public enum PlanNodeType {
   ALIGNED_SERIES_SCAN((short) 33),
   ALIGNED_SERIES_AGGREGATE_SCAN((short) 34),
   DEVICE_MERGE((short) 35),
-  SCHEMA_FETCH_MERGE((short) 36);
+  SCHEMA_FETCH_MERGE((short) 36),
+  TRANSFORM((short) 37);
 
   private final short nodeType;
 
@@ -194,6 +196,8 @@ public enum PlanNodeType {
         return AlignedSeriesAggregationScanNode.deserialize(buffer);
       case 36:
         return SchemaFetchMergeNode.deserialize(buffer);
+      case 37:
+        return TransformNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FilterNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FilterNode.java
index dfc10e0cba..338a519123 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FilterNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FilterNode.java
@@ -23,74 +23,74 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.query.expression.Expression;
-
-import com.google.common.collect.ImmutableList;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import java.nio.ByteBuffer;
-import java.util.List;
+import java.time.ZoneId;
 import java.util.Objects;
 
-/** The FilterNode is responsible to filter the RowRecord from TsBlock. */
-public class FilterNode extends ProcessNode {
-
-  private PlanNode child;
+public class FilterNode extends TransformNode {
 
   private final Expression predicate;
 
-  public FilterNode(PlanNodeId id, Expression predicate) {
-    super(id);
+  public FilterNode(
+      PlanNodeId id,
+      PlanNode childPlanNode,
+      Expression[] outputExpressions,
+      Expression predicate,
+      boolean keepNull,
+      ZoneId zoneId) {
+    super(id, childPlanNode, outputExpressions, keepNull, zoneId);
     this.predicate = predicate;
   }
 
-  public FilterNode(PlanNodeId id, PlanNode child, Expression predicate) {
-    this(id, predicate);
-    this.child = child;
-  }
-
-  public Expression getPredicate() {
-    return predicate;
-  }
-
-  @Override
-  public List<PlanNode> getChildren() {
-    return ImmutableList.of(child);
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    this.child = child;
+  public FilterNode(
+      PlanNodeId id,
+      Expression[] outputExpressions,
+      Expression predicate,
+      boolean keepNull,
+      ZoneId zoneId) {
+    super(id, outputExpressions, keepNull, zoneId);
+    this.predicate = predicate;
   }
 
   @Override
-  public int allowedChildCount() {
-    return ONE_CHILD;
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitFilter(this, context);
   }
 
   @Override
   public PlanNode clone() {
-    return new FilterNode(getPlanNodeId(), predicate);
-  }
-
-  @Override
-  public List<String> getOutputColumnNames() {
-    return child.getOutputColumnNames();
-  }
-
-  @Override
-  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
-    return visitor.visitFilter(this, context);
+    return new FilterNode(getPlanNodeId(), outputExpressions, predicate, 
keepNull, zoneId);
   }
 
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     PlanNodeType.FILTER.serialize(byteBuffer);
+    ReadWriteIOUtils.write(outputExpressions.length, byteBuffer);
+    for (Expression expression : outputExpressions) {
+      Expression.serialize(expression, byteBuffer);
+    }
     Expression.serialize(predicate, byteBuffer);
+    ReadWriteIOUtils.write(keepNull, byteBuffer);
+    ReadWriteIOUtils.write(zoneId.getId(), byteBuffer);
   }
 
   public static FilterNode deserialize(ByteBuffer byteBuffer) {
+    int outputExpressionsLength = ReadWriteIOUtils.readInt(byteBuffer);
+    Expression[] outputExpressions = new Expression[outputExpressionsLength];
+    for (int i = 0; i < outputExpressionsLength; ++i) {
+      outputExpressions[i] = Expression.deserialize(byteBuffer);
+    }
     Expression predicate = Expression.deserialize(byteBuffer);
+    boolean keepNull = ReadWriteIOUtils.readBool(byteBuffer);
+    ZoneId zoneId = 
ZoneId.of(Objects.requireNonNull(ReadWriteIOUtils.readString(byteBuffer)));
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new FilterNode(planNodeId, predicate);
+    return new FilterNode(planNodeId, outputExpressions, predicate, keepNull, 
zoneId);
+  }
+
+  public Expression getPredicate() {
+    return predicate;
   }
 
   @Override
@@ -98,18 +98,18 @@ public class FilterNode extends ProcessNode {
     if (this == o) {
       return true;
     }
-    if (o == null || getClass() != o.getClass()) {
+    if (!(o instanceof FilterNode)) {
       return false;
     }
     if (!super.equals(o)) {
       return false;
     }
     FilterNode that = (FilterNode) o;
-    return child.equals(that.child) && predicate.equals(that.predicate);
+    return predicate.equals(that.predicate);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(super.hashCode(), child, predicate);
+    return Objects.hash(super.hashCode(), predicate);
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TransformNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TransformNode.java
index 3fedaf99eb..75dd967780 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TransformNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TransformNode.java
@@ -21,39 +21,144 @@ package 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
 
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.query.expression.Expression;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import com.google.common.collect.ImmutableList;
 
 import java.nio.ByteBuffer;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 
 public class TransformNode extends ProcessNode {
 
-  public TransformNode(PlanNodeId id) {
+  protected PlanNode childPlanNode;
+
+  protected final Expression[] outputExpressions;
+  protected final boolean keepNull;
+  protected final ZoneId zoneId;
+
+  private List<String> outputColumnNames;
+
+  public TransformNode(
+      PlanNodeId id,
+      PlanNode childPlanNode,
+      Expression[] outputExpressions,
+      boolean keepNull,
+      ZoneId zoneId) {
+    super(id);
+    this.childPlanNode = childPlanNode;
+    this.outputExpressions = outputExpressions;
+    this.keepNull = keepNull;
+    this.zoneId = zoneId;
+  }
+
+  public TransformNode(
+      PlanNodeId id, Expression[] outputExpressions, boolean keepNull, ZoneId 
zoneId) {
     super(id);
+    this.outputExpressions = outputExpressions;
+    this.keepNull = keepNull;
+    this.zoneId = zoneId;
+  }
+
+  @Override
+  public final List<PlanNode> getChildren() {
+    return ImmutableList.of(childPlanNode);
+  }
+
+  @Override
+  public final void addChild(PlanNode childPlanNode) {
+    this.childPlanNode = childPlanNode;
   }
 
   @Override
-  public List<PlanNode> getChildren() {
-    return null;
+  public final int allowedChildCount() {
+    return ONE_CHILD;
   }
 
   @Override
-  public void addChild(PlanNode child) {}
+  public final List<String> getOutputColumnNames() {
+    if (outputColumnNames == null) {
+      outputColumnNames = new ArrayList<>();
+      for (Expression expression : outputExpressions) {
+        outputColumnNames.add(expression.toString());
+      }
+    }
+    return outputColumnNames;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitTransform(this, context);
+  }
 
   @Override
   public PlanNode clone() {
-    return null;
+    return new TransformNode(getPlanNodeId(), outputExpressions, keepNull, 
zoneId);
   }
 
   @Override
-  public int allowedChildCount() {
-    return 0;
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.TRANSFORM.serialize(byteBuffer);
+    ReadWriteIOUtils.write(outputExpressions.length, byteBuffer);
+    for (Expression expression : outputExpressions) {
+      Expression.serialize(expression, byteBuffer);
+    }
+    ReadWriteIOUtils.write(keepNull, byteBuffer);
+    ReadWriteIOUtils.write(zoneId.getId(), byteBuffer);
+  }
+
+  public static TransformNode deserialize(ByteBuffer byteBuffer) {
+    int outputExpressionsLength = ReadWriteIOUtils.readInt(byteBuffer);
+    Expression[] outputExpressions = new Expression[outputExpressionsLength];
+    for (int i = 0; i < outputExpressionsLength; ++i) {
+      outputExpressions[i] = Expression.deserialize(byteBuffer);
+    }
+    boolean keepNull = ReadWriteIOUtils.readBool(byteBuffer);
+    ZoneId zoneId = 
ZoneId.of(Objects.requireNonNull(ReadWriteIOUtils.readString(byteBuffer)));
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new TransformNode(planNodeId, outputExpressions, keepNull, zoneId);
+  }
+
+  public final Expression[] getOutputExpressions() {
+    return outputExpressions;
+  }
+
+  public final boolean isKeepNull() {
+    return keepNull;
+  }
+
+  public final ZoneId getZoneId() {
+    return zoneId;
   }
 
   @Override
-  public List<String> getOutputColumnNames() {
-    return null;
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof TransformNode)) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    TransformNode that = (TransformNode) o;
+    return keepNull == that.keepNull
+        && childPlanNode.equals(that.childPlanNode)
+        && Arrays.equals(outputExpressions, that.outputExpressions)
+        && zoneId.equals(that.zoneId);
   }
 
   @Override
-  protected void serializeAttributes(ByteBuffer byteBuffer) {}
+  public int hashCode() {
+    int result = Objects.hash(super.hashCode(), childPlanNode, keepNull, 
zoneId);
+    result = 31 * result + Arrays.hashCode(outputExpressions);
+    return result;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFContext.java
 
b/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFContext.java
index 2c2e19abd2..9350376708 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFContext.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFContext.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.query.udf.core.executor;
 
+import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.expression.ResultColumn;
 import org.apache.iotdb.db.query.expression.multi.FunctionExpression;
 import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager;
@@ -44,6 +45,12 @@ public class UDTFContext {
     }
   }
 
+  public void constructUdfExecutors(Expression[] outputExpressions) {
+    for (Expression expression : outputExpressions) {
+      expression.constructUdfExecutors(expressionName2Executor, zoneId);
+    }
+  }
+
   public void finalizeUDFExecutors(long queryId) {
     try {
       for (UDTFExecutor executor : expressionName2Executor.values()) {
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
index 055104c12b..c500cd278e 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
@@ -38,18 +38,20 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
 import org.apache.iotdb.db.mpp.plan.statement.component.FilterNullPolicy;
 import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
-import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
+import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.expression.binary.GreaterThanExpression;
 import org.apache.iotdb.db.query.expression.binary.LogicAndExpression;
 import org.apache.iotdb.db.query.expression.leaf.ConstantOperand;
 import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand;
+import org.apache.iotdb.db.query.expression.leaf.TimestampOperand;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 
 import org.apache.commons.compress.utils.Sets;
 
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -136,8 +138,7 @@ public class QueryLogicalPlanUtil {
 
     GreaterThanExpression timeFilter =
         new GreaterThanExpression(
-            new TimeSeriesOperand(SQLConstant.TIME_PATH),
-            new ConstantOperand(TSDataType.INT64, "100"));
+            new TimestampOperand(), new ConstantOperand(TSDataType.INT64, 
"100"));
     GreaterThanExpression valueFilter1 =
         new GreaterThanExpression(
             new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
@@ -152,7 +153,17 @@ public class QueryLogicalPlanUtil {
             new LogicAndExpression(timeFilter, valueFilter2));
 
     FilterNode filterNode =
-        new FilterNode(new PlanNodeId("test_query_4"), timeJoinNode, 
expression);
+        new FilterNode(
+            new PlanNodeId("test_query_4"),
+            timeJoinNode,
+            new Expression[] {
+              new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
+              new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")),
+              new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")),
+            },
+            expression,
+            false,
+            ZoneId.systemDefault());
 
     ProjectNode projectNode =
         new ProjectNode(
@@ -212,8 +223,7 @@ public class QueryLogicalPlanUtil {
 
     GreaterThanExpression timeFilter =
         new GreaterThanExpression(
-            new TimeSeriesOperand(SQLConstant.TIME_PATH),
-            new ConstantOperand(TSDataType.INT64, "100"));
+            new TimestampOperand(), new ConstantOperand(TSDataType.INT64, 
"100"));
     GreaterThanExpression valueFilter1 =
         new GreaterThanExpression(
             new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
@@ -228,9 +238,27 @@ public class QueryLogicalPlanUtil {
             new LogicAndExpression(timeFilter, valueFilter2));
 
     FilterNode filterNode1 =
-        new FilterNode(new PlanNodeId("test_query_6"), timeJoinNode1, 
expression);
+        new FilterNode(
+            new PlanNodeId("test_query_6"),
+            timeJoinNode1,
+            new Expression[] {
+              new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")),
+              new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
+            },
+            expression,
+            false,
+            ZoneId.systemDefault());
     FilterNode filterNode2 =
-        new FilterNode(new PlanNodeId("test_query_7"), timeJoinNode2, 
expression);
+        new FilterNode(
+            new PlanNodeId("test_query_7"),
+            timeJoinNode2,
+            new Expression[] {
+              new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")),
+              new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")),
+            },
+            expression,
+            false,
+            ZoneId.systemDefault());
 
     DeviceViewNode deviceViewNode =
         new DeviceViewNode(
@@ -496,8 +524,7 @@ public class QueryLogicalPlanUtil {
 
     GreaterThanExpression timeFilter =
         new GreaterThanExpression(
-            new TimeSeriesOperand(SQLConstant.TIME_PATH),
-            new ConstantOperand(TSDataType.INT64, "100"));
+            new TimestampOperand(), new ConstantOperand(TSDataType.INT64, 
"100"));
     GreaterThanExpression valueFilter1 =
         new GreaterThanExpression(
             new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
@@ -511,7 +538,18 @@ public class QueryLogicalPlanUtil {
             new LogicAndExpression(timeFilter, valueFilter1),
             new LogicAndExpression(timeFilter, valueFilter2));
     FilterNode filterNode =
-        new FilterNode(new PlanNodeId("test_query_5"), timeJoinNode, 
expression);
+        new FilterNode(
+            new PlanNodeId("test_query_5"),
+            timeJoinNode,
+            new Expression[] {
+              new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")),
+              new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
+              new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")),
+              new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")),
+            },
+            expression,
+            false,
+            ZoneId.systemDefault());
 
     AggregationNode aggregationNode =
         new AggregationNode(
@@ -629,8 +667,7 @@ public class QueryLogicalPlanUtil {
 
     GreaterThanExpression timeFilter =
         new GreaterThanExpression(
-            new TimeSeriesOperand(SQLConstant.TIME_PATH),
-            new ConstantOperand(TSDataType.INT64, "100"));
+            new TimestampOperand(), new ConstantOperand(TSDataType.INT64, 
"100"));
     GreaterThanExpression valueFilter1 =
         new GreaterThanExpression(
             new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
@@ -645,9 +682,27 @@ public class QueryLogicalPlanUtil {
             new LogicAndExpression(timeFilter, valueFilter2));
 
     FilterNode filterNode1 =
-        new FilterNode(new PlanNodeId("test_query_6"), timeJoinNode1, 
expression);
+        new FilterNode(
+            new PlanNodeId("test_query_6"),
+            timeJoinNode1,
+            new Expression[] {
+              new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")),
+              new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")),
+            },
+            expression,
+            false,
+            ZoneId.systemDefault());
     FilterNode filterNode2 =
-        new FilterNode(new PlanNodeId("test_query_7"), timeJoinNode2, 
expression);
+        new FilterNode(
+            new PlanNodeId("test_query_7"),
+            timeJoinNode2,
+            new Expression[] {
+              new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")),
+              new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")),
+            },
+            expression,
+            false,
+            ZoneId.systemDefault());
 
     AggregationNode aggregationNode1 =
         new AggregationNode(
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/FilterNodeSerdeTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/FilterNodeSerdeTest.java
index a68782a73e..35346f29d8 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/FilterNodeSerdeTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/FilterNodeSerdeTest.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy;
+import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.db.query.expression.binary.GreaterThanExpression;
 import org.apache.iotdb.db.query.expression.leaf.ConstantOperand;
 import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand;
@@ -33,6 +34,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
+import java.time.ZoneId;
 
 import static org.junit.Assert.assertEquals;
 
@@ -46,9 +48,12 @@ public class FilterNodeSerdeTest {
         new FilterNode(
             new PlanNodeId("TestFilterNode"),
             timeJoinNode,
+            new Expression[] {new TimeSeriesOperand(new 
PartialPath("root.sg.d1.s1"))},
             new GreaterThanExpression(
                 new TimeSeriesOperand(new PartialPath("root.sg.d1.s1")),
-                new ConstantOperand(TSDataType.INT64, "100")));
+                new ConstantOperand(TSDataType.INT64, "100")),
+            false,
+            ZoneId.systemDefault());
 
     ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
     filterNode.serialize(byteBuffer);

Reply via email to