This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch transform-filter-planner in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 35e02aedb6b6713a7a2ce0e172b6cf2a2726f413 Author: Steve Yurong Su <[email protected]> AuthorDate: Sun May 8 19:02:23 2022 +0800 TransformNode & FilterNode --- .../execution/operator/process/FilterOperator.java | 12 +- .../operator/process/TransformOperator.java | 24 +++- .../db/mpp/plan/planner/LocalExecutionPlanner.java | 56 ++++++++- .../mpp/plan/planner/plan/node/PlanNodeType.java | 6 +- .../plan/planner/plan/node/process/FilterNode.java | 86 +++++++------- .../planner/plan/node/process/TransformNode.java | 125 +++++++++++++++++++-- .../db/query/udf/core/executor/UDTFContext.java | 7 ++ .../db/mpp/plan/plan/QueryLogicalPlanUtil.java | 85 +++++++++++--- .../plan/node/process/FilterNodeSerdeTest.java | 7 +- 9 files changed, 328 insertions(+), 80 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java index f7a37068f2..79e5e2fd2f 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/FilterOperator.java @@ -22,8 +22,8 @@ package org.apache.iotdb.db.mpp.execution.operator.process; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; +import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; import org.apache.iotdb.db.query.expression.Expression; -import org.apache.iotdb.db.query.udf.core.executor.UDTFContext; import org.apache.iotdb.db.query.udf.core.reader.LayerPointReader; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -33,6 +33,7 @@ import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; import java.io.IOException; +import java.time.ZoneId; import java.util.ArrayList; import java.util.List; @@ -46,15 +47,18 @@ public class FilterOperator extends TransformOperator { List<TSDataType> inputDataTypes, Expression filterExpression, Expression[] outputExpressions, - UDTFContext udtfContext) + boolean keepNull, + ZoneId zoneId, + TypeProvider typeProvider) throws QueryProcessException, IOException { super( operatorContext, inputOperator, inputDataTypes, bindExpressions(filterExpression, outputExpressions), - udtfContext, - false); + keepNull, + zoneId, + typeProvider); } private static Expression[] bindExpressions( diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java index 76d1a4a0fb..7d47611f6d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/TransformOperator.java @@ -23,6 +23,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; +import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider; import org.apache.iotdb.db.query.expression.Expression; import org.apache.iotdb.db.query.udf.core.executor.UDTFContext; import org.apache.iotdb.db.query.udf.core.layer.EvaluationDAGBuilder; @@ -42,11 +43,13 @@ import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; import com.google.common.util.concurrent.ListenableFuture; import java.io.IOException; +import java.time.ZoneId; import java.util.ArrayList; import java.util.List; public class TransformOperator implements ProcessOperator { + // TODO: make it configurable protected static final int FETCH_SIZE = 10000; protected final float udfReaderMemoryBudgetInMB = @@ -60,10 +63,10 @@ public class TransformOperator implements ProcessOperator { protected final Operator inputOperator; protected final List<TSDataType> inputDataTypes; protected final Expression[] outputExpressions; - protected final UDTFContext udtfContext; protected final boolean keepNull; protected RawQueryInputLayer inputLayer; + protected UDTFContext udtfContext; protected LayerPointReader[] transformers; protected TimeSelector timeHeap; protected List<TSDataType> outputDataTypes; @@ -73,19 +76,21 @@ public class TransformOperator implements ProcessOperator { Operator inputOperator, List<TSDataType> inputDataTypes, Expression[] outputExpressions, - UDTFContext udtfContext, - boolean keepNull) + boolean keepNull, + ZoneId zoneId, + TypeProvider typeProvider) throws QueryProcessException, IOException { this.operatorContext = operatorContext; this.inputOperator = inputOperator; this.inputDataTypes = inputDataTypes; this.outputExpressions = outputExpressions; - this.udtfContext = udtfContext; this.keepNull = keepNull; initInputLayer(inputDataTypes); + initUdtfContext(zoneId); initTransformers(); readyForFirstIteration(); + updateTypeProvider(typeProvider); } private void initInputLayer(List<TSDataType> inputDataTypes) throws QueryProcessException { @@ -96,6 +101,11 @@ public class TransformOperator implements ProcessOperator { new TsBlockInputDataSet(inputOperator, inputDataTypes)); } + private void initUdtfContext(ZoneId zoneId) { + udtfContext = new UDTFContext(zoneId); + udtfContext.constructUdfExecutors(outputExpressions); + } + protected void initTransformers() throws QueryProcessException, IOException { UDFRegistrationService.getInstance().acquireRegistrationLock(); try { @@ -139,6 +149,12 @@ public class TransformOperator implements ProcessOperator { } } + private void updateTypeProvider(TypeProvider typeProvider) { + for (int i = 0; i < transformers.length; ++i) { + typeProvider.setType(outputExpressions[i].toString(), transformers[i].getDataType()); + } + } + @Override public boolean hasNext() { return !timeHeap.isEmpty(); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java index 9fe53fb9e1..3a8919af44 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanner.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.plan.planner; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.engine.storagegroup.DataRegion; +import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.path.AlignedPath; import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion; import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory; @@ -38,9 +39,11 @@ import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; import org.apache.iotdb.db.mpp.execution.operator.process.DeviceMergeOperator; import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.FilterOperator; import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator; import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator; import org.apache.iotdb.db.mpp.execution.operator.process.TimeJoinOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator; import org.apache.iotdb.db.mpp.execution.operator.process.merge.AscTimeComparator; import org.apache.iotdb.db.mpp.execution.operator.process.merge.ColumnMerger; import org.apache.iotdb.db.mpp.execution.operator.process.merge.DescTimeComparator; @@ -99,6 +102,9 @@ import org.apache.iotdb.db.utils.datastructure.TimeSelector; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.filter.basic.Filter; +import org.apache.commons.lang3.Validate; + +import java.io.IOException; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -433,12 +439,49 @@ public class LocalExecutionPlanner { @Override public Operator visitTransform(TransformNode node, LocalExecutionPlanContext context) { - return super.visitTransform(node, context); + final OperatorContext operatorContext = + context.instanceContext.addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + TransformNode.class.getSimpleName()); + final Operator inputOperator = generateOnlyChildOperator(node, context); + final List<TSDataType> inputDataTypes = getOutputColumnTypes(node, context.getTypeProvider()); + + try { + return new TransformOperator( + operatorContext, + inputOperator, + inputDataTypes, + node.getOutputExpressions(), + node.isKeepNull(), + node.getZoneId(), + context.getTypeProvider()); + } catch (QueryProcessException | IOException e) { + throw new RuntimeException(e); + } } @Override public Operator visitFilter(FilterNode node, LocalExecutionPlanContext context) { - return super.visitFilter(node, context); + final OperatorContext operatorContext = + context.instanceContext.addOperatorContext( + context.getNextOperatorId(), node.getPlanNodeId(), FilterNode.class.getSimpleName()); + final Operator inputOperator = generateOnlyChildOperator(node, context); + final List<TSDataType> inputDataTypes = getOutputColumnTypes(node, context.getTypeProvider()); + + try { + return new FilterOperator( + operatorContext, + inputOperator, + inputDataTypes, + node.getPredicate(), + node.getOutputExpressions(), + node.isKeepNull(), + node.getZoneId(), + context.getTypeProvider()); + } catch (QueryProcessException | IOException e) { + throw new RuntimeException(e); + } } @Override @@ -629,6 +672,15 @@ public class LocalExecutionPlanner { .map(typeProvider::getType) .collect(Collectors.toList()); } + + private Operator generateOnlyChildOperator(PlanNode node, LocalExecutionPlanContext context) { + List<Operator> children = + node.getChildren().stream() + .map(child -> child.accept(this, context)) + .collect(Collectors.toList()); + Validate.isTrue(children.size() == 1); + return children.get(0); + } } private static class InstanceHolder { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java index 4d3df73e2f..5662d0ca54 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java @@ -44,6 +44,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ProjectNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggregationScanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode; @@ -96,7 +97,8 @@ public enum PlanNodeType { ALIGNED_SERIES_SCAN((short) 33), ALIGNED_SERIES_AGGREGATE_SCAN((short) 34), DEVICE_MERGE((short) 35), - SCHEMA_FETCH_MERGE((short) 36); + SCHEMA_FETCH_MERGE((short) 36), + TRANSFORM((short) 37); private final short nodeType; @@ -194,6 +196,8 @@ public enum PlanNodeType { return AlignedSeriesAggregationScanNode.deserialize(buffer); case 36: return SchemaFetchMergeNode.deserialize(buffer); + case 37: + return TransformNode.deserialize(buffer); default: throw new IllegalArgumentException("Invalid node type: " + nodeType); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FilterNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FilterNode.java index dfc10e0cba..338a519123 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FilterNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/FilterNode.java @@ -23,74 +23,74 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.query.expression.Expression; - -import com.google.common.collect.ImmutableList; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; import java.nio.ByteBuffer; -import java.util.List; +import java.time.ZoneId; import java.util.Objects; -/** The FilterNode is responsible to filter the RowRecord from TsBlock. */ -public class FilterNode extends ProcessNode { - - private PlanNode child; +public class FilterNode extends TransformNode { private final Expression predicate; - public FilterNode(PlanNodeId id, Expression predicate) { - super(id); + public FilterNode( + PlanNodeId id, + PlanNode childPlanNode, + Expression[] outputExpressions, + Expression predicate, + boolean keepNull, + ZoneId zoneId) { + super(id, childPlanNode, outputExpressions, keepNull, zoneId); this.predicate = predicate; } - public FilterNode(PlanNodeId id, PlanNode child, Expression predicate) { - this(id, predicate); - this.child = child; - } - - public Expression getPredicate() { - return predicate; - } - - @Override - public List<PlanNode> getChildren() { - return ImmutableList.of(child); - } - - @Override - public void addChild(PlanNode child) { - this.child = child; + public FilterNode( + PlanNodeId id, + Expression[] outputExpressions, + Expression predicate, + boolean keepNull, + ZoneId zoneId) { + super(id, outputExpressions, keepNull, zoneId); + this.predicate = predicate; } @Override - public int allowedChildCount() { - return ONE_CHILD; + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitFilter(this, context); } @Override public PlanNode clone() { - return new FilterNode(getPlanNodeId(), predicate); - } - - @Override - public List<String> getOutputColumnNames() { - return child.getOutputColumnNames(); - } - - @Override - public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { - return visitor.visitFilter(this, context); + return new FilterNode(getPlanNodeId(), outputExpressions, predicate, keepNull, zoneId); } @Override protected void serializeAttributes(ByteBuffer byteBuffer) { PlanNodeType.FILTER.serialize(byteBuffer); + ReadWriteIOUtils.write(outputExpressions.length, byteBuffer); + for (Expression expression : outputExpressions) { + Expression.serialize(expression, byteBuffer); + } Expression.serialize(predicate, byteBuffer); + ReadWriteIOUtils.write(keepNull, byteBuffer); + ReadWriteIOUtils.write(zoneId.getId(), byteBuffer); } public static FilterNode deserialize(ByteBuffer byteBuffer) { + int outputExpressionsLength = ReadWriteIOUtils.readInt(byteBuffer); + Expression[] outputExpressions = new Expression[outputExpressionsLength]; + for (int i = 0; i < outputExpressionsLength; ++i) { + outputExpressions[i] = Expression.deserialize(byteBuffer); + } Expression predicate = Expression.deserialize(byteBuffer); + boolean keepNull = ReadWriteIOUtils.readBool(byteBuffer); + ZoneId zoneId = ZoneId.of(Objects.requireNonNull(ReadWriteIOUtils.readString(byteBuffer))); PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); - return new FilterNode(planNodeId, predicate); + return new FilterNode(planNodeId, outputExpressions, predicate, keepNull, zoneId); + } + + public Expression getPredicate() { + return predicate; } @Override @@ -98,18 +98,18 @@ public class FilterNode extends ProcessNode { if (this == o) { return true; } - if (o == null || getClass() != o.getClass()) { + if (!(o instanceof FilterNode)) { return false; } if (!super.equals(o)) { return false; } FilterNode that = (FilterNode) o; - return child.equals(that.child) && predicate.equals(that.predicate); + return predicate.equals(that.predicate); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), child, predicate); + return Objects.hash(super.hashCode(), predicate); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TransformNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TransformNode.java index 3fedaf99eb..75dd967780 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TransformNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/TransformNode.java @@ -21,39 +21,144 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.process; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.query.expression.Expression; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + +import com.google.common.collect.ImmutableList; import java.nio.ByteBuffer; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.Objects; public class TransformNode extends ProcessNode { - public TransformNode(PlanNodeId id) { + protected PlanNode childPlanNode; + + protected final Expression[] outputExpressions; + protected final boolean keepNull; + protected final ZoneId zoneId; + + private List<String> outputColumnNames; + + public TransformNode( + PlanNodeId id, + PlanNode childPlanNode, + Expression[] outputExpressions, + boolean keepNull, + ZoneId zoneId) { + super(id); + this.childPlanNode = childPlanNode; + this.outputExpressions = outputExpressions; + this.keepNull = keepNull; + this.zoneId = zoneId; + } + + public TransformNode( + PlanNodeId id, Expression[] outputExpressions, boolean keepNull, ZoneId zoneId) { super(id); + this.outputExpressions = outputExpressions; + this.keepNull = keepNull; + this.zoneId = zoneId; + } + + @Override + public final List<PlanNode> getChildren() { + return ImmutableList.of(childPlanNode); + } + + @Override + public final void addChild(PlanNode childPlanNode) { + this.childPlanNode = childPlanNode; } @Override - public List<PlanNode> getChildren() { - return null; + public final int allowedChildCount() { + return ONE_CHILD; } @Override - public void addChild(PlanNode child) {} + public final List<String> getOutputColumnNames() { + if (outputColumnNames == null) { + outputColumnNames = new ArrayList<>(); + for (Expression expression : outputExpressions) { + outputColumnNames.add(expression.toString()); + } + } + return outputColumnNames; + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitTransform(this, context); + } @Override public PlanNode clone() { - return null; + return new TransformNode(getPlanNodeId(), outputExpressions, keepNull, zoneId); } @Override - public int allowedChildCount() { - return 0; + protected void serializeAttributes(ByteBuffer byteBuffer) { + PlanNodeType.TRANSFORM.serialize(byteBuffer); + ReadWriteIOUtils.write(outputExpressions.length, byteBuffer); + for (Expression expression : outputExpressions) { + Expression.serialize(expression, byteBuffer); + } + ReadWriteIOUtils.write(keepNull, byteBuffer); + ReadWriteIOUtils.write(zoneId.getId(), byteBuffer); + } + + public static TransformNode deserialize(ByteBuffer byteBuffer) { + int outputExpressionsLength = ReadWriteIOUtils.readInt(byteBuffer); + Expression[] outputExpressions = new Expression[outputExpressionsLength]; + for (int i = 0; i < outputExpressionsLength; ++i) { + outputExpressions[i] = Expression.deserialize(byteBuffer); + } + boolean keepNull = ReadWriteIOUtils.readBool(byteBuffer); + ZoneId zoneId = ZoneId.of(Objects.requireNonNull(ReadWriteIOUtils.readString(byteBuffer))); + PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); + return new TransformNode(planNodeId, outputExpressions, keepNull, zoneId); + } + + public final Expression[] getOutputExpressions() { + return outputExpressions; + } + + public final boolean isKeepNull() { + return keepNull; + } + + public final ZoneId getZoneId() { + return zoneId; } @Override - public List<String> getOutputColumnNames() { - return null; + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof TransformNode)) { + return false; + } + if (!super.equals(o)) { + return false; + } + TransformNode that = (TransformNode) o; + return keepNull == that.keepNull + && childPlanNode.equals(that.childPlanNode) + && Arrays.equals(outputExpressions, that.outputExpressions) + && zoneId.equals(that.zoneId); } @Override - protected void serializeAttributes(ByteBuffer byteBuffer) {} + public int hashCode() { + int result = Objects.hash(super.hashCode(), childPlanNode, keepNull, zoneId); + result = 31 * result + Arrays.hashCode(outputExpressions); + return result; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFContext.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFContext.java index 2c2e19abd2..9350376708 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFContext.java +++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFContext.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.query.udf.core.executor; +import org.apache.iotdb.db.query.expression.Expression; import org.apache.iotdb.db.query.expression.ResultColumn; import org.apache.iotdb.db.query.expression.multi.FunctionExpression; import org.apache.iotdb.db.query.udf.service.UDFClassLoaderManager; @@ -44,6 +45,12 @@ public class UDTFContext { } } + public void constructUdfExecutors(Expression[] outputExpressions) { + for (Expression expression : outputExpressions) { + expression.constructUdfExecutors(expressionName2Executor, zoneId); + } + } + public void finalizeUDFExecutors(long queryId) { try { for (UDTFExecutor executor : expressionName2Executor.values()) { diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java index 055104c12b..c500cd278e 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java @@ -38,18 +38,20 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep; import org.apache.iotdb.db.mpp.plan.statement.component.FilterNullPolicy; import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy; -import org.apache.iotdb.db.qp.constant.SQLConstant; import org.apache.iotdb.db.query.aggregation.AggregationType; +import org.apache.iotdb.db.query.expression.Expression; import org.apache.iotdb.db.query.expression.binary.GreaterThanExpression; import org.apache.iotdb.db.query.expression.binary.LogicAndExpression; import org.apache.iotdb.db.query.expression.leaf.ConstantOperand; import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand; +import org.apache.iotdb.db.query.expression.leaf.TimestampOperand; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.filter.TimeFilter; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.commons.compress.utils.Sets; +import java.time.ZoneId; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -136,8 +138,7 @@ public class QueryLogicalPlanUtil { GreaterThanExpression timeFilter = new GreaterThanExpression( - new TimeSeriesOperand(SQLConstant.TIME_PATH), - new ConstantOperand(TSDataType.INT64, "100")); + new TimestampOperand(), new ConstantOperand(TSDataType.INT64, "100")); GreaterThanExpression valueFilter1 = new GreaterThanExpression( new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")), @@ -152,7 +153,17 @@ public class QueryLogicalPlanUtil { new LogicAndExpression(timeFilter, valueFilter2)); FilterNode filterNode = - new FilterNode(new PlanNodeId("test_query_4"), timeJoinNode, expression); + new FilterNode( + new PlanNodeId("test_query_4"), + timeJoinNode, + new Expression[] { + new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")), + new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")), + new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")), + }, + expression, + false, + ZoneId.systemDefault()); ProjectNode projectNode = new ProjectNode( @@ -212,8 +223,7 @@ public class QueryLogicalPlanUtil { GreaterThanExpression timeFilter = new GreaterThanExpression( - new TimeSeriesOperand(SQLConstant.TIME_PATH), - new ConstantOperand(TSDataType.INT64, "100")); + new TimestampOperand(), new ConstantOperand(TSDataType.INT64, "100")); GreaterThanExpression valueFilter1 = new GreaterThanExpression( new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")), @@ -228,9 +238,27 @@ public class QueryLogicalPlanUtil { new LogicAndExpression(timeFilter, valueFilter2)); FilterNode filterNode1 = - new FilterNode(new PlanNodeId("test_query_6"), timeJoinNode1, expression); + new FilterNode( + new PlanNodeId("test_query_6"), + timeJoinNode1, + new Expression[] { + new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")), + new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")), + }, + expression, + false, + ZoneId.systemDefault()); FilterNode filterNode2 = - new FilterNode(new PlanNodeId("test_query_7"), timeJoinNode2, expression); + new FilterNode( + new PlanNodeId("test_query_7"), + timeJoinNode2, + new Expression[] { + new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")), + new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")), + }, + expression, + false, + ZoneId.systemDefault()); DeviceViewNode deviceViewNode = new DeviceViewNode( @@ -496,8 +524,7 @@ public class QueryLogicalPlanUtil { GreaterThanExpression timeFilter = new GreaterThanExpression( - new TimeSeriesOperand(SQLConstant.TIME_PATH), - new ConstantOperand(TSDataType.INT64, "100")); + new TimestampOperand(), new ConstantOperand(TSDataType.INT64, "100")); GreaterThanExpression valueFilter1 = new GreaterThanExpression( new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")), @@ -511,7 +538,18 @@ public class QueryLogicalPlanUtil { new LogicAndExpression(timeFilter, valueFilter1), new LogicAndExpression(timeFilter, valueFilter2)); FilterNode filterNode = - new FilterNode(new PlanNodeId("test_query_5"), timeJoinNode, expression); + new FilterNode( + new PlanNodeId("test_query_5"), + timeJoinNode, + new Expression[] { + new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")), + new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")), + new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")), + new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")), + }, + expression, + false, + ZoneId.systemDefault()); AggregationNode aggregationNode = new AggregationNode( @@ -629,8 +667,7 @@ public class QueryLogicalPlanUtil { GreaterThanExpression timeFilter = new GreaterThanExpression( - new TimeSeriesOperand(SQLConstant.TIME_PATH), - new ConstantOperand(TSDataType.INT64, "100")); + new TimestampOperand(), new ConstantOperand(TSDataType.INT64, "100")); GreaterThanExpression valueFilter1 = new GreaterThanExpression( new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")), @@ -645,9 +682,27 @@ public class QueryLogicalPlanUtil { new LogicAndExpression(timeFilter, valueFilter2)); FilterNode filterNode1 = - new FilterNode(new PlanNodeId("test_query_6"), timeJoinNode1, expression); + new FilterNode( + new PlanNodeId("test_query_6"), + timeJoinNode1, + new Expression[] { + new TimeSeriesOperand(schemaMap.get("root.sg.d1.s1")), + new TimeSeriesOperand(schemaMap.get("root.sg.d1.s2")), + }, + expression, + false, + ZoneId.systemDefault()); FilterNode filterNode2 = - new FilterNode(new PlanNodeId("test_query_7"), timeJoinNode2, expression); + new FilterNode( + new PlanNodeId("test_query_7"), + timeJoinNode2, + new Expression[] { + new TimeSeriesOperand(schemaMap.get("root.sg.d2.s1")), + new TimeSeriesOperand(schemaMap.get("root.sg.d2.s2")), + }, + expression, + false, + ZoneId.systemDefault()); AggregationNode aggregationNode1 = new AggregationNode( diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/FilterNodeSerdeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/FilterNodeSerdeTest.java index a68782a73e..35346f29d8 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/FilterNodeSerdeTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/node/process/FilterNodeSerdeTest.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.mpp.plan.statement.component.OrderBy; +import org.apache.iotdb.db.query.expression.Expression; import org.apache.iotdb.db.query.expression.binary.GreaterThanExpression; import org.apache.iotdb.db.query.expression.leaf.ConstantOperand; import org.apache.iotdb.db.query.expression.leaf.TimeSeriesOperand; @@ -33,6 +34,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.junit.Test; import java.nio.ByteBuffer; +import java.time.ZoneId; import static org.junit.Assert.assertEquals; @@ -46,9 +48,12 @@ public class FilterNodeSerdeTest { new FilterNode( new PlanNodeId("TestFilterNode"), timeJoinNode, + new Expression[] {new TimeSeriesOperand(new PartialPath("root.sg.d1.s1"))}, new GreaterThanExpression( new TimeSeriesOperand(new PartialPath("root.sg.d1.s1")), - new ConstantOperand(TSDataType.INT64, "100"))); + new ConstantOperand(TSDataType.INT64, "100")), + false, + ZoneId.systemDefault()); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); filterNode.serialize(byteBuffer);
