This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch aggregator in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit aa5adcef267884faf6cf97c22caa3236d44210d1 Author: Alima777 <[email protected]> AuthorDate: Sun May 1 14:49:58 2022 +0800 add desc accumulator --- .../operator/aggregation/AccumulatorFactory.java | 14 +++--- .../db/mpp/operator/aggregation/Aggregator.java | 9 +++- .../aggregation/FirstValueAccumulator.java | 8 ++-- .../aggregation/FirstValueDescAccumulator.java | 47 +++++++++++++++++++ .../operator/aggregation/LastValueAccumulator.java | 6 +-- .../aggregation/LastValueDescAccumulator.java | 52 ++++++++++++++++++++++ .../operator/aggregation/MaxTimeAccumulator.java | 4 +- .../aggregation/MaxTimeDescAccumulator.java | 47 +++++++++++++++++++ .../operator/aggregation/MinTimeAccumulator.java | 6 +-- .../aggregation/MinTimeDescAccumulator.java | 42 +++++++++++++++++ .../operator/SeriesAggregateScanOperatorTest.java | 6 +-- 11 files changed, 220 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java index e389d543be..10619b9e35 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java @@ -26,7 +26,7 @@ public class AccumulatorFactory { // TODO: Are we going to create different seriesScanOperator based on order by sequence? public static Accumulator createAccumulator( - AggregationType aggregationType, TSDataType tsDataType) { + AggregationType aggregationType, TSDataType tsDataType, boolean ascending) { switch (aggregationType) { case COUNT: return new CountAccumulator(); @@ -37,17 +37,21 @@ public class AccumulatorFactory { case EXTREME: return new ExtremeAccumulator(tsDataType); case MAX_TIME: - return new MaxTimeAccumulator(); + return ascending ? new MaxTimeAccumulator() : new MaxTimeDescAccumulator(); case MIN_TIME: - return new MinTimeAccumulator(); + return ascending ? new MinTimeAccumulator() : new MinTimeDescAccumulator(); case MAX_VALUE: return new MaxValueAccumulator(tsDataType); case MIN_VALUE: return new MinValueAccumulator(tsDataType); case LAST_VALUE: - return new LastValueAccumulator(tsDataType); + return ascending + ? new LastValueAccumulator(tsDataType) + : new LastValueDescAccumulator(tsDataType); case FIRST_VALUE: - return new FirstValueAccumulator(tsDataType); + return ascending + ? new FirstValueAccumulator(tsDataType) + : new FirstValueDescAccumulator(tsDataType); default: throw new IllegalArgumentException("Invalid Aggregation function: " + aggregationType); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java index a808c734bc..617d89d4ae 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java @@ -34,11 +34,18 @@ public class Aggregator { private final Accumulator accumulator; // In some intermediate result input, inputLocation[] should include two columns - private final List<InputLocation[]> inputLocationList; + private List<InputLocation[]> inputLocationList; private final AggregationStep step; private TimeRange timeRange = new TimeRange(0, Long.MAX_VALUE); + // Used for SeriesAggregateScanOperator + public Aggregator(Accumulator accumulator, AggregationStep step) { + this.accumulator = accumulator; + this.step = step; + } + + // Used for aggregateOperator public Aggregator( Accumulator accumulator, AggregationStep step, List<InputLocation[]> inputLocationList) { this.accumulator = accumulator; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java index 6af6164b49..8fa0801faf 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java @@ -28,9 +28,9 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType; public class FirstValueAccumulator implements Accumulator { - private boolean hasCandidateResult; - private TsPrimitiveType firstValue; - private long minTime = Long.MAX_VALUE; + protected boolean hasCandidateResult; + protected TsPrimitiveType firstValue; + protected long minTime = Long.MAX_VALUE; public FirstValueAccumulator(TSDataType seriesDataType) { firstValue = TsPrimitiveType.getByType(seriesDataType); @@ -99,7 +99,7 @@ public class FirstValueAccumulator implements Accumulator { return firstValue.getDataType(); } - private void updateFirstValue(Object value, long curTime) { + protected void updateFirstValue(Object value, long curTime) { hasCandidateResult = true; if (curTime < minTime) { minTime = curTime; diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueDescAccumulator.java new file mode 100644 index 0000000000..87b939438e --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueDescAccumulator.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.operator.aggregation; + +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.TimeRange; +import org.apache.iotdb.tsfile.read.common.block.column.Column; + +public class FirstValueDescAccumulator extends FirstValueAccumulator { + + public FirstValueDescAccumulator(TSDataType seriesDataType) { + super(seriesDataType); + } + + // Column should be like: | Time | Value | + @Override + public void addInput(Column[] column, TimeRange timeRange) { + for (int i = 0; i < column[0].getPositionCount(); i++) { + long curTime = column[0].getLong(i); + if (curTime >= timeRange.getMin() && curTime < timeRange.getMax()) { + updateFirstValue(column[1].getObject(0), curTime); + } + } + } + + @Override + public boolean hasFinalResult() { + return false; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java index 1ecd65ae61..901759b687 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java @@ -28,8 +28,8 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType; public class LastValueAccumulator implements Accumulator { - private TsPrimitiveType lastValue; - private long maxTime = Long.MIN_VALUE; + protected TsPrimitiveType lastValue; + protected long maxTime = Long.MIN_VALUE; public LastValueAccumulator(TSDataType seriesDataType) { lastValue = TsPrimitiveType.getByType(seriesDataType); @@ -100,7 +100,7 @@ public class LastValueAccumulator implements Accumulator { return lastValue.getDataType(); } - private void updateLastValue(Object value, long curTime) { + protected void updateLastValue(Object value, long curTime) { if (curTime > maxTime) { maxTime = curTime; lastValue.setObject(value); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueDescAccumulator.java new file mode 100644 index 0000000000..3d3f61644f --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueDescAccumulator.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.operator.aggregation; + +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.TimeRange; +import org.apache.iotdb.tsfile.read.common.block.column.Column; + +public class LastValueDescAccumulator extends LastValueAccumulator { + + private boolean hasCandidateResult = false; + + public LastValueDescAccumulator(TSDataType seriesDataType) { + super(seriesDataType); + } + + // Column should be like: | Time | Value | + @Override + public void addInput(Column[] column, TimeRange timeRange) { + long curTime = column[0].getLong(0); + if (curTime < timeRange.getMax() && curTime >= timeRange.getMin()) { + updateLastValue(column[1].getObject(0), curTime); + } + } + + @Override + public boolean hasFinalResult() { + return hasCandidateResult; + } + + protected void updateLastValue(Object value, long curTime) { + hasCandidateResult = true; + super.updateLastValue(value, curTime); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java index 3addbf26d9..cda25af4a1 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java @@ -27,7 +27,7 @@ import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; public class MaxTimeAccumulator implements Accumulator { - private long maxTime = Long.MIN_VALUE; + protected long maxTime = Long.MIN_VALUE; public MaxTimeAccumulator() {} @@ -93,7 +93,7 @@ public class MaxTimeAccumulator implements Accumulator { return TSDataType.INT64; } - private void updateMaxTime(long curTime) { + protected void updateMaxTime(long curTime) { maxTime = Math.max(maxTime, curTime); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeDescAccumulator.java new file mode 100644 index 0000000000..01fb65f541 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeDescAccumulator.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.operator.aggregation; + +import org.apache.iotdb.tsfile.read.common.TimeRange; +import org.apache.iotdb.tsfile.read.common.block.column.Column; + +public class MaxTimeDescAccumulator extends MaxTimeAccumulator { + + private boolean hasCandidateResult = false; + + // Column should be like: | Time | + @Override + public void addInput(Column[] column, TimeRange timeRange) { + long curTime = column[0].getLong(0); + if (curTime < timeRange.getMax() && curTime >= timeRange.getMin()) { + updateMaxTime(curTime); + } + } + + @Override + public boolean hasFinalResult() { + return hasCandidateResult; + } + + protected void updateMaxTime(long curTime) { + hasCandidateResult = true; + super.updateMaxTime(curTime); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java index 893d8436eb..b80adbe470 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java @@ -27,8 +27,8 @@ import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; public class MinTimeAccumulator implements Accumulator { - private boolean hasCandidateResult; - private long minTime = Long.MAX_VALUE; + protected boolean hasCandidateResult; + protected long minTime = Long.MAX_VALUE; public MinTimeAccumulator() {} @@ -92,7 +92,7 @@ public class MinTimeAccumulator implements Accumulator { return TSDataType.INT64; } - private void updateMinTime(long curTime) { + protected void updateMinTime(long curTime) { hasCandidateResult = true; minTime = Math.min(minTime, curTime); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeDescAccumulator.java new file mode 100644 index 0000000000..cb9136eca9 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeDescAccumulator.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.operator.aggregation; + +import org.apache.iotdb.tsfile.read.common.TimeRange; +import org.apache.iotdb.tsfile.read.common.block.column.Column; + +public class MinTimeDescAccumulator extends MinTimeAccumulator { + + // Column should be like: | Time | + @Override + public void addInput(Column[] column, TimeRange timeRange) { + for (int i = 0; i < column[0].getPositionCount(); i++) { + long curTime = column[0].getLong(i); + if (curTime >= timeRange.getMin() && curTime < timeRange.getMax()) { + updateMinTime(curTime); + } + } + } + + @Override + public boolean hasFinalResult() { + return false; + } +} diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java index c21314b5d9..2b4d7c4c7c 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java @@ -93,9 +93,9 @@ public class SeriesAggregateScanOperatorTest { initSeriesAggregateScanOperator( Collections.singletonList( new Aggregator( - AccumulatorFactory.createAccumulator(AggregationType.COUNT, TSDataType.INT32), - AggregationStep.SINGLE, - null)), + AccumulatorFactory.createAccumulator( + AggregationType.COUNT, TSDataType.INT32, true), + AggregationStep.SINGLE)), null, true, null);
