This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch aggregator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit aa5adcef267884faf6cf97c22caa3236d44210d1
Author: Alima777 <[email protected]>
AuthorDate: Sun May 1 14:49:58 2022 +0800

    add desc accumulator
---
 .../operator/aggregation/AccumulatorFactory.java   | 14 +++---
 .../db/mpp/operator/aggregation/Aggregator.java    |  9 +++-
 .../aggregation/FirstValueAccumulator.java         |  8 ++--
 .../aggregation/FirstValueDescAccumulator.java     | 47 +++++++++++++++++++
 .../operator/aggregation/LastValueAccumulator.java |  6 +--
 .../aggregation/LastValueDescAccumulator.java      | 52 ++++++++++++++++++++++
 .../operator/aggregation/MaxTimeAccumulator.java   |  4 +-
 .../aggregation/MaxTimeDescAccumulator.java        | 47 +++++++++++++++++++
 .../operator/aggregation/MinTimeAccumulator.java   |  6 +--
 .../aggregation/MinTimeDescAccumulator.java        | 42 +++++++++++++++++
 .../operator/SeriesAggregateScanOperatorTest.java  |  6 +--
 11 files changed, 220 insertions(+), 21 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java
index e389d543be..10619b9e35 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java
@@ -26,7 +26,7 @@ public class AccumulatorFactory {
 
   // TODO: Are we going to create different seriesScanOperator based on order 
by sequence?
   public static Accumulator createAccumulator(
-      AggregationType aggregationType, TSDataType tsDataType) {
+      AggregationType aggregationType, TSDataType tsDataType, boolean 
ascending) {
     switch (aggregationType) {
       case COUNT:
         return new CountAccumulator();
@@ -37,17 +37,21 @@ public class AccumulatorFactory {
       case EXTREME:
         return new ExtremeAccumulator(tsDataType);
       case MAX_TIME:
-        return new MaxTimeAccumulator();
+        return ascending ? new MaxTimeAccumulator() : new 
MaxTimeDescAccumulator();
       case MIN_TIME:
-        return new MinTimeAccumulator();
+        return ascending ? new MinTimeAccumulator() : new 
MinTimeDescAccumulator();
       case MAX_VALUE:
         return new MaxValueAccumulator(tsDataType);
       case MIN_VALUE:
         return new MinValueAccumulator(tsDataType);
       case LAST_VALUE:
-        return new LastValueAccumulator(tsDataType);
+        return ascending
+            ? new LastValueAccumulator(tsDataType)
+            : new LastValueDescAccumulator(tsDataType);
       case FIRST_VALUE:
-        return new FirstValueAccumulator(tsDataType);
+        return ascending
+            ? new FirstValueAccumulator(tsDataType)
+            : new FirstValueDescAccumulator(tsDataType);
       default:
         throw new IllegalArgumentException("Invalid Aggregation function: " + 
aggregationType);
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
index a808c734bc..617d89d4ae 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
@@ -34,11 +34,18 @@ public class Aggregator {
 
   private final Accumulator accumulator;
   // In some intermediate result input, inputLocation[] should include two 
columns
-  private final List<InputLocation[]> inputLocationList;
+  private List<InputLocation[]> inputLocationList;
   private final AggregationStep step;
 
   private TimeRange timeRange = new TimeRange(0, Long.MAX_VALUE);
 
+  /**
+   * Builds an aggregator from an accumulator and an aggregation step only; per the original note
+   * this form is used for SeriesAggregateScanOperator. {@code inputLocationList} is not assigned
+   * by this constructor.
+   */
+  public Aggregator(Accumulator accumulator, AggregationStep step) {
+    this.step = step;
+    this.accumulator = accumulator;
+  }
+
+  // Used for aggregateOperator
   public Aggregator(
       Accumulator accumulator, AggregationStep step, List<InputLocation[]> 
inputLocationList) {
     this.accumulator = accumulator;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
index 6af6164b49..8fa0801faf 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
@@ -28,9 +28,9 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 public class FirstValueAccumulator implements Accumulator {
 
-  private boolean hasCandidateResult;
-  private TsPrimitiveType firstValue;
-  private long minTime = Long.MAX_VALUE;
+  protected boolean hasCandidateResult;
+  protected TsPrimitiveType firstValue;
+  protected long minTime = Long.MAX_VALUE;
 
   public FirstValueAccumulator(TSDataType seriesDataType) {
     firstValue = TsPrimitiveType.getByType(seriesDataType);
@@ -99,7 +99,7 @@ public class FirstValueAccumulator implements Accumulator {
     return firstValue.getDataType();
   }
 
-  private void updateFirstValue(Object value, long curTime) {
+  protected void updateFirstValue(Object value, long curTime) {
     hasCandidateResult = true;
     if (curTime < minTime) {
       minTime = curTime;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueDescAccumulator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueDescAccumulator.java
new file mode 100644
index 0000000000..87b939438e
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueDescAccumulator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+/**
+ * First-value accumulator that {@link AccumulatorFactory} selects when {@code ascending} is
+ * false.
+ *
+ * <p>Every in-range row is offered to {@link #updateFirstValue(Object, long)}, which only accepts
+ * a row whose timestamp is smaller than the minimum recorded so far.
+ */
+public class FirstValueDescAccumulator extends FirstValueAccumulator {
+
+  public FirstValueDescAccumulator(TSDataType seriesDataType) {
+    super(seriesDataType);
+  }
+
+  /**
+   * Feeds every row whose timestamp lies in {@code [timeRange.getMin(), timeRange.getMax())} into
+   * the accumulator.
+   *
+   * @param column input columns laid out as | Time | Value |
+   * @param timeRange window to aggregate; lower bound inclusive, upper bound exclusive
+   */
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax()) {
+        // The value must come from the same row as the timestamp. Reading row 0 here would pair
+        // the smallest timestamp with the value of the block's first row.
+        updateFirstValue(column[1].getObject(i), curTime);
+      }
+    }
+  }
+
+  /**
+   * Always reports that the result is not final.
+   *
+   * <p>NOTE(review): presumably because, with input arriving in descending time order, a later
+   * row can always carry an earlier timestamp - confirm against the calling operator.
+   */
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
index 1ecd65ae61..901759b687 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
@@ -28,8 +28,8 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
 public class LastValueAccumulator implements Accumulator {
 
-  private TsPrimitiveType lastValue;
-  private long maxTime = Long.MIN_VALUE;
+  protected TsPrimitiveType lastValue;
+  protected long maxTime = Long.MIN_VALUE;
 
   public LastValueAccumulator(TSDataType seriesDataType) {
     lastValue = TsPrimitiveType.getByType(seriesDataType);
@@ -100,7 +100,7 @@ public class LastValueAccumulator implements Accumulator {
     return lastValue.getDataType();
   }
 
-  private void updateLastValue(Object value, long curTime) {
+  protected void updateLastValue(Object value, long curTime) {
     if (curTime > maxTime) {
       maxTime = curTime;
       lastValue.setObject(value);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueDescAccumulator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueDescAccumulator.java
new file mode 100644
index 0000000000..3d3f61644f
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueDescAccumulator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+/**
+ * Last-value accumulator that {@link AccumulatorFactory} selects when {@code ascending} is false.
+ *
+ * <p>Once one in-range row has been offered to {@link #updateLastValue(Object, long)} the result
+ * is reported as final.
+ */
+public class LastValueDescAccumulator extends LastValueAccumulator {
+
+  // Becomes true the first time updateLastValue is invoked on this instance.
+  private boolean hasCandidateResult = false;
+
+  public LastValueDescAccumulator(TSDataType seriesDataType) {
+    super(seriesDataType);
+  }
+
+  /**
+   * Offers the first row whose timestamp lies in {@code [timeRange.getMin(), timeRange.getMax())}
+   * to the accumulator and ignores the rest of the block.
+   *
+   * <p>The whole block is scanned instead of only row 0, so a block whose leading rows fall
+   * outside the window still contributes, and an empty block is a no-op.
+   *
+   * @param column input columns laid out as | Time | Value |
+   * @param timeRange window to aggregate; lower bound inclusive, upper bound exclusive
+   */
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime < timeRange.getMax() && curTime >= timeRange.getMin()) {
+        updateLastValue(column[1].getObject(i), curTime);
+        // Assumes rows arrive in descending time order, so the first in-range row is the
+        // latest one and later rows cannot change the result.
+        return;
+      }
+    }
+  }
+
+  /** Returns true as soon as any in-range row has been offered to the accumulator. */
+  @Override
+  public boolean hasFinalResult() {
+    return hasCandidateResult;
+  }
+
+  /** Marks that a candidate exists, then delegates the max-time comparison to the parent. */
+  @Override
+  protected void updateLastValue(Object value, long curTime) {
+    hasCandidateResult = true;
+    super.updateLastValue(value, curTime);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java
index 3addbf26d9..cda25af4a1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java
@@ -27,7 +27,7 @@ import 
org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 
 public class MaxTimeAccumulator implements Accumulator {
 
-  private long maxTime = Long.MIN_VALUE;
+  protected long maxTime = Long.MIN_VALUE;
 
   public MaxTimeAccumulator() {}
 
@@ -93,7 +93,7 @@ public class MaxTimeAccumulator implements Accumulator {
     return TSDataType.INT64;
   }
 
-  private void updateMaxTime(long curTime) {
+  protected void updateMaxTime(long curTime) {
     maxTime = Math.max(maxTime, curTime);
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeDescAccumulator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeDescAccumulator.java
new file mode 100644
index 0000000000..01fb65f541
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeDescAccumulator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+/**
+ * Max-time accumulator that {@link AccumulatorFactory} selects when {@code ascending} is false.
+ *
+ * <p>Once one in-range timestamp has been offered to {@link #updateMaxTime(long)} the result is
+ * reported as final.
+ */
+public class MaxTimeDescAccumulator extends MaxTimeAccumulator {
+
+  // Becomes true the first time updateMaxTime is invoked on this instance.
+  private boolean hasCandidateResult = false;
+
+  /**
+   * Offers the first timestamp that lies in {@code [timeRange.getMin(), timeRange.getMax())} to
+   * the accumulator and ignores the rest of the block.
+   *
+   * <p>The whole block is scanned instead of only row 0, so a block whose leading rows fall
+   * outside the window still contributes, and an empty block is a no-op.
+   *
+   * @param column input columns laid out as | Time |
+   * @param timeRange window to aggregate; lower bound inclusive, upper bound exclusive
+   */
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime < timeRange.getMax() && curTime >= timeRange.getMin()) {
+        updateMaxTime(curTime);
+        // Assumes rows arrive in descending time order, so the first in-range timestamp is
+        // already the maximum of this block.
+        return;
+      }
+    }
+  }
+
+  /** Returns true as soon as any in-range timestamp has been offered to the accumulator. */
+  @Override
+  public boolean hasFinalResult() {
+    return hasCandidateResult;
+  }
+
+  /** Marks that a candidate exists, then delegates the max computation to the parent. */
+  @Override
+  protected void updateMaxTime(long curTime) {
+    hasCandidateResult = true;
+    super.updateMaxTime(curTime);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
index 893d8436eb..b80adbe470 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
@@ -27,8 +27,8 @@ import 
org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 
 public class MinTimeAccumulator implements Accumulator {
 
-  private boolean hasCandidateResult;
-  private long minTime = Long.MAX_VALUE;
+  protected boolean hasCandidateResult;
+  protected long minTime = Long.MAX_VALUE;
 
   public MinTimeAccumulator() {}
 
@@ -92,7 +92,7 @@ public class MinTimeAccumulator implements Accumulator {
     return TSDataType.INT64;
   }
 
-  private void updateMinTime(long curTime) {
+  protected void updateMinTime(long curTime) {
     hasCandidateResult = true;
     minTime = Math.min(minTime, curTime);
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeDescAccumulator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeDescAccumulator.java
new file mode 100644
index 0000000000..cb9136eca9
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeDescAccumulator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.operator.aggregation;
+
+import org.apache.iotdb.tsfile.read.common.TimeRange;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+
+/**
+ * Min-time accumulator that {@link AccumulatorFactory} selects when {@code ascending} is false.
+ */
+public class MinTimeDescAccumulator extends MinTimeAccumulator {
+
+  /**
+   * Passes every timestamp inside {@code [timeRange.getMin(), timeRange.getMax())} to {@link
+   * #updateMinTime(long)}.
+   *
+   * @param column input columns laid out as | Time |
+   * @param timeRange window to aggregate; lower bound inclusive, upper bound exclusive
+   */
+  @Override
+  public void addInput(Column[] column, TimeRange timeRange) {
+    final Column timeColumn = column[0];
+    int row = 0;
+    while (row < timeColumn.getPositionCount()) {
+      final long timestamp = timeColumn.getLong(row);
+      row++;
+      // Skip rows outside the half-open window.
+      if (timestamp < timeRange.getMin() || timestamp >= timeRange.getMax()) {
+        continue;
+      }
+      updateMinTime(timestamp);
+    }
+  }
+
+  /** Always reports that the result is not final. */
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+}
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
index c21314b5d9..2b4d7c4c7c 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
@@ -93,9 +93,9 @@ public class SeriesAggregateScanOperatorTest {
         initSeriesAggregateScanOperator(
             Collections.singletonList(
                 new Aggregator(
-                    
AccumulatorFactory.createAccumulator(AggregationType.COUNT, TSDataType.INT32),
-                    AggregationStep.SINGLE,
-                    null)),
+                    AccumulatorFactory.createAccumulator(
+                        AggregationType.COUNT, TSDataType.INT32, true),
+                    AggregationStep.SINGLE)),
             null,
             true,
             null);

Reply via email to