This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch ml/windowSet in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 64b03425133cc6b758e93762b92994818638d7fe Author: Minghui Liu <[email protected]> AuthorDate: Mon Nov 14 10:27:00 2022 +0800 add WindowConcatOperator --- .../TimeRangeIteratorFactory.java | 3 +- ...plitOperator.java => WindowConcatOperator.java} | 82 ++-------------------- .../operator/process/WindowSplitOperator.java | 12 ++-- .../db/mpp/plan/planner/OperatorTreeGenerator.java | 32 ++++++++- 4 files changed, 45 insertions(+), 84 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java index cd0ac8a08e..552cca4bb2 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/aggregation/timerangeiterator/TimeRangeIteratorFactory.java @@ -80,7 +80,8 @@ public class TimeRangeIteratorFactory { long endTime, long interval, long slidingStep, - List<Integer> samplingIndexes) { + List<Integer> samplingIndexes, + boolean outputPartialTimeWindow) { return new SampleWindowIterator(startTime, endTime, interval, slidingStep, samplingIndexes); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java similarity index 50% copy from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java copy to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java index d8d4ebc1f7..1f40016f9a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java @@ -26,15 +26,10 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; -import org.apache.iotdb.tsfile.read.common.block.TsBlockUtil; -import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; -import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; - -import com.google.common.util.concurrent.ListenableFuture; import java.util.List; -public class WindowSplitOperator implements ProcessOperator { +public class WindowConcatOperator implements ProcessOperator { protected final OperatorContext operatorContext; @@ -47,7 +42,7 @@ public class WindowSplitOperator implements ProcessOperator { private final TsBlockBuilder resultTsBlockBuilder; - public WindowSplitOperator( + public WindowConcatOperator( OperatorContext operatorContext, Operator child, ITimeRangeIterator sampleTimeRangeIterator, @@ -60,87 +55,22 @@ public class WindowSplitOperator implements ProcessOperator { @Override public OperatorContext getOperatorContext() { - return operatorContext; - } - - @Override - public ListenableFuture<?> isBlocked() { - return child.isBlocked(); + return null; } @Override public TsBlock next() { - // reset operator state - canCallNext = true; - - if (curTimeRange == null && sampleTimeRangeIterator.hasNextTimeRange()) { - // move to next time window - curTimeRange = sampleTimeRangeIterator.nextTimeRange(); - } - - if (!fetchData()) { - return null; - } else { - curTimeRange = null; - TsBlock resultTsBlock = resultTsBlockBuilder.build(); - resultTsBlockBuilder.reset(); - return resultTsBlock; - } - } - - private boolean fetchData() { - while (!consumeInput()) { - // NOTE: child.next() can only be invoked once - if (child.hasNext() && canCallNext) { - inputTsBlock = child.next(); - canCallNext = false; - } else { - return false; - } - } - return true; - } - - private boolean consumeInput() { - if (inputTsBlock == null) { - return false; - } - - inputTsBlock = TsBlockUtil.skipPointsOutOfTimeRange(inputTsBlock, curTimeRange, true); - if (inputTsBlock == null) { - return false; - } - - for (int readIndex = 0; readIndex < inputTsBlock.getPositionCount(); readIndex++) { - long time = inputTsBlock.getTimeByIndex(readIndex); - if (curTimeRange.contains(time)) { - writeData(readIndex); - } else { - inputTsBlock = inputTsBlock.subTsBlock(readIndex); - return true; - } - } - return false; - } - - private void writeData(int readIndex) { - TimeColumnBuilder timeColumnBuilder = resultTsBlockBuilder.getTimeColumnBuilder(); - timeColumnBuilder.writeLong(inputTsBlock.getTimeByIndex(readIndex)); - ColumnBuilder[] columnBuilders = resultTsBlockBuilder.getValueColumnBuilders(); - for (int columnIndex = 0; columnIndex < columnBuilders.length; columnIndex++) { - columnBuilders[columnIndex].write(inputTsBlock.getColumn(columnIndex), readIndex); - } - resultTsBlockBuilder.declarePosition(); + return null; } @Override public boolean hasNext() { - return curTimeRange != null || sampleTimeRangeIterator.hasNextTimeRange(); + return false; } @Override public boolean isFinished() { - return !this.hasNext(); + return false; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java index d8d4ebc1f7..f2ee804f75 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java @@ -42,7 +42,7 @@ public class WindowSplitOperator implements ProcessOperator { protected TsBlock inputTsBlock; protected boolean canCallNext; - private final ITimeRangeIterator sampleTimeRangeIterator; + private final ITimeRangeIterator sampleTimeRangeSliceIterator; private TimeRange curTimeRange; private final TsBlockBuilder resultTsBlockBuilder; @@ -50,11 +50,11 @@ public class WindowSplitOperator implements ProcessOperator { public WindowSplitOperator( OperatorContext operatorContext, Operator child, - ITimeRangeIterator sampleTimeRangeIterator, + ITimeRangeIterator sampleTimeRangeSliceIterator, List<TSDataType> outputDataTypes) { this.operatorContext = operatorContext; this.child = child; - this.sampleTimeRangeIterator = sampleTimeRangeIterator; + this.sampleTimeRangeSliceIterator = sampleTimeRangeSliceIterator; this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes); } @@ -73,9 +73,9 @@ public class WindowSplitOperator implements ProcessOperator { // reset operator state canCallNext = true; - if (curTimeRange == null && sampleTimeRangeIterator.hasNextTimeRange()) { + if (curTimeRange == null && sampleTimeRangeSliceIterator.hasNextTimeRange()) { // move to next time window - curTimeRange = sampleTimeRangeIterator.nextTimeRange(); + curTimeRange = sampleTimeRangeSliceIterator.nextTimeRange(); } if (!fetchData()) { @@ -135,7 +135,7 @@ public class WindowSplitOperator implements ProcessOperator { @Override public boolean hasNext() { - return curTimeRange != null || sampleTimeRangeIterator.hasNextTimeRange(); + return curTimeRange != null || sampleTimeRangeSliceIterator.hasNextTimeRange(); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index 4c69cd5720..888c3f94a7 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -54,6 +54,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOper import org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator; import org.apache.iotdb.db.mpp.execution.operator.process.TagAggregationOperator; import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.WindowConcatOperator; import org.apache.iotdb.db.mpp.execution.operator.process.WindowSplitOperator; import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill; import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill; @@ -149,6 +150,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggre import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode; +import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.WindowConcatNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.WindowSplitNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode; import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode; @@ -1596,7 +1598,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP groupByTimeParameter.getEndTime(), groupByTimeParameter.getInterval(), groupByTimeParameter.getSlidingStep(), - node.getSamplingIndexes()); + node.getSamplingIndexes(), + true); List<TSDataType> outputDataTypes = getOutputColumnTypes(node, context.getTypeProvider()); @@ -1604,6 +1607,33 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP return new WindowSplitOperator(operatorContext, child, timeRangeIterator, outputDataTypes); } + @Override + public Operator visitWindowConcat(WindowConcatNode node, LocalExecutionPlanContext context) { + Operator child = node.getChild().accept(this, context); + OperatorContext operatorContext = + context + .getInstanceContext() + .addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + WindowConcatOperator.class.getSimpleName()); + + GroupByTimeParameter groupByTimeParameter = node.getGroupByTimeParameter(); + ITimeRangeIterator timeRangeIterator = + TimeRangeIteratorFactory.getSampleTimeRangeIterator( + groupByTimeParameter.getStartTime(), + groupByTimeParameter.getEndTime(), + groupByTimeParameter.getInterval(), + groupByTimeParameter.getSlidingStep(), + node.getSamplingIndexes(), + false); + + List<TSDataType> outputDataTypes = getOutputColumnTypes(node, context.getTypeProvider()); + + context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1); + return new WindowConcatOperator(operatorContext, child, timeRangeIterator, outputDataTypes); + } + @Override public Operator visitSchemaFetchMerge( SchemaFetchMergeNode node, LocalExecutionPlanContext context) {
