This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch xingtanzjr/mpp-query-basis in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f02e74eed0091af5d8a5b380c15a8537260baf02 Author: Steve Yurong Su <[email protected]> AuthorDate: Tue Mar 8 23:39:41 2022 +0800 design of source, sink and internal operators --- .../operator/internal/DeviceMergeOperator.java | 43 ++++++++++++++++ .../operator/internal/FillOperator.java | 25 ++++++++++ .../operator/internal/FilterInternalOperator.java | 26 ++++++++++ .../operator/internal/GroupByLevelOperator.java | 34 +++++++++++++ .../operator/internal/InternalOperator.java | 17 +++++++ .../operator/internal/LimitOperator.java | 24 +++++++++ .../operator/internal/OffsetOperator.java | 25 ++++++++++ .../operator/internal/SeriesAggregateOperator.java | 33 ++++++++++++ .../operator/internal/SortOperator.java | 23 +++++++++ .../operator/internal/TimeJoinOperator.java | 33 ++++++++++++ .../operator/internal/WithoutOperator.java | 25 ++++++++++ .../operator/source/SeriesScanOperator.java | 58 ++++++++++++++++++++++ 12 files changed, 366 insertions(+) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java new file mode 100644 index 0000000..b912994 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/DeviceMergeOperator.java @@ -0,0 +1,43 @@ +package org.apache.iotdb.cluster.query.distribution.operator; + +import org.apache.iotdb.cluster.query.distribution.common.Tablet; +import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder; + +import java.util.List; +import java.util.Map; + +/** + * DeviceMergeOperator is responsible for constructing a device-based view of a set of series. And + * output the result with specific order. The order could be 'order by device' or 'order by + * timestamp' + * + * <p>The types of involved devices should be same. 
If the device contains n series, the + * device-based view will contain n+2 columns, which are timestamp column, device name column and n + * value columns of involved series. + * + * <p>Children type: [TimeJoinOperator] + */ +public class DeviceMergeOperator extends InternalOperator<Tablet> { + // The order in which this operator outputs its result + private TraversalOrder mergeOrder; + + // Owned devices + private List<String> ownedDeviceNameList; + + // The map from deviceName to corresponding query result operator responsible for that device. + private Map<String, TimeJoinOperator> upstreamMap; + + @Override + public boolean hasNext() { + return false; + } + + // If the Tablet from TimeJoinOperator has n columns, the output of DeviceMergeOperator will + // contain n+1 columns where + // the additional column is `deviceName` + // And, the `alignedByDevice` in the TabletMetadata will be `true` + @Override + public Tablet getNextBatch() { + return null; + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java new file mode 100644 index 0000000..8ee610a --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FillOperator.java @@ -0,0 +1,25 @@ +package org.apache.iotdb.cluster.query.distribution.operator; + +import org.apache.iotdb.cluster.query.distribution.common.FillPolicy; +import org.apache.iotdb.cluster.query.distribution.common.Tablet; + +/** + * FillOperator is used to fill the empty field in one row. 
+ * + * <p>Children type: [All the operators whose result set is Tablet] + */ +public class FillOperator extends InternalOperator<Tablet> { + + // The policy used to fill the empty fields in the result from upstream operator + private FillPolicy fillPolicy; + + @Override + public boolean hasNext() { + return false; + } + + @Override + public Tablet getNextBatch() { + return null; + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java new file mode 100644 index 0000000..ecbda0f --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/FilterInternalOperator.java @@ -0,0 +1,26 @@ +package org.apache.iotdb.cluster.query.distribution.operator; + +import org.apache.iotdb.cluster.query.distribution.common.Tablet; +import org.apache.iotdb.db.qp.logical.crud.FilterOperator; + +/** + * (We use FilterInternalOperator to distinguish itself from the FilterOperator used in single-node + * IoTDB) The FilterInternalOperator is responsible for filtering the RowRecord from Tablet. 
+ * + * <p>Children type: [All the operators whose result set is Tablet] + */ +public class FilterInternalOperator extends InternalOperator<Tablet> { + + // The filter + private FilterOperator rowFilter; + + @Override + public boolean hasNext() { + return false; + } + + @Override + public Tablet getNextBatch() { + return null; + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java new file mode 100644 index 0000000..c6e00d2 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/GroupByLevelOperator.java @@ -0,0 +1,34 @@ +package org.apache.iotdb.cluster.query.distribution.operator; + +import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter; +import org.apache.iotdb.cluster.query.distribution.common.LevelBucketInfo; +import org.apache.iotdb.cluster.query.distribution.common.Tablet; + +/** + * This operator is responsible for the final aggregation merge operation. It will arrange the data + * by time range firstly. And inside each time range, the data from same measurement and different + * devices will be rolled up by corresponding level into different buckets. If the bucketInfo is + * empty, the data from `same measurement and different devices` won't be rolled up. If the + * groupByTimeParameter is null, the data won't be split by time range. + * + * <p>Children type: [SeriesAggregateOperator] + */ +public class GroupByLevelOperator extends InternalOperator<Tablet> { + + // All the buckets that the SeriesBatchAggInfo from upstream will be divided into. 
+ private LevelBucketInfo bucketInfo; + + // The parameter of `group by time` + // The GroupByLevelOperator also need GroupByTimeParameter + private GroupByTimeParameter groupByTimeParameter; + + @Override + public boolean hasNext() { + return false; + } + + @Override + public Tablet getNextBatch() { + return null; + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java new file mode 100644 index 0000000..0692381 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/InternalOperator.java @@ -0,0 +1,17 @@ +package org.apache.iotdb.cluster.query.distribution.operator; + +import org.apache.iotdb.cluster.query.distribution.common.TreeNode; + +/** + * @author xingtanzjr The base class of query executable operators, which is used to compose logical + * query plan. TODO: consider how to restrict the children type for each type of ExecOperator + */ +public abstract class InternalOperator<T> extends TreeNode<InternalOperator<?>> { + + // Judge whether current operator has more result + public abstract boolean hasNext(); + + // Get next result batch of this operator + // Return null if there is no more result to return + public abstract T getNextBatch(); +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java new file mode 100644 index 0000000..9075f64 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/LimitOperator.java @@ -0,0 +1,24 @@ +package org.apache.iotdb.cluster.query.distribution.operator; + +import org.apache.iotdb.cluster.query.distribution.common.Tablet; + +/** + * LimitOperator is used to select top n result. 
It uses the default order of upstream operators + * + * <p>Children type: [All the operators whose result set is Tablet] + */ +public class LimitOperator extends InternalOperator<Tablet> { + + // The limit count + private int limit; + + @Override + public boolean hasNext() { + return false; + } + + @Override + public Tablet getNextBatch() { + return null; + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java new file mode 100644 index 0000000..c6f79ac --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/OffsetOperator.java @@ -0,0 +1,25 @@ +package org.apache.iotdb.cluster.query.distribution.operator; + +import org.apache.iotdb.cluster.query.distribution.common.Tablet; + +/** + * OffsetOperator is used to skip top n result from upstream operators. It uses the default order of + * upstream operators + * + * <p>Children type: [All the operators whose result set is Tablet] + */ +public class OffsetOperator extends InternalOperator<Tablet> { + + // The offset count, i.e. the number of leading results to skip + private int offset; + + @Override + public boolean hasNext() { + return false; + } + + @Override + public Tablet getNextBatch() { + return null; + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java new file mode 100644 index 0000000..96a5c86 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SeriesAggregateOperator.java @@ -0,0 +1,33 @@ +package org.apache.iotdb.cluster.query.distribution.operator; + +import org.apache.iotdb.cluster.query.distribution.common.GroupByTimeParameter; +import 
org.apache.iotdb.cluster.query.distribution.common.SeriesBatchAggInfo; + +/** + * SeriesAggregateOperator is responsible to do the aggregation calculation for one series. This + * operator will split data in one series into many groups by time range and do the aggregation + * calculation for each group. If there is no split parameter, it will return one result which is + * the aggregation result of all data in current series. + * + * <p>Children type: [SeriesScanOperator] + */ +public class SeriesAggregateOperator extends InternalOperator<SeriesBatchAggInfo> { + + // The parameter of `group by time` + // Its value will be null if there is no `group by time` clause, + private GroupByTimeParameter groupByTimeParameter; + + // TODO: need consider how to represent the aggregation function and corresponding implementation + // We use a String to indicate the parameter temporarily + private String aggregationFunc; + + @Override + public boolean hasNext() { + return false; + } + + @Override + public SeriesBatchAggInfo getNextBatch() { + return null; + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java new file mode 100644 index 0000000..fa83be7 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/SortOperator.java @@ -0,0 +1,23 @@ +package org.apache.iotdb.cluster.query.distribution.operator; + +import org.apache.iotdb.cluster.query.distribution.common.Tablet; +import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder; + +/** + * In general, the parameter in sortOperator should be pushed down to the upstream operators. In our + * optimized logical query plan, the sortOperator should not appear. 
+ */ +public class SortOperator extends InternalOperator<Tablet> { + + private TraversalOrder sortOrder; + + @Override + public boolean hasNext() { + return false; + } + + @Override + public Tablet getNextBatch() { + return null; + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java new file mode 100644 index 0000000..c4ae6f3 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/TimeJoinOperator.java @@ -0,0 +1,33 @@ +package org.apache.iotdb.cluster.query.distribution.operator; + +import org.apache.iotdb.cluster.query.distribution.common.Tablet; +import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder; +import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy; + +/** + * TimeJoinOperator is responsible for joining two or more series. The join algorithm is like outer + * join by timestamp column. The output result of TimeJoinOperator is sorted by timestamp + * + * <p>Children type: [SeriesScanOperator] + */ +public class TimeJoinOperator extends InternalOperator<Tablet> { + + // This parameter indicates the order when executing multiway merge sort. + private TraversalOrder mergeOrder; + + // The policy to decide whether a row should be discarded + // The without policy can be pushed down to the TimeJoinOperator because we can know whether + // a row contains + // null or not in this operator, and the situation won't be changed by the downstream operators. 
+ private WithoutPolicy withoutPolicy; + + @Override + public boolean hasNext() { + return false; + } + + @Override + public Tablet getNextBatch() { + return null; + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java new file mode 100644 index 0000000..535a3d6 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/internal/WithoutOperator.java @@ -0,0 +1,25 @@ +package org.apache.iotdb.cluster.query.distribution.operator; + +import org.apache.iotdb.cluster.query.distribution.common.Tablet; +import org.apache.iotdb.cluster.query.distribution.common.WithoutPolicy; + +/** + * WithoutOperator is used to discard specific result from upstream operators. + * + * <p>Children type: [All the operators whose result set is Tablet] + */ +public class WithoutOperator extends InternalOperator<Tablet> { + + // The policy to discard the result from upstream operator + private WithoutPolicy discardPolicy; + + @Override + public boolean hasNext() { + return false; + } + + @Override + public Tablet getNextBatch() { + return null; + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java new file mode 100644 index 0000000..9cd6043 --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/distribution/operator/source/SeriesScanOperator.java @@ -0,0 +1,58 @@ +package org.apache.iotdb.cluster.query.distribution.operator; + +import org.apache.iotdb.cluster.query.distribution.common.SeriesBatchData; +import org.apache.iotdb.cluster.query.distribution.common.TraversalOrder; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import 
org.apache.iotdb.tsfile.read.common.Path; +import org.apache.iotdb.tsfile.read.common.TimeRange; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; + +/** + * SeriesScanOperator is responsible for reading data and pre-aggregated statistic for a specific + * series. When reading data, the SeriesScanOperator can read the raw data batch by batch. And also, + * it can leverage the filter and other info to decrease the result set. Besides, the + * SeriesScanOperator can read the pre-aggregated statistic in TsFile. And return the statistic with + * a fixed time range one by one. If the time range is narrower than the smallest pre-aggregated + * statistic or has overlap with pre-aggregated statistic, the SeriesScanOperator will read the raw + * data and calculate the aggregation result for specific time range. + * + * <p>Children type: [] + */ +public class SeriesScanOperator extends InternalOperator<SeriesBatchData> { + + // The path of the target series which will be scanned. + private Path seriesPath; + + // The order to traverse the data. + // Currently, we only support TIMESTAMP_ASC and TIMESTAMP_DESC here. + // The default order is TIMESTAMP_ASC, which means "order by timestamp asc" + private TraversalOrder scanOrder = TraversalOrder.TIMESTAMP_ASC; + + // Filter data in current series. + private Filter filter; + + // Limit for result set. The default value is -1, which means no limit + private int limit = -1; + + // offset for result set. The default value is 0 + private int offset; + + @Override + public boolean hasNext() { + return false; + } + + @Override + public SeriesBatchData getNextBatch() { + return null; + } + + // This method will only be invoked by SeriesAggregateOperator + // It will return the statistics of the series in given time range + // When calculating the statistics, the operator should use the most optimized way to do that. In + // other words, using + // raw data is the last resort. 
+ public Statistics<?> getNextStatisticBetween(TimeRange timeRange) { + return null; + } +}
