This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch aggregator in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e9a1b58d1b32fee0924e5a8e8f0a519918f0e612 Author: Alima777 <[email protected]> AuthorDate: Fri Apr 29 09:55:01 2022 +0800 add processTsBlocks --- .../db/mpp/operator/aggregation/Aggregator.java | 28 ++++++++++++++++++++-- .../operator/aggregation/AggregatorFactory.java | 14 +---------- 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java index 94fe9080cb..8a05f460c5 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java @@ -25,6 +25,7 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; import java.util.List; @@ -32,7 +33,8 @@ import java.util.List; public class Aggregator { private final Accumulator accumulator; - private final List<InputLocation> inputLocationList; + // In some intermediate result input, inputLocation[] should include two columns + private final List<InputLocation[]> inputLocationList; private final AggregationStep step; private final TSDataType intermediateType; private final TSDataType finalType; @@ -42,7 +44,7 @@ public class Aggregator { public Aggregator( Accumulator accumulator, AggregationStep step, - List<InputLocation> inputLocationList, + List<InputLocation[]> inputLocationList, TSDataType intermediateType, TSDataType finalType) { this.accumulator = accumulator; @@ -52,6 +54,7 @@ public class Aggregator { this.finalType = finalType; } + // Used for SeriesAggregateScanOperator public void processTsBlock(TsBlock tsBlock) { if (step.isInputRaw()) { accumulator.addInput(tsBlock.getTimeAndValueColumn(0), timeRange); @@ -60,6 +63,27 @@ public class Aggregator { } } + // Used for aggregateOperator + public void processTsBlocks(TsBlock[] tsBlock) { + for (InputLocation[] inputLocations : inputLocationList) { + if (step.isInputRaw()) { + TsBlock rawTsBlock = tsBlock[inputLocations[0].getTsBlockIndex()]; + Column[] timeValueColumn = new Column[2]; + timeValueColumn[0] = rawTsBlock.getTimeColumn(); + timeValueColumn[1] = rawTsBlock.getColumn(inputLocations[0].getValueColumnIndex()); + accumulator.addInput(timeValueColumn, timeRange); + } else { + Column[] columns = new Column[inputLocations.length]; + for (int i = 0; i < inputLocations.length; i++) { + columns[i] = + tsBlock[inputLocations[i].getTsBlockIndex()].getColumn( + inputLocations[i].getValueColumnIndex()); + } + accumulator.addIntermediate(columns); + } + } + } + public void outputResult(ColumnBuilder[] columnBuilder) { if (step.isOutputPartial()) { accumulator.outputIntermediate(columnBuilder); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java index ce493b3256..fb49e8c8c1 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java @@ -19,16 +19,4 @@ package org.apache.iotdb.db.mpp.operator.aggregation; -public class AggregatorFactory { - - public static Aggregator createAggregator() { - Accumulator accumulator; - if (step.isInputRaw()) { - accumulator = accumulatorFactory.createAccumulator(lambdaProviders); - } else { - accumulator = accumulatorFactory.createIntermediateAccumulator(lambdaProviders); - } - return new Aggregator( - accumulator, step, intermediateType, finalType, inputChannels, maskChannel); - } -} +public class AggregatorFactory {}
