This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/mpp-query-basis in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fe5d9d228ebff8d3dc5ef0739fcfbf8e282ffd99 Author: Jinrui.Zhang <[email protected]> AuthorDate: Thu Mar 10 22:18:43 2022 +0800 complete basic definition of all nodes --- .../query/distribution/common/LevelBucketInfo.java | 15 ------ .../common/{TraversalOrder.java => OrderBy.java} | 2 +- .../distribution/common/SeriesBatchAggInfo.java | 21 -------- .../query/distribution/common/SeriesBatchData.java | 14 ----- .../cluster/query/distribution/plan/PlanNode.java | 7 ++- .../query/distribution/plan/PlanNodeId.java | 34 ++++++++++++ .../distribution/plan/process/DeviceMergeNode.java | 40 ++++++++++---- .../query/distribution/plan/process/FillNode.java | 17 ++++-- .../distribution/plan/process/FilterNode.java | 19 ++++--- .../plan/process/GroupByLevelNode.java | 34 ++++++------ .../query/distribution/plan/process/LimitNode.java | 17 ++++-- .../distribution/plan/process/OffsetNode.java | 17 ++++-- .../distribution/plan/process/ProcessNode.java | 7 ++- .../plan/process/SeriesAggregateNode.java | 36 ------------- .../query/distribution/plan/process/SortNode.java | 23 +++++--- .../distribution/plan/process/TimeJoinNode.java | 40 +++++++++++--- .../distribution/plan/process/WithoutNode.java | 18 ++++--- .../query/distribution/plan/sink/CsvSinkNode.java | 13 ++++- .../distribution/plan/sink/FragmentSinkNode.java | 20 +++++++ .../query/distribution/plan/sink/SinkNode.java | 10 +++- .../distribution/plan/sink/ThriftSinkNode.java | 16 +++++- .../distribution/plan/source/CsvSourceNode.java | 11 +++- .../plan/source/SeriesAggregateNode.java | 63 ++++++++++++++++++++++ .../distribution/plan/source/SeriesScanNode.java | 45 ++++++++++------ .../query/distribution/plan/source/SourceNode.java | 8 ++- 25 files changed, 363 insertions(+), 184 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/LevelBucketInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/LevelBucketInfo.java deleted file mode 100644 index 44ee37d..0000000 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/LevelBucketInfo.java +++ /dev/null @@ -1,15 +0,0 @@ -package org.apache.iotdb.cluster.query.distribution.common; - -import java.util.List; -import java.util.Map; - -/** - * This class is used to store all the buckets for the GroupByLevelOperator - * It stores the levels index and all the enumerated values in each level by a HashMap - * Using the HashMap, the operator could calculate all the buckets using combination of values from each level - */ -public class LevelBucketInfo { - // eg: If the clause is `group by level = 1, 2, 3`, the map should be like - // map{1 -> ['a', 'b'], 2 -> ['aa', 'bb'], 3 -> ['aaa', 'bbb']} - private Map<Integer, List<String>> levelMap; -} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TraversalOrder.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/OrderBy.java similarity index 87% rename from cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TraversalOrder.java rename to cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/OrderBy.java index ea88253..07f005f 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/TraversalOrder.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/OrderBy.java @@ -3,7 +3,7 @@ package org.apache.iotdb.cluster.query.distribution.common; /** * The traversal order for operators by timestamp */ -public enum TraversalOrder { +public enum OrderBy { TIMESTAMP_ASC, TIMESTAMP_DESC, DEVICE_NAME_ASC, diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/SeriesBatchAggInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/SeriesBatchAggInfo.java deleted file mode 100644 index c1726d5..0000000 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/SeriesBatchAggInfo.java +++ /dev/null @@ -1,21 +0,0 @@ -package org.apache.iotdb.cluster.query.distribution.common; - -import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; -import org.apache.iotdb.tsfile.read.common.Path; -import org.apache.iotdb.tsfile.read.common.TimeRange; - -/** - * SeriesBatchAggInfo is the "batch" result of SeriesAggregateOperator when its getNextBatch() is invoked. - */ -public class SeriesBatchAggInfo { - // Path of the series. - // Path will be used in the downstream operators. - // GroupByLevelOperator will use it to divide the data into different buckets to do the rollup operation. - private Path path; - - // Time range of current statistic. - private TimeRange timeRange; - - // Statistics for the series in current time range - private Statistics<?> statistics; -} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/SeriesBatchData.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/SeriesBatchData.java deleted file mode 100644 index 3384d49..0000000 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/common/SeriesBatchData.java +++ /dev/null @@ -1,14 +0,0 @@ -package org.apache.iotdb.cluster.query.distribution.common; - -import org.apache.iotdb.tsfile.read.common.BatchData; - -/** - * @author xingtanzjr - * TODO: currently we only use it to describe the result set of SeriesScanOperator - * The BatchData is suitable as the encapsulation of part of result set of SeriesScanOperator - * BatchData is the class defined and generally used in single-node IoTDB - * We leverage it as the `batch` here. We can consider a more general name or make some modifications for it. - */ -public class SeriesBatchData extends BatchData { - -} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNode.java index 527b697..d4b763d 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNode.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNode.java @@ -7,6 +7,9 @@ import org.apache.iotdb.cluster.query.distribution.common.TreeNode; * The base class of query executable operators, which is used to compose logical query plan. * TODO: consider how to restrict the children type for each type of ExecOperator */ -public abstract class PlanNode<T> extends TreeNode<PlanNode<?>> { - +public abstract class PlanNode<T> extends TreeNode<PlanNode<T>> { + private PlanNodeId id; + public PlanNode(PlanNodeId id) { + this.id = id; + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNodeId.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNodeId.java new file mode 100644 index 0000000..b9d2888 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/PlanNodeId.java @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.iotdb.cluster.query.distribution.plan; + +public class PlanNodeId { + private String id; + public PlanNodeId(String id) { + this.id = id; + } + + public String getId() { + return this.id; + } + + @Override + public String toString() { + return this.id; + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/DeviceMergeNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/DeviceMergeNode.java index fbadebe..9cd2549 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/DeviceMergeNode.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/DeviceMergeNode.java @@ -1,8 +1,10 @@ package org.apache.iotdb.cluster.query.distribution.plan.process; import org.apache.iotdb.cluster.query.distribution.common.TsBlock; -import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder; +import org.apache.iotdb.cluster.query.distribution.common.OrderBy; +import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy; import org.apache.iotdb.cluster.query.distribution.plan.PlanNode; +import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId; import java.util.List; import java.util.Map; @@ -11,18 +13,36 @@ import java.util.Map; * DeviceMergeOperator is responsible for constructing a device-based view of a set of series. And output the result with * specific order. The order could be 'order by device' or 'order by timestamp' * - * The types of involved devices should be same. If the device contains n series, the device-based view will contain n+2 - * columns, which are timestamp column, device name column and n value columns of involved series. + * Each output from its children should have the same schema. That means, the columns should be same between these TsBlocks. + * If the input TsBlock contains n columns, the device-based view will contain n+1 columns where the new column is Device + * column. * - * Children type: [TimeJoinOperator] */ -public class DeviceMergeNode extends ProcessNode<TsBlock> { +public class DeviceMergeNode extends ProcessNode { // The result output order that this operator - private TraversalOrder mergeOrder; + private OrderBy mergeOrder; - // Owned devices - private List<String> ownedDeviceNameList; + // The policy to decide whether a row should be discarded + // The without policy is able to be push down to the DeviceMergeNode because we can know whether a row contains + // null or not. + private WithoutPolicy withoutPolicy; - // The map from deviceName to corresponding query result operator responsible for that device. - private Map<String, TimeJoinNode> upstreamMap; + // The map from deviceName to corresponding query result node responsible for that device. + // DeviceNode means the node whose output TsBlock contains the data belonged to one device. + private Map<String, PlanNode<TsBlock>> childDeviceNodeMap; + + public DeviceMergeNode(PlanNodeId id) { + super(id); + } + + public DeviceMergeNode(PlanNodeId id, Map<String, PlanNode<TsBlock>> deviceNodeMap) { + this(id); + this.childDeviceNodeMap = deviceNodeMap; + this.children.addAll(deviceNodeMap.values()); + } + + public void addChildDeviceNode(String deviceName, PlanNode<TsBlock> childNode) { + this.childDeviceNodeMap.put(deviceName, childNode); + this.children.add(childNode); + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FillNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FillNode.java index 8b08640..430948d 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FillNode.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FillNode.java @@ -3,14 +3,23 @@ package org.apache.iotdb.cluster.query.distribution.plan.process; import org.apache.iotdb.cluster.query.distribution.common.FillPolicy; import org.apache.iotdb.cluster.query.distribution.common.TsBlock; import org.apache.iotdb.cluster.query.distribution.plan.PlanNode; +import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId; /** - * FillOperator is used to fill the empty field in one row. + * FillNode is used to fill the empty field in one row. * - * Children type: [All the operators whose result set is Tablet] */ -public class FillNode extends ProcessNode<TsBlock> { +public class FillNode extends ProcessNode { - // The policy to discard the result from upstream operator + // The policy to discard the result from upstream node private FillPolicy fillPolicy; + + public FillNode(PlanNodeId id) { + super(id); + } + + public FillNode(PlanNodeId id, FillPolicy fillPolicy) { + this(id); + this.fillPolicy = fillPolicy; + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FilterNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FilterNode.java index a579d4e..e7a4fce 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FilterNode.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/FilterNode.java @@ -1,17 +1,22 @@ package org.apache.iotdb.cluster.query.distribution.plan.process; -import org.apache.iotdb.cluster.query.distribution.common.TsBlock; -import org.apache.iotdb.cluster.query.distribution.plan.PlanNode; +import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId; import org.apache.iotdb.db.qp.logical.crud.FilterOperator; /** - * (We use FilterExecOperator to distinguish itself from the FilterOperator used in single-node IoTDB) - * The FilterExecOperator is responsible to filter the RowRecord from Tablet. - * - * Children type: [All the operators whose result set is Tablet] + * The FilterNode is responsible to filter the RowRecord from TsBlock. */ -public class FilterNode extends ProcessNode<TsBlock> { +public class FilterNode extends ProcessNode { // The filter private FilterOperator rowFilter; + + public FilterNode(PlanNodeId id) { + super(id); + } + + public FilterNode(PlanNodeId id, FilterOperator rowFilter) { + this(id); + this.rowFilter = rowFilter; + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/GroupByLevelNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/GroupByLevelNode.java index 2007900..54ff8eb 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/GroupByLevelNode.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/GroupByLevelNode.java @@ -1,25 +1,25 @@ package org.apache.iotdb.cluster.query.distribution.plan.process; -import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter; -import org.apache.iotdb.cluster.query.distribution.common.LevelBucketInfo; -import org.apache.iotdb.cluster.query.distribution.common.TsBlock; -import org.apache.iotdb.cluster.query.distribution.plan.PlanNode; +import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId; /** - * This operator is responsible for the final aggregation merge operation. - * It will arrange the data by time range firstly. And inside each time range, the data from same measurement and - * different devices will be rolled up by corresponding level into different buckets. - * If the bucketInfo is empty, the data from `same measurement and different devices` won't be rolled up. - * If the groupByTimeParameter is null, the data won't be split by time range. - * - * Children type: [SeriesAggregateOperator] + * This node is responsible for the final aggregation merge operation. + * It will process the data from TsBlock row by row. + * For one row, it will rollup the fields which have the same aggregate function and belong to one bucket. + * Here, that two columns belong to one bucket means the partial paths of device after rolling up in specific level + * are the same. + * For example, let's say there are two columns `root.sg.d1.s1` and `root.sg.d2.s1`. + * If the group by level parameter is [0, 1], then these two columns will belong to one bucket and the bucket name + * is `root.sg.*.s1`. + * If the group by level parameter is [0, 2], then these two columns will not belong to one bucket. And the total buckets + * are `root.*.d1.s1` and `root.*.d2.s1` */ -public class GroupByLevelNode extends ProcessNode<TsBlock> { +public class GroupByLevelNode extends ProcessNode { - // All the buckets that the SeriesBatchAggInfo from upstream will be divided into. - private LevelBucketInfo bucketInfo; + private int[] groupByLevels; - // The parameter of `group by time` - // The GroupByLevelOperator also need GroupByTimeParameter - private GroupByTimeParameter groupByTimeParameter; + public GroupByLevelNode(PlanNodeId id, int[] groupByLevels) { + super(id); + this.groupByLevels = groupByLevels; + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/LimitNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/LimitNode.java index e675086..63d4370 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/LimitNode.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/LimitNode.java @@ -1,15 +1,22 @@ package org.apache.iotdb.cluster.query.distribution.plan.process; -import org.apache.iotdb.cluster.query.distribution.common.TsBlock; -import org.apache.iotdb.cluster.query.distribution.plan.PlanNode; +import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId; /** - * LimitOperator is used to select top n result. It uses the default order of upstream operators + * LimitNode is used to select top n result. It uses the default order of upstream nodes * - * Children type: [All the operators whose result set is Tablet] */ -public class LimitNode extends ProcessNode<TsBlock> { +public class LimitNode extends ProcessNode { // The limit count private int limit; + + public LimitNode(PlanNodeId id) { + super(id); + } + + public LimitNode(PlanNodeId id, int limit) { + this(id); + this.limit = limit; + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/OffsetNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/OffsetNode.java index 78ac9eb..3ee19ab 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/OffsetNode.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/OffsetNode.java @@ -1,15 +1,22 @@ package org.apache.iotdb.cluster.query.distribution.plan.process; -import org.apache.iotdb.cluster.query.distribution.common.TsBlock; -import org.apache.iotdb.cluster.query.distribution.plan.PlanNode; +import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId; /** - * OffsetOperator is used to skip top n result from upstream operators. It uses the default order of upstream operators + * OffsetNode is used to skip top n result from upstream nodes. It uses the default order of upstream nodes * - * Children type: [All the operators whose result set is Tablet] */ -public class OffsetNode extends ProcessNode<TsBlock> { +public class OffsetNode extends ProcessNode { // The limit count private int offset; + + public OffsetNode(PlanNodeId id) { + super(id); + } + + public OffsetNode(PlanNodeId id, int offset) { + this(id); + this.offset = offset; + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/ProcessNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/ProcessNode.java index e7bffae..d5165b0 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/ProcessNode.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/ProcessNode.java @@ -1,6 +1,11 @@ package org.apache.iotdb.cluster.query.distribution.plan.process; +import org.apache.iotdb.cluster.query.distribution.common.TsBlock; import org.apache.iotdb.cluster.query.distribution.plan.PlanNode; +import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId; -public class ProcessNode<T> extends PlanNode<T> { +public class ProcessNode extends PlanNode<TsBlock> { + public ProcessNode(PlanNodeId id) { + super(id); + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SeriesAggregateNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SeriesAggregateNode.java deleted file mode 100644 index 46ef991..0000000 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SeriesAggregateNode.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.apache.iotdb.cluster.query.distribution.plan.process; - -import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter; -import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchAggInfo; -import org.apache.iotdb.cluster.query.distribution.common.TsBlock; -import org.apache.iotdb.cluster.query.distribution.plan.PlanNode; -import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; -import org.apache.iotdb.tsfile.read.common.TimeRange; - -/** - * SeriesAggregateOperator is responsible to do the aggregation calculation for one series. - * This operator will split data in one series into many groups by time range and do the aggregation calculation for each - * group. - * If there is no split parameter, it will return one result which is the aggregation result of all data in current series. - * - * Children type: [SeriesScanOperator] - */ -public class SeriesAggregateNode extends ProcessNode<TsBlock> { - - // The parameter of `group by time` - // Its value will be null if there is no `group by time` clause, - private GroupByTimeParameter groupByTimeParameter; - - // TODO: need consider how to represent the aggregation function and corresponding implementation - // We use a String to indicate the parameter temporarily - private String aggregationFunc; - - // This method will only be invoked by SeriesAggregateOperator - // It will return the statistics of the series in given time range - // When calculate the statistics, the operator should use the most optimized way to do that. In - // other words, using - // raw data is the final way to do that. - public Statistics<?> getNextStatisticBetween(TimeRange timeRange) { - return null; - } -} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SortNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SortNode.java index a718247..705a4c1 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SortNode.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/SortNode.java @@ -1,15 +1,22 @@ package org.apache.iotdb.cluster.query.distribution.plan.process; -import org.apache.iotdb.cluster.query.distribution.common.TsBlock; -import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder; -import org.apache.iotdb.cluster.query.distribution.plan.PlanNode; +import org.apache.iotdb.cluster.query.distribution.common.OrderBy; +import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId; /** - * In general, the parameter in sortOperator should be pushed down to the upstream operators. - * In our optimized logical query plan, the sortOperator should not appear. + * In general, the parameter in sortNode should be pushed down to the upstream operators. + * In our optimized logical query plan, the sortNode should not appear. */ -public class SortNode extends ProcessNode<TsBlock> { +public class SortNode extends ProcessNode { - private TraversalOrder sortOrder; - + private OrderBy sortOrder; + + public SortNode(PlanNodeId id) { + super(id); + } + + public SortNode(PlanNodeId id, OrderBy sortOrder) { + this(id); + this.sortOrder = sortOrder; + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/TimeJoinNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/TimeJoinNode.java index 08a7617..ffbeb29 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/TimeJoinNode.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/TimeJoinNode.java @@ -1,24 +1,48 @@ package org.apache.iotdb.cluster.query.distribution.plan.process; import org.apache.iotdb.cluster.query.distribution.common.TsBlock; -import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder; +import org.apache.iotdb.cluster.query.distribution.common.OrderBy; import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy; import org.apache.iotdb.cluster.query.distribution.plan.PlanNode; +import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId; + +import java.util.Arrays; /** - * TimeJoinOperator is responsible for join two or more series. - * The join algorithm is like outer join by timestamp column. + * TimeJoinOperator is responsible for join two or more TsBlock. + * The join algorithm is like outer join by timestamp column. It will join two or more TsBlock by Timestamp column. * The output result of TimeJoinOperator is sorted by timestamp - * - * Children type: [SeriesScanOperator] */ -public class TimeJoinNode extends ProcessNode<TsBlock> { +//TODO: define the TimeJoinMergeNode for distributed plan +public class TimeJoinNode extends ProcessNode { // This parameter indicates the order when executing multiway merge sort. - private TraversalOrder mergeOrder; + private OrderBy mergeOrder; // The policy to decide whether a row should be discarded // The without policy is able to be push down to the TimeJoinOperator because we can know whether a row contains - // null or not in this operator the situation won't be changed by the downstream operators. + // null or not. private WithoutPolicy withoutPolicy; + + public TimeJoinNode(PlanNodeId id) { + super(id); + this.mergeOrder = OrderBy.TIMESTAMP_ASC; + } + + public TimeJoinNode(PlanNodeId id, PlanNode<TsBlock>... children) { + super(id); + this.children.addAll(Arrays.asList(children)); + } + + public void addChild(PlanNode<TsBlock> child) { + this.children.add(child); + } + + public void setMergeOrder(OrderBy mergeOrder) { + this.mergeOrder = mergeOrder; + } + + public void setWithoutPolicy(WithoutPolicy withoutPolicy) { + this.withoutPolicy = withoutPolicy; + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/WithoutNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/WithoutNode.java index b3f1d9e..6afc25b 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/WithoutNode.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/process/WithoutNode.java @@ -1,16 +1,22 @@ package org.apache.iotdb.cluster.query.distribution.plan.process; -import org.apache.iotdb.cluster.query.distribution.common.TsBlock; import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy; -import org.apache.iotdb.cluster.query.distribution.plan.PlanNode; +import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId; /** - * WithoutOperator is used to discard specific result from upstream operators. - * - * Children type: [All the operators whose result set is Tablet] + * WithoutNode is used to discard specific rows from upstream node. */ -public class WithoutNode extends ProcessNode<TsBlock> { +public class WithoutNode extends ProcessNode { // The policy to discard the result from upstream operator private WithoutPolicy discardPolicy; + + public WithoutNode(PlanNodeId id) { + super(id); + } + + public WithoutNode(PlanNodeId id, WithoutPolicy discardPolicy) { + this(id); + this.discardPolicy = discardPolicy; + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/CsvSinkNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/CsvSinkNode.java index 3e9dc4c..6bafca7 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/CsvSinkNode.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/CsvSinkNode.java @@ -19,11 +19,20 @@ package org.apache.iotdb.cluster.query.distribution.plan.sink; -import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData; +import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId; + +public class CsvSinkNode extends SinkNode { + public CsvSinkNode(PlanNodeId id) { + super(id); + } -public class CsvSinkNode extends SinkNode<SeriesBatchData> { @Override public void close() throws Exception { } + + @Override + public void send() { + + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/FragmentSinkNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/FragmentSinkNode.java new file mode 100644 index 0000000..348c00d --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/FragmentSinkNode.java @@ -0,0 +1,20 @@ +package org.apache.iotdb.cluster.query.distribution.plan.sink; + + +import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId; + +public class FragmentSinkNode extends SinkNode { + public FragmentSinkNode(PlanNodeId id) { + super(id); + } + + @Override + public void send() { + + } + + @Override + public void close() throws Exception { + + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/SinkNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/SinkNode.java index a94a0ff..de8ab9a 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/SinkNode.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/SinkNode.java @@ -19,9 +19,15 @@ package org.apache.iotdb.cluster.query.distribution.plan.sink; +import org.apache.iotdb.cluster.query.distribution.common.TsBlock; import org.apache.iotdb.cluster.query.distribution.plan.PlanNode; +import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId; -// 构建与客户端的联系。 -public abstract class SinkNode<T> extends PlanNode<T> implements AutoCloseable { +public abstract class SinkNode extends PlanNode<TsBlock> implements AutoCloseable { + public SinkNode(PlanNodeId id) { + super(id); + } + + public abstract void send(); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/ThriftSinkNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/ThriftSinkNode.java index beed5a0..17db1f0 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/ThriftSinkNode.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/sink/ThriftSinkNode.java @@ -19,10 +19,22 @@ package org.apache.iotdb.cluster.query.distribution.plan.sink; -import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData; +import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId; -public class ThriftSinkNode extends SinkNode<SeriesBatchData> { +/** + * not implemented in current IoTDB yet + */ +public class ThriftSinkNode extends SinkNode { + + public ThriftSinkNode(PlanNodeId id) { + super(id); + } @Override public void close() throws Exception {} + + @Override + public void send() { + + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/CsvSourceNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/CsvSourceNode.java index 85f402a..5ff967e 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/CsvSourceNode.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/CsvSourceNode.java @@ -19,9 +19,16 @@ package org.apache.iotdb.cluster.query.distribution.plan.source; -import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData; +import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId; -public class CsvSourceNode extends SourceNode<SeriesBatchData> { +/** + * Not implemented in current version. + */ +public class CsvSourceNode extends SourceNode { + + public CsvSourceNode(PlanNodeId id) { + super(id); + } @Override public void close() throws Exception {} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesAggregateNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesAggregateNode.java new file mode 100644 index 0000000..0a7de03 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesAggregateNode.java @@ -0,0 +1,63 @@ +package org.apache.iotdb.cluster.query.distribution.plan.source; + +import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter; +import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId; +import org.apache.iotdb.db.query.expression.unary.FunctionExpression; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; + +/** + * SeriesAggregateOperator is responsible to do the aggregation calculation for one series. It will read the + * target series and calculate the aggregation result by the aggregation digest or raw data of this series. + * + * The aggregation result will be represented as a TsBlock + * + * This operator will split data of the target series into many groups by time range and do the aggregation calculation + * for each group. Each result will be one row of the result TsBlock. The timestamp of each row is the start time of the + * time range group. + * + * If there is no time range split parameter, the result TsBlock will only contain one row, which represent the whole + * aggregation result of this series. And the timestamp will be 0, which is meaningless. + */ +public class SeriesAggregateNode extends SourceNode { + + // The parameter of `group by time` + // Its value will be null if there is no `group by time` clause, + private GroupByTimeParameter groupByTimeParameter; + + // The aggregation function, which contains the function name and related series. + // (Currently we only support one series in the aggregation function) + // TODO: need consider whether it is suitable the aggregation function using FunctionExpression + private FunctionExpression aggregateFunc; + + private Filter filter; + + public SeriesAggregateNode(PlanNodeId id) { + super(id); + } + + public SeriesAggregateNode(PlanNodeId id, FunctionExpression aggregateFunc) { + this(id); + this.aggregateFunc = aggregateFunc; + } + + public SeriesAggregateNode(PlanNodeId id, FunctionExpression aggregateFunc, GroupByTimeParameter groupByTimeParameter) { + this(id, aggregateFunc); + this.groupByTimeParameter = groupByTimeParameter; + } + + @Override + public void open() throws Exception { + + } + + @Override + public void close() throws Exception { + + } + + // This method is used when do the PredicatePushDown. + // The filter is not put in the constructor because the filter is only clear in the predicate push-down stage + public void setFilter(Filter filter) { + this.filter = filter; + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesScanNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesScanNode.java index 7ff8978..738a6ea 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesScanNode.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SeriesScanNode.java @@ -1,24 +1,18 @@ package org.apache.iotdb.cluster.query.distribution.plan.source; -import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder; -import org.apache.iotdb.cluster.query.distribution.common.TsBlock; -import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.cluster.query.distribution.common.OrderBy; +import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId; import org.apache.iotdb.tsfile.read.common.Path; -import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.filter.basic.Filter; /** - * SeriesScanOperator is responsible for read data and pre-aggregated statistic for a specific - * series. When reading data, the SeriesScanOperator can read the raw data batch by batch. And also, - * it can leverage the filter and other info to decrease the result set. Besides, the - * SeriesScanOperator can read the pre-aggregated statistic in TsFile. And return the statistic with - * a fix time range one by one. If the time range is narrower than the smallest pre-aggregated - * statistic or has overlap with pre-aggregated statistic, the SeriesScanOperator will read the raw - * data and calculate the aggregation result for specific time range. + * SeriesScanOperator is responsible for read data a specific series. When reading data, the SeriesScanOperator + * can read the raw data batch by batch. And also, it can leverage the filter and other info to decrease the + * result set. * - * <p>Children type: [] + * <p>Children type: no child is allowed for SeriesScanNode */ -public class SeriesScanNode extends SourceNode<TsBlock> { +public class SeriesScanNode extends SourceNode { // The path of the target series which will be scanned. private Path seriesPath; @@ -26,20 +20,41 @@ public class SeriesScanNode extends SourceNode<TsBlock> { // The order to traverse the data. // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here. // The default order is TIMESTAMP_ASC, which means "order by timestamp asc" - private TraversalOrder scanOrder = TraversalOrder.TIMESTAMP_ASC; + private OrderBy scanOrder = OrderBy.TIMESTAMP_ASC; // Filter data in current series. private Filter filter; // Limit for result set. The default value is -1, which means no limit - private int limit = -1; + private int limit; // offset for result set. The default value is 0 private int offset; + public SeriesScanNode(PlanNodeId id, Path seriesPath) { + super(id); + this.seriesPath = seriesPath; + } + + public void setFilter(Filter filter) { + this.filter = filter; + } + @Override public void close() throws Exception {} @Override public void open() throws Exception {} + + public void setScanOrder(OrderBy scanOrder) { + this.scanOrder = scanOrder; + } + + public void setLimit(int limit) { + this.limit = limit; + } + + public void setOffset(int offset) { + this.offset = offset; + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SourceNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SourceNode.java index ed0e39a..b72d0f7 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SourceNode.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/plan/source/SourceNode.java @@ -19,9 +19,15 @@ package org.apache.iotdb.cluster.query.distribution.plan.source; +import org.apache.iotdb.cluster.query.distribution.common.TsBlock; import org.apache.iotdb.cluster.query.distribution.plan.PlanNode; +import org.apache.iotdb.cluster.query.distribution.plan.PlanNodeId; -public abstract class SourceNode<T> extends PlanNode<T> implements AutoCloseable{ +public abstract class SourceNode extends PlanNode<TsBlock> implements AutoCloseable{ + + public SourceNode(PlanNodeId id) { + super(id); + } public abstract void open() throws Exception; }
