This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch aggregator in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit af1cc09402f3a15ea7ce47ce5af079fe2a0b3d3b Author: Alima777 <[email protected]> AuthorDate: Fri Apr 29 20:04:46 2022 +0800 implement part accumulator 2 --- .../db/mpp/operator/aggregation/Accumulator.java | 3 +- .../aggregation/FirstValueAccumulator.java | 90 +++++++++++++++++++++- .../operator/aggregation/LastValueAccumulator.java | 90 +++++++++++++++++++++- .../operator/aggregation/MaxTimeAccumulator.java | 80 ++++++++++++++++++- .../operator/aggregation/MinTimeAccumulator.java | 80 ++++++++++++++++++- 5 files changed, 338 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java index 5833a75959..eaa7dd99e5 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java @@ -59,7 +59,8 @@ public interface Accumulator { void reset(); /** - * For first_value or last_value in decreasing order, we can get final result by the first record. + * This method can only be used in seriesAggregateScanOperator. For first_value or last_value in + * decreasing order, we can get final result by the first record. */ boolean hasFinalResult(); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java index 49ad6a5133..6af6164b49 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java @@ -17,5 +17,93 @@ * under the License. */ -package org.apache.iotdb.db.mpp.operator.aggregation;public class FirstValueAccumulator { +package org.apache.iotdb.db.mpp.operator.aggregation; + +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.read.common.TimeRange; +import org.apache.iotdb.tsfile.read.common.block.column.Column; +import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; +import org.apache.iotdb.tsfile.utils.TsPrimitiveType; + +public class FirstValueAccumulator implements Accumulator { + + private boolean hasCandidateResult; + private TsPrimitiveType firstValue; + private long minTime = Long.MAX_VALUE; + + public FirstValueAccumulator(TSDataType seriesDataType) { + firstValue = TsPrimitiveType.getByType(seriesDataType); + } + + // Column should be like: | Time | Value | + @Override + public void addInput(Column[] column, TimeRange timeRange) { + long curTime = column[0].getLong(0); + if (curTime < timeRange.getMax() && curTime >= timeRange.getMin()) { + updateFirstValue(column[1].getObject(0), curTime); + } + } + + // partialResult should be like: | FirstValue | MinTime | + @Override + public void addIntermediate(Column[] partialResult) { + if (partialResult.length != 2) { + throw new IllegalArgumentException("partialResult of FirstValue should be 2"); + } + updateFirstValue(partialResult[0].getObject(0), partialResult[1].getLong(0)); + } + + @Override + public void addStatistics(Statistics statistics) { + updateFirstValue(statistics.getFirstValue(), statistics.getStartTime()); + } + + // finalResult should be single column, like: | finalFirstValue | + @Override + public void setFinal(Column finalResult) { + reset(); + firstValue.setObject(finalResult.getObject(0)); + } + + // columnBuilder should be double in FirstValueAccumulator + @Override + public void outputIntermediate(ColumnBuilder[] columnBuilders) { + columnBuilders[0].writeObject(firstValue.getValue()); + columnBuilders[1].writeLong(minTime); + } + + @Override + public void outputFinal(ColumnBuilder columnBuilder) { + columnBuilder.writeObject(firstValue.getValue()); + } + + @Override + public void reset() { + this.minTime = Long.MAX_VALUE; + this.firstValue.reset(); + } + + @Override + public boolean hasFinalResult() { + return hasCandidateResult; + } + + @Override + public TSDataType[] getIntermediateType() { + return new TSDataType[] {firstValue.getDataType(), TSDataType.INT64}; + } + + @Override + public TSDataType getFinalType() { + return firstValue.getDataType(); + } + + private void updateFirstValue(Object value, long curTime) { + hasCandidateResult = true; + if (curTime < minTime) { + minTime = curTime; + firstValue.setObject(value); + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java index 97183aae52..1ecd65ae61 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java @@ -17,5 +17,93 @@ * under the License. */ -package org.apache.iotdb.db.mpp.operator.aggregation;public class LastValueAccumulator { +package org.apache.iotdb.db.mpp.operator.aggregation; + +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.read.common.TimeRange; +import org.apache.iotdb.tsfile.read.common.block.column.Column; +import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; +import org.apache.iotdb.tsfile.utils.TsPrimitiveType; + +public class LastValueAccumulator implements Accumulator { + + private TsPrimitiveType lastValue; + private long maxTime = Long.MIN_VALUE; + + public LastValueAccumulator(TSDataType seriesDataType) { + lastValue = TsPrimitiveType.getByType(seriesDataType); + } + + // Column should be like: | Time | Value | + @Override + public void addInput(Column[] column, TimeRange timeRange) { + for (int i = 0; i < column[0].getPositionCount(); i++) { + long curTime = column[0].getLong(i); + if (curTime >= timeRange.getMin() && curTime < timeRange.getMax()) { + updateLastValue(column[1].getObject(0), curTime); + } + } + } + + // partialResult should be like: | LastValue | MaxTime | + @Override + public void addIntermediate(Column[] partialResult) { + if (partialResult.length != 2) { + throw new IllegalArgumentException("partialResult of LastValue should be 2"); + } + updateLastValue(partialResult[0].getObject(0), partialResult[1].getLong(0)); + } + + @Override + public void addStatistics(Statistics statistics) { + updateLastValue(statistics.getLastValue(), statistics.getEndTime()); + } + + // finalResult should be single column, like: | finalLastValue | + @Override + public void setFinal(Column finalResult) { + reset(); + lastValue.setObject(finalResult.getObject(0)); + } + + // columnBuilder should be double in LastValueAccumulator + @Override + public void outputIntermediate(ColumnBuilder[] columnBuilders) { + columnBuilders[0].writeObject(lastValue.getValue()); + columnBuilders[1].writeLong(maxTime); + } + + @Override + public void outputFinal(ColumnBuilder columnBuilder) { + columnBuilder.writeObject(lastValue.getValue()); + } + + @Override + public void reset() { + this.maxTime = Long.MIN_VALUE; + this.lastValue.reset(); + } + + @Override + public boolean hasFinalResult() { + return false; + } + + @Override + public TSDataType[] getIntermediateType() { + return new TSDataType[] {lastValue.getDataType(), TSDataType.INT64}; + } + + @Override + public TSDataType getFinalType() { + return lastValue.getDataType(); + } + + private void updateLastValue(Object value, long curTime) { + if (curTime > maxTime) { + maxTime = curTime; + lastValue.setObject(value); + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java index c5d03f8442..3addbf26d9 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java @@ -17,5 +17,83 @@ * under the License. */ -package org.apache.iotdb.db.mpp.operator.aggregation;public class MaxTimeAccumulator { +package org.apache.iotdb.db.mpp.operator.aggregation; + +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.read.common.TimeRange; +import org.apache.iotdb.tsfile.read.common.block.column.Column; +import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; + +public class MaxTimeAccumulator implements Accumulator { + + private long maxTime = Long.MIN_VALUE; + + public MaxTimeAccumulator() {} + + // Column should be like: | Time | + @Override + public void addInput(Column[] column, TimeRange timeRange) { + for (int i = 0; i < column[0].getPositionCount(); i++) { + long curTime = column[0].getLong(i); + if (curTime >= timeRange.getMin() && curTime < timeRange.getMax()) { + updateMaxTime(curTime); + } + } + } + + // partialResult should be like: | partialMaxTimeValue | + @Override + public void addIntermediate(Column[] partialResult) { + if (partialResult.length != 1) { + throw new IllegalArgumentException("partialResult of MaxTime should be 1"); + } + updateMaxTime(partialResult[0].getLong(0)); + } + + @Override + public void addStatistics(Statistics statistics) { + updateMaxTime(statistics.getEndTime()); + } + + // finalResult should be single column, like: | finalMaxTime | + @Override + public void setFinal(Column finalResult) { + maxTime = finalResult.getLong(0); + } + + // columnBuilder should be single in maxTimeAccumulator + @Override + public void outputIntermediate(ColumnBuilder[] columnBuilders) { + columnBuilders[0].writeLong(maxTime); + } + + @Override + public void outputFinal(ColumnBuilder columnBuilder) { + columnBuilder.writeLong(maxTime); + } + + @Override + public void reset() { + this.maxTime = Long.MIN_VALUE; + } + + @Override + public boolean hasFinalResult() { + return false; + } + + @Override + public TSDataType[] getIntermediateType() { + return new TSDataType[] {TSDataType.INT64}; + } + + @Override + public TSDataType getFinalType() { + return TSDataType.INT64; + } + + private void updateMaxTime(long curTime) { + maxTime = Math.max(maxTime, curTime); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java index 95bf611acf..893d8436eb 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java @@ -17,5 +17,83 @@ * under the License. */ -package org.apache.iotdb.db.mpp.operator.aggregation;public class MinTimeAccumulator { +package org.apache.iotdb.db.mpp.operator.aggregation; + +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.read.common.TimeRange; +import org.apache.iotdb.tsfile.read.common.block.column.Column; +import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; + +public class MinTimeAccumulator implements Accumulator { + + private boolean hasCandidateResult; + private long minTime = Long.MAX_VALUE; + + public MinTimeAccumulator() {} + + // Column should be like: | Time | + @Override + public void addInput(Column[] column, TimeRange timeRange) { + long curTime = column[0].getLong(0); + if (curTime < timeRange.getMax() && curTime >= timeRange.getMin()) { + updateMinTime(curTime); + } + } + + // partialResult should be like: | partialMinTimeValue | + @Override + public void addIntermediate(Column[] partialResult) { + if (partialResult.length != 1) { + throw new IllegalArgumentException("partialResult of MinTime should be 1"); + } + updateMinTime(partialResult[0].getLong(0)); + } + + @Override + public void addStatistics(Statistics statistics) { + updateMinTime(statistics.getStartTime()); + } + + // finalResult should be single column, like: | finalMinTime | + @Override + public void setFinal(Column finalResult) { + minTime = finalResult.getLong(0); + } + + // columnBuilder should be single in minTimeAccumulator + @Override + public void outputIntermediate(ColumnBuilder[] columnBuilders) { + columnBuilders[0].writeLong(minTime); + } + + @Override + public void outputFinal(ColumnBuilder columnBuilder) { + columnBuilder.writeLong(minTime); + } + + @Override + public void reset() { + this.minTime = Long.MAX_VALUE; + } + + @Override + public boolean hasFinalResult() { + return hasCandidateResult; + } + + @Override + public TSDataType[] getIntermediateType() { + return new TSDataType[] {TSDataType.INT64}; + } + + @Override + public TSDataType getFinalType() { + return TSDataType.INT64; + } + + private void updateMinTime(long curTime) { + hasCandidateResult = true; + minTime = Math.min(minTime, curTime); + } }
