This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch add_extream
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7859ad2d8cffddacfcba8d73ef7236ce6db5ca0f
Author: Beyyes <[email protected]>
AuthorDate: Sat Oct 19 13:01:00 2024 +0800

    add extreme
---
 .../db/it/IoTDBMultiIDsWithAttributesTableIT.java  |  11 +-
 .../relational/aggregation/AccumulatorFactory.java |   2 +
 .../relational/aggregation/ExtremeAccumulator.java | 307 +++++++++++++++++++++
 3 files changed, 317 insertions(+), 3 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
index 01d62a5c348..568457e8c3d 100644
--- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
@@ -1051,15 +1051,20 @@ public class IoTDBMultiIDsWithAttributesTableIT {
   }
 
   @Test
-  public void maxByMinByTest() {
-    String[] expectedHeader1 = buildHeaders(10);
+  public void maxByMinByExtremeTest() {
+    expectedHeader = buildHeaders(10);
     sql =
         "select 
max_by(time,floatnum),min_by(time,floatnum),max_by(time,date),min_by(time,date),max_by(time,floatnum),min_by(time,floatnum),max_by(time,ts),min_by(time,ts),max_by(time,stringv),min_by(time,stringv)
 from table0";
     retArray =
         new String[] {
           
"1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.040Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1970-01-01T00:00:00.100Z,1970-01-01T00:00:00.040Z,1971-01-01T00:00:10.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:01:40.000Z,1971-01-01T00:00:01.000Z,",
         };
-    tableResultSetEqualTest(sql, expectedHeader1, retArray, DATABASE_NAME);
+    tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
+
+    expectedHeader = buildHeaders(3);
+    sql = "select extreme(num),extreme(bignum),extreme(floatnum) from table0";
+    retArray = new String[] {"15,3147483648,4654.231,"};
+    tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME);
   }
 
   // ==================================================================
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
index 55a2c1e28a5..0dc87039ecb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
@@ -150,6 +150,8 @@ public class AccumulatorFactory {
         return new TableMaxByAccumulator(inputDataTypes.get(0), 
inputDataTypes.get(1));
       case MIN_BY:
         return new TableMinByAccumulator(inputDataTypes.get(0), 
inputDataTypes.get(1));
+      case EXTREME:
+        return new ExtremeAccumulator(inputDataTypes.get(0));
       default:
         throw new IllegalArgumentException("Invalid Aggregation function: " + 
aggregationType);
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/ExtremeAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/ExtremeAccumulator.java
new file mode 100644
index 00000000000..4b8cba3d73e
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/ExtremeAccumulator.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.utils.RamUsageEstimator;
+import org.apache.tsfile.utils.TsPrimitiveType;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+
+/**
+ * Accumulator for the {@code extreme} aggregation: it keeps the input value with the largest
+ * absolute value and, when two candidates have the same magnitude, prefers the greater (positive)
+ * one.
+ *
+ * <p>Values can only be fed in as INT32, INT64, FLOAT or DOUBLE; feeding any other data type
+ * throws an {@link UnSupportedDataTypeException}. Until a non-null value has been seen the
+ * accumulator emits null.
+ */
+public class ExtremeAccumulator implements TableAccumulator {
+  private static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(ExtremeAccumulator.class);
+  private final TSDataType seriesDataType;
+  // Holder of the current extreme value; only meaningful once initResult is true.
+  private final TsPrimitiveType extremeResult;
+  // False until the first value has been accepted.
+  private boolean initResult;
+
+  /**
+   * Creates an empty accumulator.
+   *
+   * @param seriesDataType data type of the aggregated column
+   */
+  public ExtremeAccumulator(TSDataType seriesDataType) {
+    this.seriesDataType = seriesDataType;
+    this.extremeResult = TsPrimitiveType.getByType(seriesDataType);
+  }
+
+  /** Returns the shallow size of this instance. */
+  @Override
+  public long getEstimatedSize() {
+    return INSTANCE_SIZE;
+  }
+
+  /** Returns a fresh, empty accumulator for the same data type (state is not copied). */
+  @Override
+  public TableAccumulator copy() {
+    return new ExtremeAccumulator(seriesDataType);
+  }
+
+  /**
+   * Folds every non-null value of the first argument column into the running extreme.
+   *
+   * @param arguments input columns; only {@code arguments[0]} is read
+   * @throws UnSupportedDataTypeException if the data type is not INT32, INT64, FLOAT or DOUBLE
+   */
+  @Override
+  public void addInput(Column[] arguments) {
+    switch (seriesDataType) {
+      case INT32:
+        addIntInput(arguments[0]);
+        return;
+      case INT64:
+        addLongInput(arguments[0]);
+        return;
+      case FLOAT:
+        addFloatInput(arguments[0]);
+        return;
+      case DOUBLE:
+        addDoubleInput(arguments[0]);
+        return;
+      case TEXT:
+      case STRING:
+      case BLOB:
+      case BOOLEAN:
+      case DATE:
+      case TIMESTAMP:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in Extreme: %s", seriesDataType));
+    }
+  }
+
+  /**
+   * Merges partial results produced by {@link #evaluateIntermediate(ColumnBuilder)}. The partial
+   * result has the layout {@code | PartialExtremeValue |}; null positions are skipped.
+   *
+   * @param argument column of partial extreme values
+   * @throws UnSupportedDataTypeException if a non-null value is met for an unsupported data type
+   */
+  @Override
+  public void addIntermediate(Column argument) {
+    for (int i = 0; i < argument.getPositionCount(); i++) {
+      if (argument.isNull(i)) {
+        continue;
+      }
+
+      switch (seriesDataType) {
+        case INT32:
+          updateIntResult(argument.getInt(i));
+          break;
+        case INT64:
+          updateLongResult(argument.getLong(i));
+          break;
+        case FLOAT:
+          updateFloatResult(argument.getFloat(i));
+          break;
+        case DOUBLE:
+          updateDoubleResult(argument.getDouble(i));
+          break;
+        case TEXT:
+        case STRING:
+        case BLOB:
+        case BOOLEAN:
+        case DATE:
+        case TIMESTAMP:
+        default:
+          throw new UnSupportedDataTypeException(
+              String.format("Unsupported data type in Extreme: %s", seriesDataType));
+      }
+    }
+  }
+
+  /**
+   * Updates the running extreme from the max and min values of {@code statistics[0]}. Nothing
+   * happens when the array is null, empty, or its first element is null.
+   *
+   * @param statistics statistics array; only the first element is read
+   * @throws UnSupportedDataTypeException if the data type is not INT32, INT64, FLOAT or DOUBLE
+   */
+  @Override
+  public void addStatistics(Statistics[] statistics) {
+    if (statistics == null || statistics.length == 0 || statistics[0] == null) {
+      return;
+    }
+
+    // The value with the largest magnitude is either the maximum or the minimum, so feeding both
+    // is enough.
+    switch (seriesDataType) {
+      case INT32:
+        updateIntResult((int) statistics[0].getMaxValue());
+        updateIntResult((int) statistics[0].getMinValue());
+        break;
+      case INT64:
+        updateLongResult((long) statistics[0].getMaxValue());
+        updateLongResult((long) statistics[0].getMinValue());
+        break;
+      case FLOAT:
+        updateFloatResult((float) statistics[0].getMaxValue());
+        updateFloatResult((float) statistics[0].getMinValue());
+        break;
+      case DOUBLE:
+        updateDoubleResult((double) statistics[0].getMaxValue());
+        updateDoubleResult((double) statistics[0].getMinValue());
+        break;
+      case TEXT:
+      case STRING:
+      case BLOB:
+      case BOOLEAN:
+      case DATE:
+      case TIMESTAMP:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in Extreme: %s", seriesDataType));
+    }
+  }
+
+  /**
+   * Writes the partial result. columnBuilder should be single in ExtremeAccumulator.
+   *
+   * @param columnBuilder builder that receives the current extreme, or null if no value was seen
+   */
+  @Override
+  public void evaluateIntermediate(ColumnBuilder columnBuilder) {
+    writeResult(columnBuilder);
+  }
+
+  /**
+   * Writes the final result, which has the same form as the partial result.
+   *
+   * @param columnBuilder builder that receives the current extreme, or null if no value was seen
+   */
+  @Override
+  public void evaluateFinal(ColumnBuilder columnBuilder) {
+    writeResult(columnBuilder);
+  }
+
+  /** Appends the current extreme to the builder, or null when no value has been accepted yet. */
+  private void writeResult(ColumnBuilder columnBuilder) {
+    if (!initResult) {
+      columnBuilder.appendNull();
+      return;
+    }
+
+    switch (seriesDataType) {
+      case INT32:
+        columnBuilder.writeInt(extremeResult.getInt());
+        break;
+      case INT64:
+        columnBuilder.writeLong(extremeResult.getLong());
+        break;
+      case FLOAT:
+        columnBuilder.writeFloat(extremeResult.getFloat());
+        break;
+      case DOUBLE:
+        columnBuilder.writeDouble(extremeResult.getDouble());
+        break;
+      case TEXT:
+      case STRING:
+      case BLOB:
+      case BOOLEAN:
+      case DATE:
+      case TIMESTAMP:
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Unsupported data type in Extreme: %s", seriesDataType));
+    }
+  }
+
+  /** Forgets the running extreme so the accumulator can be reused. */
+  @Override
+  public void reset() {
+    initResult = false;
+    extremeResult.reset();
+  }
+
+  /** Always returns false. */
+  @Override
+  public boolean hasFinalResult() {
+    return false;
+  }
+
+  private void addIntInput(Column column) {
+    for (int i = 0; i < column.getPositionCount(); i++) {
+      if (!column.isNull(i)) {
+        updateIntResult(column.getInt(i));
+      }
+    }
+  }
+
+  /** Replaces the stored int if {@code val} has a larger magnitude, or an equal one but is greater. */
+  private void updateIntResult(int val) {
+    // Widen to long before taking the absolute value: Math.abs(Integer.MIN_VALUE) overflows and
+    // stays negative, which would make the largest-magnitude int lose every comparison.
+    long absExtVal = Math.abs((long) val);
+    int candidateResult = extremeResult.getInt();
+    long absCandidateResult = Math.abs((long) candidateResult);
+
+    if (!initResult
+        || (absExtVal > absCandidateResult)
+        || (absExtVal == absCandidateResult && val > candidateResult)) {
+      initResult = true;
+      extremeResult.setInt(val);
+    }
+  }
+
+  private void addLongInput(Column column) {
+    for (int i = 0; i < column.getPositionCount(); i++) {
+      if (!column.isNull(i)) {
+        updateLongResult(column.getLong(i));
+      }
+    }
+  }
+
+  /** Replaces the stored long if {@code val} has a larger magnitude, or an equal one but is greater. */
+  private void updateLongResult(long val) {
+    long absExtVal = Math.abs(val);
+    long candidateResult = extremeResult.getLong();
+    long absCandidateResult = Math.abs(candidateResult);
+
+    // Math.abs(Long.MIN_VALUE) is still Long.MIN_VALUE. Comparing the magnitudes as unsigned
+    // numbers reads that bit pattern as 2^63, its true magnitude, and is identical to a signed
+    // comparison for every other (non-negative) absolute value.
+    if (!initResult
+        || (Long.compareUnsigned(absExtVal, absCandidateResult) > 0)
+        || (absExtVal == absCandidateResult && val > candidateResult)) {
+      initResult = true;
+      extremeResult.setLong(val);
+    }
+  }
+
+  private void addFloatInput(Column column) {
+    for (int i = 0; i < column.getPositionCount(); i++) {
+      if (!column.isNull(i)) {
+        updateFloatResult(column.getFloat(i));
+      }
+    }
+  }
+
+  /** Replaces the stored float if {@code val} has a larger magnitude, or an equal one but is greater. */
+  private void updateFloatResult(float val) {
+    float absExtVal = Math.abs(val);
+    float candidateResult = extremeResult.getFloat();
+    float absCandidateResult = Math.abs(candidateResult);
+
+    if (!initResult
+        || (absExtVal > absCandidateResult)
+        || (absExtVal == absCandidateResult && val > candidateResult)) {
+      initResult = true;
+      extremeResult.setFloat(val);
+    }
+  }
+
+  private void addDoubleInput(Column column) {
+    for (int i = 0; i < column.getPositionCount(); i++) {
+      if (!column.isNull(i)) {
+        updateDoubleResult(column.getDouble(i));
+      }
+    }
+  }
+
+  /** Replaces the stored double if {@code val} has a larger magnitude, or an equal one but is greater. */
+  private void updateDoubleResult(double val) {
+    double absExtVal = Math.abs(val);
+    double candidateResult = extremeResult.getDouble();
+    double absCandidateResult = Math.abs(candidateResult);
+
+    if (!initResult
+        || (absExtVal > absCandidateResult)
+        || (absExtVal == absCandidateResult && val > candidateResult)) {
+      initResult = true;
+      extremeResult.setDouble(val);
+    }
+  }
+}

Reply via email to