This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch ml/windowSet in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9f03e74dae5a75e75cf5e4ad988d65ee1126cb8e Author: Minghui Liu <[email protected]> AuthorDate: Mon Nov 14 16:21:20 2022 +0800 implement WindowConcatOperator --- .../main/java/org/apache/iotdb/SessionExample.java | 4 +- .../process/{ => window}/WindowConcatOperator.java | 44 +++++++++--- .../operator/process/window/WindowSliceQueue.java | 80 ++++++++++++++++++++++ .../process/{ => window}/WindowSplitOperator.java | 3 +- .../db/mpp/plan/planner/OperatorTreeGenerator.java | 4 +- 5 files changed, 121 insertions(+), 14 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index 9baf3a8ad2..a488d292a6 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -72,8 +72,8 @@ public class SessionExample { // set session fetchSize session.setFetchSize(10000); - List<String> queryPaths = Arrays.asList("root.sg1.d1.s1", "root.sg1.d2.s1"); - List<Integer> indexes = Arrays.asList(0, 1, 2, 3); + List<String> queryPaths = Arrays.asList("root.sg1.d1.s1", "root.sg1.d1.s2"); + List<Integer> indexes = Arrays.asList(1, 2, 6, 7); List<SessionDataSet> windowBatch = session.fetchWindowBatch(queryPaths, null, 0, 32, 4, 3, indexes); for (SessionDataSet window : windowBatch) { diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowConcatOperator.java similarity index 64% rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowConcatOperator.java index 269b579feb..4dc9f14f2d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowConcatOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowConcatOperator.java @@ -17,15 +17,15 @@ * under the License. */ -package org.apache.iotdb.db.mpp.execution.operator.process; +package org.apache.iotdb.db.mpp.execution.operator.process.window; import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; +import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.common.block.TsBlock; -import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; import java.util.List; @@ -34,13 +34,11 @@ public class WindowConcatOperator implements ProcessOperator { protected final OperatorContext operatorContext; protected final Operator child; - protected TsBlock inputTsBlock; - protected boolean canCallNext; private final ITimeRangeIterator sampleTimeRangeIterator; private TimeRange curTimeRange; - private final TsBlockBuilder resultTsBlockBuilder; + private final WindowSliceQueue windowSliceQueue; public WindowConcatOperator( OperatorContext operatorContext, @@ -50,7 +48,7 @@ public class WindowConcatOperator implements ProcessOperator { this.operatorContext = operatorContext; this.child = child; this.sampleTimeRangeIterator = sampleTimeRangeIterator; - this.resultTsBlockBuilder = new TsBlockBuilder(outputDataTypes); + this.windowSliceQueue = new WindowSliceQueue(outputDataTypes); } @Override @@ -60,17 +58,45 @@ public class WindowConcatOperator implements ProcessOperator { @Override public TsBlock next() { - return child.next(); + if (!child.hasNext()) { + curTimeRange = null; + return windowSliceQueue.outputWindow(); + } + + TsBlock inputTsBlock = child.next(); + if (inputTsBlock == null) { + return null; + } + + if (curTimeRange == null && sampleTimeRangeIterator.hasNextTimeRange()) { + curTimeRange = sampleTimeRangeIterator.nextTimeRange(); + windowSliceQueue.updateTimeRange(curTimeRange); + } + + if (inputTsBlock.getStartTime() > curTimeRange.getMax()) { + TsBlock outputWindow = windowSliceQueue.outputWindow(); + if (sampleTimeRangeIterator.hasNextTimeRange()) { + curTimeRange = sampleTimeRangeIterator.nextTimeRange(); + windowSliceQueue.updateTimeRange(curTimeRange); + } else { + curTimeRange = null; + } + windowSliceQueue.processTsBlock(inputTsBlock); + return outputWindow; + } else { + windowSliceQueue.processTsBlock(inputTsBlock); + return null; + } } @Override public boolean hasNext() { - return child.hasNext(); + return curTimeRange != null || sampleTimeRangeIterator.hasNextTimeRange(); } @Override public boolean isFinished() { - return child.isFinished(); + return !this.hasNext(); } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSliceQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSliceQueue.java new file mode 100644 index 0000000000..59531d2ced --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSliceQueue.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.execution.operator.process.window; + +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.TimeRange; +import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder; + +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; + +public class WindowSliceQueue { + + // cached window slice + private final Deque<TsBlock> deque = new LinkedList<>(); + + private TimeRange curTimeRange; + + private final TsBlockBuilder windowBuilder; + + public WindowSliceQueue(List<TSDataType> dataTypeList) { + this.windowBuilder = new TsBlockBuilder(dataTypeList); + } + + public void processTsBlock(TsBlock tsBlock) { + deque.addLast(tsBlock); + } + + public void updateTimeRange(TimeRange curTimeRange) { + this.curTimeRange = curTimeRange; + evictingExpiredSlice(); + } + + public void evictingExpiredSlice() { + while (!deque.isEmpty() && !curTimeRange.contains(deque.getFirst().getStartTime())) { + deque.removeFirst(); + } + } + + public TsBlock outputWindow() { + windowBuilder.reset(); + + TimeColumnBuilder timeColumnBuilder = windowBuilder.getTimeColumnBuilder(); + ColumnBuilder[] columnBuilders = windowBuilder.getValueColumnBuilders(); + int valueColumnCount = columnBuilders.length; + + for (TsBlock windowSlice : deque) { + int positionCount = windowSlice.getPositionCount(); + for (int index = 0; index < positionCount; index++) { + timeColumnBuilder.write(windowSlice.getTimeColumn(), index); + for (int columnIndex = 0; columnIndex < valueColumnCount; columnIndex++) { + columnBuilders[columnIndex].write(windowSlice.getColumn(columnIndex), index); + } + } + windowBuilder.declarePositions(positionCount); + } + return windowBuilder.build(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSplitOperator.java similarity index 97% rename from server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java rename to server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSplitOperator.java index 6b9544b1a1..039822429d 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/WindowSplitOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/window/WindowSplitOperator.java @@ -17,11 +17,12 @@ * under the License. */ -package org.apache.iotdb.db.mpp.execution.operator.process; +package org.apache.iotdb.db.mpp.execution.operator.process.window; import org.apache.iotdb.db.mpp.aggregation.timerangeiterator.ITimeRangeIterator; import org.apache.iotdb.db.mpp.execution.operator.Operator; import org.apache.iotdb.db.mpp.execution.operator.OperatorContext; +import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.common.block.TsBlock; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java index d839451544..eb50d6ab37 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java @@ -54,8 +54,6 @@ import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOper import org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator; import org.apache.iotdb.db.mpp.execution.operator.process.TagAggregationOperator; import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator; -import org.apache.iotdb.db.mpp.execution.operator.process.WindowConcatOperator; -import org.apache.iotdb.db.mpp.execution.operator.process.WindowSplitOperator; import org.apache.iotdb.db.mpp.execution.operator.process.fill.IFill; import org.apache.iotdb.db.mpp.execution.operator.process.fill.ILinearFill; import org.apache.iotdb.db.mpp.execution.operator.process.fill.constant.BinaryConstantFill; @@ -91,6 +89,8 @@ import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryOperator import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQuerySortOperator; import org.apache.iotdb.db.mpp.execution.operator.process.last.LastQueryUtil; import org.apache.iotdb.db.mpp.execution.operator.process.last.UpdateLastCacheOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.window.WindowConcatOperator; +import org.apache.iotdb.db.mpp.execution.operator.process.window.WindowSplitOperator; import org.apache.iotdb.db.mpp.execution.operator.schema.CountMergeOperator; import org.apache.iotdb.db.mpp.execution.operator.schema.DevicesCountOperator; import org.apache.iotdb.db.mpp.execution.operator.schema.DevicesSchemaScanOperator;
