This is an automated email from the ASF dual-hosted git repository.

lancelly pushed a commit to branch min_by
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c64a5c037c8cc978b10678b74ac9e093809fe523
Author: lancelly <[email protected]>
AuthorDate: Thu Feb 22 19:26:00 2024 +0800

    minby
---
 .../constant/BuiltinAggregationFunctionEnum.java   |   3 +-
 .../db/it/aggregation/IoTDBAggregationIT.java      |  52 +++
 .../db/it/aggregation/maxby/IoTDBMaxByIT.java      |   3 +-
 .../db/it/aggregation/minby/IoTDBMinBy2IT.java     |  41 +++
 .../db/it/aggregation/minby/IoTDBMinBy3IT.java     |  48 +++
 .../minby/IoTDBMinByAlignedSeriesIT.java           |  71 ++++
 .../IoTDBMaxByIT.java => minby/IoTDBMinByIT.java}  | 131 ++++---
 .../java/org/apache/iotdb/tool/ExportTsFile.java   |   3 +-
 .../apache/iotdb/jdbc/IoTDBDatabaseMetadata.java   |   1 +
 .../sql/factory/IoTDBDynamicTableFactory.java      |   1 +
 .../execution/aggregation/AccumulatorFactory.java  |   4 +
 .../execution/aggregation/MaxByAccumulator.java    | 404 +--------------------
 ...cumulator.java => MaxMinByBaseAccumulator.java} |  63 ++--
 .../execution/aggregation/MinByAccumulator.java    |  48 +++
 .../SlidingWindowAggregatorFactory.java            |  34 ++
 .../plan/analyze/ExpressionTypeAnalyzer.java       |   1 +
 .../db/queryengine/plan/parser/ASTVisitor.java     |   1 +
 .../plan/parameter/AggregationDescriptor.java      |   3 +
 .../org/apache/iotdb/db/utils/SchemaUtils.java     |   5 +
 .../apache/iotdb/db/utils/TypeInferenceUtils.java  |   3 +
 .../iotdb/db/utils/constant/SqlConstant.java       |   1 +
 .../iotdb/db/utils/constant/TestConstant.java      |   4 +
 .../execution/aggregation/AccumulatorTest.java     |  33 ++
 .../udf/builtin/BuiltinAggregationFunction.java    |   5 +-
 .../thrift-commons/src/main/thrift/common.thrift   |   1 +
 25 files changed, 476 insertions(+), 488 deletions(-)

diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinAggregationFunctionEnum.java b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinAggregationFunctionEnum.java
index bf41380cfe4..7c2c283d30e 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinAggregationFunctionEnum.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/BuiltinAggregationFunctionEnum.java
@@ -41,7 +41,8 @@ public enum BuiltinAggregationFunctionEnum {
   COUNT("count"),
   AVG("avg"),
   SUM("sum"),
-  MAX_BY("max_by");
+  MAX_BY("max_by"),
+  MIN_BY("min_by");
 
   private final String functionName;
 
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationIT.java
index b209d05b3a5..aff7582bf26 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAggregationIT.java
@@ -46,6 +46,7 @@ import static org.apache.iotdb.db.utils.constant.TestConstant.lastValue;
 import static org.apache.iotdb.db.utils.constant.TestConstant.maxBy;
 import static org.apache.iotdb.db.utils.constant.TestConstant.maxTime;
 import static org.apache.iotdb.db.utils.constant.TestConstant.maxValue;
+import static org.apache.iotdb.db.utils.constant.TestConstant.minBy;
 import static org.apache.iotdb.db.utils.constant.TestConstant.minTime;
 import static org.apache.iotdb.db.utils.constant.TestConstant.minValue;
 import static org.apache.iotdb.db.utils.constant.TestConstant.sum;
@@ -1033,4 +1034,55 @@ public class IoTDBAggregationIT {
       fail(e.getMessage());
     }
   }
+
+  @Test
+  public void minByTest() {
+    String[] retArray = new String[] {"0,500", "0,500", "0,500"};
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+
+      int cnt;
+      try (ResultSet resultSet =
+          statement.executeQuery(
+              "SELECT min_by(time, s0) "
+                  + "FROM root.vehicle.d0 WHERE time >= 100 AND time < 9000")) {
+        cnt = 0;
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(minBy("Time", d0s0));
+          Assert.assertEquals(retArray[cnt], ans);
+          cnt++;
+        }
+        Assert.assertEquals(1, cnt);
+      }
+
+      try (ResultSet resultSet =
+          statement.executeQuery("SELECT min_by(time,s0) FROM root.vehicle.d0 WHERE time < 2500")) {
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(minBy("Time", d0s0));
+          Assert.assertEquals(retArray[cnt], ans);
+          cnt++;
+        }
+        Assert.assertEquals(2, cnt);
+      }
+
+      // keep the correctness of `order by time desc`
+      cnt = 0;
+      try (ResultSet resultSet =
+          statement.executeQuery(
+              "SELECT min_by(time,s0) FROM root.vehicle.d0 WHERE time >= 100 AND time < 9000 order by time desc")) {
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString(minBy("Time", d0s0));
+          Assert.assertEquals(retArray[cnt], ans);
+          cnt++;
+        }
+        Assert.assertEquals(1, cnt);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java
index b73f35bbf1e..5b706720b0c 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java
@@ -96,7 +96,6 @@ public class IoTDBMaxByIT {
         "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(1, 1, 1, 1, 1, true, \"1\")",
         "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(2, 2, 2, 2, 2, false, \"2\")",
         "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(3, 3, 3, 3, 3, false, \"3\")",
-        "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(1, 4, 4, 4, 4, true, \"1\")",
         "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(2, 2, 2, 2, 2, true, \"4\")",
         "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(3, 3, 3, 3, 3, false, \"3\")",
         "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(4, 1, 1, 1, 1, false, \"1\")",
@@ -107,7 +106,7 @@ public class IoTDBMaxByIT {
         "flush"
       };
 
-  protected static final String UNSUPPORTED_TYPE_MESSAGE = "Unsupported data type in MaxBy:";
+  protected static final String UNSUPPORTED_TYPE_MESSAGE = "Unsupported data type in MaxBy/MinBy:";
 
   @BeforeClass
   public static void setUp() throws Exception {
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/minby/IoTDBMinBy2IT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/minby/IoTDBMinBy2IT.java
new file mode 100644
index 00000000000..9efa6ff1906
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/minby/IoTDBMinBy2IT.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.aggregation.minby;
+
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
+
+public class IoTDBMinBy2IT extends IoTDBMinByIT {
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().getConfig().getCommonConfig().setDegreeOfParallelism(4);
+    EnvFactory.getEnv().initClusterEnvironment();
+    prepareData(NON_ALIGNED_DATASET);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/minby/IoTDBMinBy3IT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/minby/IoTDBMinBy3IT.java
new file mode 100644
index 00000000000..e4c6523ac4d
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/minby/IoTDBMinBy3IT.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.aggregation.minby;
+
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
+
+public class IoTDBMinBy3IT extends IoTDBMinByIT {
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        .setEnableSeqSpaceCompaction(false)
+        .setEnableUnseqSpaceCompaction(false)
+        .setEnableCrossSpaceCompaction(false)
+        .setMaxTsBlockLineNumber(1)
+        .setMaxNumberOfPointsInPage(1);
+    EnvFactory.getEnv().initClusterEnvironment();
+    prepareData(NON_ALIGNED_DATASET);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/minby/IoTDBMinByAlignedSeriesIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/minby/IoTDBMinByAlignedSeriesIT.java
new file mode 100644
index 00000000000..56f7b5ecb09
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/minby/IoTDBMinByAlignedSeriesIT.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.aggregation.minby;
+
+import org.apache.iotdb.it.env.EnvFactory;
+
+import org.junit.BeforeClass;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
+
+public class IoTDBMinByAlignedSeriesIT extends IoTDBMinByIT {
+  protected static final String[] ALIGNED_DATASET =
+      new String[] {
+        // x input
+        "CREATE ALIGNED TIMESERIES root.db.d1(x1 INT32, x2 INT64, x3 FLOAT, x4 DOUBLE, x5 BOOLEAN, x6 TEXT)",
+        // y input
+        "CREATE ALIGNED TIMESERIES root.db.d1(y1 INT32, y2 INT64, y3 FLOAT, y4 DOUBLE, y5 BOOLEAN, y6 TEXT)",
+        "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(1, 1, 1, 1, 1, true, \"1\")",
+        "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(2, 2, 2, 2, 2, false, \"2\")",
+        "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(3, 3, 3, 3, 3, false, \"3\")",
+        "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(2, 3, 3, 3, 3, true, \"4\")",
+        "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(3, 2, 2, 2, 2, false, \"3\")",
+        "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(4, 1, 1, 1, 1, false, \"4\")",
+        "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(8, 3, 3, 3, 3, false, \"3\")",
+        "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(8, 8, 8, 8, 8, false, \"4\")",
+        "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(12, 3, 3, 3, 3, false, \"3\")",
+        "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(12, 0, 0, 0, 0, false, \"4\")",
+        "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(13, 4, 4, 4, 4, false, \"4\")",
+        "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(13, 0, 0, 0, 0, false, \"4\")",
+        "flush",
+        // For Align By Device
+        "CREATE ALIGNED TIMESERIES root.db.d2(x1 INT32, x2 INT64, x3 FLOAT, x4 DOUBLE, x5 BOOLEAN, x6 TEXT)",
+        "CREATE ALIGNED TIMESERIES root.db.d2(y1 INT32, y2 INT64, y3 FLOAT, y4 DOUBLE, y5 BOOLEAN, y6 TEXT)",
+        "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(1, 1, 1, 1, 1, true, \"1\")",
+        "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(2, 2, 2, 2, 2, false, \"2\")",
+        "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(3, 3, 3, 3, 3, false, \"3\")",
+        "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(2, 3, 3, 3, 3, true, \"4\")",
+        "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(3, 2, 2, 2, 2, false, \"3\")",
+        "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(4, 1, 1, 1, 1, false, \"4\")",
+        "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(8, 3, 3, 3, 3, false, \"3\")",
+        "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(8, 8, 8, 8, 8, false, \"4\")",
+        "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(12, 3, 3, 3, 3, false, \"3\")",
+        "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(12, 0, 0, 0, 0, false, \"4\")",
+        "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(13, 4, 4, 4, 4, false, \"4\")",
+        "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(13, 0, 0, 0, 0, false, \"4\")",
+      };
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv().getConfig().getCommonConfig().setPartitionInterval(1000);
+    EnvFactory.getEnv().initClusterEnvironment();
+    prepareData(ALIGNED_DATASET);
+  }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/minby/IoTDBMinByIT.java
similarity index 78%
copy from integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java
copy to integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/minby/IoTDBMinByIT.java
index b73f35bbf1e..f33236839d9 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/maxby/IoTDBMaxByIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/minby/IoTDBMinByIT.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.it.aggregation.maxby;
+package org.apache.iotdb.db.it.aggregation.minby;
 
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
@@ -41,13 +41,13 @@ import java.util.Map;
 import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
 import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest;
 import static org.apache.iotdb.db.utils.constant.TestConstant.TIMESTAMP_STR;
-import static org.apache.iotdb.db.utils.constant.TestConstant.maxBy;
+import static org.apache.iotdb.db.utils.constant.TestConstant.minBy;
 import static org.apache.iotdb.itbase.constant.TestConstant.DEVICE;
 import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({LocalStandaloneIT.class, ClusterIT.class})
-public class IoTDBMaxByIT {
+public class IoTDBMinByIT {
   protected static final String[] NON_ALIGNED_DATASET =
       new String[] {
         "CREATE DATABASE root.db",
@@ -68,9 +68,9 @@ public class IoTDBMaxByIT {
         "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(1, 1, 1, 1, 1, true, \"1\")",
         "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(2, 2, 2, 2, 2, false, \"2\")",
         "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(3, 3, 3, 3, 3, false, \"3\")",
-        "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(2, 2, 2, 2, 2, true, \"4\")",
-        "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(3, 3, 3, 3, 3, false, \"3\")",
-        "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(4, 4, 4, 4, 4, false, \"4\")",
+        "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(2, 3, 3, 3, 3, true, \"4\")",
+        "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(3, 2, 2, 2, 2, false, \"3\")",
+        "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(4, 1, 1, 1, 1, false, \"4\")",
         "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(8, 3, 3, 3, 3, false, \"3\")",
         "INSERT INTO root.db.d1(timestamp,y1,y2,y3,y4,y5,y6) values(8, 8, 8, 8, 8, false, \"4\")",
         "INSERT INTO root.db.d1(timestamp,x1,x2,x3,x4,x5,x6) values(12, 3, 3, 3, 3, false, \"3\")",
@@ -96,18 +96,17 @@ public class IoTDBMaxByIT {
         "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(1, 1, 1, 1, 1, true, \"1\")",
         "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(2, 2, 2, 2, 2, false, \"2\")",
         "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(3, 3, 3, 3, 3, false, \"3\")",
-        "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(1, 4, 4, 4, 4, true, \"1\")",
-        "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(2, 2, 2, 2, 2, true, \"4\")",
-        "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(3, 3, 3, 3, 3, false, \"3\")",
+        "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(2, 3, 3, 3, 3, true, \"4\")",
+        "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(3, 2, 2, 2, 2, false, \"3\")",
         "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(4, 1, 1, 1, 1, false, \"1\")",
         "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(12, 3, 3, 3, 3, false, \"3\")",
-        "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(12, 9, 9, 9, 9, false, \"1\")",
+        "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(12, 0, 0, 0, 0, false, \"1\")",
         "INSERT INTO root.db.d2(timestamp,x1,x2,x3,x4,x5,x6) values(13, 4, 4, 4, 4, false, \"4\")",
-        "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(13, 9, 9, 9, 9, false, \"1\")",
+        "INSERT INTO root.db.d2(timestamp,y1,y2,y3,y4,y5,y6) values(13, 0, 0, 0, 0, false, \"1\")",
         "flush"
       };
 
-  protected static final String UNSUPPORTED_TYPE_MESSAGE = "Unsupported data type in MaxBy:";
+  protected static final String UNSUPPORTED_TYPE_MESSAGE = "Unsupported data type in MaxBy/MinBy:";
 
   @BeforeClass
   public static void setUp() throws Exception {
@@ -127,7 +126,7 @@ public class IoTDBMaxByIT {
         Statement statement = connection.createStatement()) {
       try {
         try (ResultSet resultSet =
-            statement.executeQuery("SELECT max_by(x1, y5) FROM root.db.d1")) {
+            statement.executeQuery("SELECT min_by(x1, y5) FROM root.db.d1")) {
           resultSet.next();
           fail();
         }
@@ -136,7 +135,7 @@ public class IoTDBMaxByIT {
       }
       try {
         try (ResultSet resultSet =
-            statement.executeQuery("SELECT max_by(x1, y6) FROM root.db.d1")) {
+            statement.executeQuery("SELECT min_by(x1, y6) FROM root.db.d1")) {
           resultSet.next();
           fail();
         }
@@ -145,7 +144,7 @@ public class IoTDBMaxByIT {
       }
       try {
         try (ResultSet resultSet =
-            statement.executeQuery("SELECT max_by(x5, y5) FROM root.db.d1")) {
+            statement.executeQuery("SELECT min_by(x5, y5) FROM root.db.d1")) {
           resultSet.next();
           fail();
         }
@@ -154,7 +153,7 @@ public class IoTDBMaxByIT {
       }
       try {
         try (ResultSet resultSet =
-            statement.executeQuery("SELECT max_by(x5, y6) FROM root.db.d1")) {
+            statement.executeQuery("SELECT min_by(x5, y6) FROM root.db.d1")) {
           resultSet.next();
           fail();
         }
@@ -163,7 +162,7 @@ public class IoTDBMaxByIT {
       }
       try {
         try (ResultSet resultSet =
-            statement.executeQuery("SELECT max_by(x6, y5) FROM root.db.d1")) {
+            statement.executeQuery("SELECT min_by(x6, y5) FROM root.db.d1")) {
           resultSet.next();
           fail();
         }
@@ -172,7 +171,7 @@ public class IoTDBMaxByIT {
       }
       try {
         try (ResultSet resultSet =
-            statement.executeQuery("SELECT max_by(x6, y6) FROM root.db.d1")) {
+            statement.executeQuery("SELECT min_by(x6, y6) FROM root.db.d1")) {
           resultSet.next();
           fail();
         }
@@ -199,7 +198,7 @@ public class IoTDBMaxByIT {
         String y = expectedHeader.getKey();
         resultSetEqualTest(
             String.format(
-                "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.d1 where time <= 3",
+                "select min_by(x1,%s),min_by(x2,%s),min_by(x3,%s),min_by(x4,%s),min_by(x5,%s),min_by(x6,%s) from root.db.d1 where time <= 3",
                 y, y, y, y, y, y),
             expectedHeader.getValue(),
             retArray);
@@ -224,7 +223,7 @@ public class IoTDBMaxByIT {
         String y = expectedHeader.getKey();
         resultSetEqualTest(
             String.format(
-                "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.d1 where time <= 4",
+                "select min_by(x1,%s),min_by(x2,%s),min_by(x3,%s),min_by(x4,%s),min_by(x5,%s),min_by(x6,%s) from root.db.d1 where time <= 4",
                 y, y, y, y, y, y),
             expectedHeader.getValue(),
             retArray);
@@ -249,7 +248,7 @@ public class IoTDBMaxByIT {
         String y = expectedHeader.getKey();
         resultSetEqualTest(
             String.format(
-                "select max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s) from root.db.d1 where time <= 3",
+                "select min_by(time,%s),min_by(time,%s),min_by(time,%s),min_by(time,%s),min_by(time,%s),min_by(time,%s) from root.db.d1 where time <= 3",
                 y, y, y, y, y, y),
             expectedHeader.getValue(),
             retArray);
@@ -259,7 +258,7 @@ public class IoTDBMaxByIT {
         String y = expectedHeader.getKey();
         resultSetEqualTest(
             String.format(
-                "select max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s),max_by(time,%s) from root.db.d1 where time <= 4",
+                "select min_by(time,%s),min_by(time,%s),min_by(time,%s),min_by(time,%s),min_by(time,%s),min_by(time,%s) from root.db.d1 where time <= 4",
                 y, y, y, y, y, y),
             expectedHeader.getValue(),
             retArray1);
@@ -276,18 +275,18 @@ public class IoTDBMaxByIT {
         Statement statement = connection.createStatement()) {
       String[] expectedHeader =
           new String[] {
-            "max_by(root.db.d1.x1 + 1 - 3, -cos(sin(root.db.d1.y2 / 10)))",
-            "max_by(root.db.d1.x2 * 2 / 3, -cos(sin(root.db.d1.y2 / 10)))",
-            "max_by(floor(root.db.d1.x3), -cos(sin(root.db.d1.y2 / 10)))",
-            "max_by(ceil(root.db.d1.x4), -cos(sin(root.db.d1.y2 / 10)))",
-            "max_by(root.db.d1.x5, -cos(sin(root.db.d1.y2 / 10)))",
-            "max_by(REPLACE(root.db.d1.x6, '3', '4'), -cos(sin(root.db.d1.y2 / 10)))",
+            "min_by(root.db.d1.x1 + 1 - 3, -cos(sin(root.db.d1.y2 / 10)))",
+            "min_by(root.db.d1.x2 * 2 / 3, -cos(sin(root.db.d1.y2 / 10)))",
+            "min_by(floor(root.db.d1.x3), -cos(sin(root.db.d1.y2 / 10)))",
+            "min_by(ceil(root.db.d1.x4), -cos(sin(root.db.d1.y2 / 10)))",
+            "min_by(root.db.d1.x5, -cos(sin(root.db.d1.y2 / 10)))",
+            "min_by(REPLACE(root.db.d1.x6, '3', '4'), -cos(sin(root.db.d1.y2 / 10)))",
           };
       String[] retArray = new String[] {"1.0,2.0,3.0,3.0,false,4,"};
       String y = "-cos(sin(y2 / 10))";
       resultSetEqualTest(
           String.format(
-              "select max_by(x1 + 1 - 3,%s),max_by(x2 * 2 / 3,%s),max_by(floor(x3),%s),max_by(ceil(x4),%s),max_by(x5,%s),max_by(replace(x6, '3', '4'),%s) from root.db.d1 where time <= 3",
+              "select min_by(x1 + 1 - 3,%s),min_by(x2 * 2 / 3,%s),min_by(floor(x3),%s),min_by(ceil(x4),%s),min_by(x5,%s),min_by(replace(x6, '3', '4'),%s) from root.db.d1 where time <= 3",
               y, y, y, y, y, y),
           expectedHeader,
           retArray);
@@ -304,12 +303,12 @@ public class IoTDBMaxByIT {
       String[] expectedHeader =
           new String[] {
             DEVICE,
-            "max_by(x1 + 1 - 3, -cos(sin(y2 / 10)))",
-            "max_by(x2 * 2 / 3, -cos(sin(y2 / 10)))",
-            "max_by(floor(x3), -cos(sin(y2 / 10)))",
-            "max_by(ceil(x4), -cos(sin(y2 / 10)))",
-            "max_by(x5, -cos(sin(y2 / 10)))",
-            "max_by(REPLACE(x6, '3', '4'), -cos(sin(y2 / 10)))",
+            "min_by(x1 + 1 - 3, -cos(sin(y2 / 10)))",
+            "min_by(x2 * 2 / 3, -cos(sin(y2 / 10)))",
+            "min_by(floor(x3), -cos(sin(y2 / 10)))",
+            "min_by(ceil(x4), -cos(sin(y2 / 10)))",
+            "min_by(x5, -cos(sin(y2 / 10)))",
+            "min_by(REPLACE(x6, '3', '4'), -cos(sin(y2 / 10)))",
           };
       String[] retArray =
           new String[] {
@@ -318,7 +317,7 @@ public class IoTDBMaxByIT {
       String y = "-cos(sin(y2 / 10))";
       resultSetEqualTest(
           String.format(
-              "select max_by(x1 + 1 - 3,%s),max_by(x2 * 2 / 3,%s),max_by(floor(x3),%s),max_by(ceil(x4),%s),max_by(x5,%s),max_by(replace(x6, '3', '4'),%s) from root.db.** where time <= 3 align by device",
+              "select min_by(x1 + 1 - 3,%s),min_by(x2 * 2 / 3,%s),min_by(floor(x3),%s),min_by(ceil(x4),%s),min_by(x5,%s),min_by(replace(x6, '3', '4'),%s) from root.db.** where time <= 3 align by device",
               y, y, y, y, y, y),
           expectedHeader,
           retArray);
@@ -335,12 +334,12 @@ public class IoTDBMaxByIT {
       String[] expectedHeader =
           new String[] {
             TIMESTAMP_STR,
-            "max_by(root.db.d1.x1, root.db.d1.y2)",
-            "max_by(root.db.d1.x2, root.db.d1.y2)",
-            "max_by(root.db.d1.x3, root.db.d1.y2)",
-            "max_by(root.db.d1.x4, root.db.d1.y2)",
-            "max_by(root.db.d1.x5, root.db.d1.y2)",
-            "max_by(root.db.d1.x6, root.db.d1.y2)",
+            "min_by(root.db.d1.x1, root.db.d1.y2)",
+            "min_by(root.db.d1.x2, root.db.d1.y2)",
+            "min_by(root.db.d1.x3, root.db.d1.y2)",
+            "min_by(root.db.d1.x4, root.db.d1.y2)",
+            "min_by(root.db.d1.x5, root.db.d1.y2)",
+            "min_by(root.db.d1.x6, root.db.d1.y2)",
           };
       String y = "y2";
       // order by time ASC
@@ -350,7 +349,7 @@ public class IoTDBMaxByIT {
           };
       resultSetEqualTest(
           String.format(
-              "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.d1 where time <= 10 group by ([0,9),4ms) ",
+              "select min_by(x1,%s),min_by(x2,%s),min_by(x3,%s),min_by(x4,%s),min_by(x5,%s),min_by(x6,%s) from root.db.d1 where time <= 10 group by ([0,9),4ms) ",
               y, y, y, y, y, y),
           expectedHeader,
           retArray1);
@@ -365,7 +364,7 @@ public class IoTDBMaxByIT {
           };
       resultSetEqualTest(
           String.format(
-              "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.d1 group by ([0,14),4ms) order by time desc",
+              "select min_by(x1,%s),min_by(x2,%s),min_by(x3,%s),min_by(x4,%s),min_by(x5,%s),min_by(x6,%s) from root.db.d1 group by ([0,14),4ms) order by time desc",
               y, y, y, y, y, y),
           expectedHeader,
           retArray2);
@@ -392,16 +391,16 @@ public class IoTDBMaxByIT {
         String[] expectedHeader =
             new String[] {
               TIMESTAMP_STR,
-              String.format("max_by(root.db.d1.x1, root.db.d1.%s)", y),
-              String.format("max_by(root.db.d1.x2, root.db.d1.%s)", y),
-              String.format("max_by(root.db.d1.x3, root.db.d1.%s)", y),
-              String.format("max_by(root.db.d1.x4, root.db.d1.%s)", y),
-              String.format("max_by(root.db.d1.x5, root.db.d1.%s)", y),
-              String.format("max_by(root.db.d1.x6, root.db.d1.%s)", y),
+              String.format("min_by(root.db.d1.x1, root.db.d1.%s)", y),
+              String.format("min_by(root.db.d1.x2, root.db.d1.%s)", y),
+              String.format("min_by(root.db.d1.x3, root.db.d1.%s)", y),
+              String.format("min_by(root.db.d1.x4, root.db.d1.%s)", y),
+              String.format("min_by(root.db.d1.x5, root.db.d1.%s)", y),
+              String.format("min_by(root.db.d1.x6, root.db.d1.%s)", y),
             };
         resultSetEqualTest(
             String.format(
-                "select max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s) from root.db.d1 where time <= 10 group by ([0,9),4ms,2ms) ",
+                "select min_by(x1,%s),min_by(x2,%s),min_by(x3,%s),min_by(x4,%s),min_by(x5,%s),min_by(x6,%s) from root.db.d1 where time <= 10 group by ([0,9),4ms,2ms) ",
                 y, y, y, y, y, y),
             expectedHeader,
             retArray);
@@ -419,18 +418,18 @@ public class IoTDBMaxByIT {
       String[] expectedHeader =
           new String[] {
             TIMESTAMP_STR,
-            "max_by(root.db.d1.x1, root.db.d1.y2)",
-            "max_by(root.db.d1.x2, root.db.d1.y2)",
-            "max_by(root.db.d1.x3, root.db.d1.y2)",
-            "max_by(root.db.d1.x4, root.db.d1.y2)",
-            "max_by(root.db.d1.x5, root.db.d1.y2)",
-            "max_by(root.db.d1.x6, root.db.d1.y2)",
+            "min_by(root.db.d1.x1, root.db.d1.y2)",
+            "min_by(root.db.d1.x2, root.db.d1.y2)",
+            "min_by(root.db.d1.x3, root.db.d1.y2)",
+            "min_by(root.db.d1.x4, root.db.d1.y2)",
+            "min_by(root.db.d1.x5, root.db.d1.y2)",
+            "min_by(root.db.d1.x6, root.db.d1.y2)",
           };
       String[] retArray = new String[] {"8,3,3,3.0,3.0,false,3,"};
       String y = "y2";
       resultSetEqualTest(
           String.format(
-              "select 
max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s)
 from root.db.d1 group by ([0,9),4ms) having max_by(time, %s) > 4",
+              "select 
min_by(x1,%s),min_by(x2,%s),min_by(x3,%s),min_by(x4,%s),min_by(x5,%s),min_by(x6,%s)
 from root.db.d1 group by ([0,9),4ms) having min_by(time, %s) > 4",
               y, y, y, y, y, y, y),
           expectedHeader,
           retArray);
@@ -449,16 +448,16 @@ public class IoTDBMaxByIT {
       for (String y : yArray) {
         String[] expectedHeader =
             new String[] {
-              String.format("max_by(root.*.*.x1, root.*.*.%s)", y),
-              String.format("max_by(root.*.*.x2, root.*.*.%s)", y),
-              String.format("max_by(root.*.*.x3, root.*.*.%s)", y),
-              String.format("max_by(root.*.*.x4, root.*.*.%s)", y),
-              String.format("max_by(root.*.*.x5, root.*.*.%s)", y),
-              String.format("max_by(root.*.*.x6, root.*.*.%s)", y),
+              String.format("min_by(root.*.*.x1, root.*.*.%s)", y),
+              String.format("min_by(root.*.*.x2, root.*.*.%s)", y),
+              String.format("min_by(root.*.*.x3, root.*.*.%s)", y),
+              String.format("min_by(root.*.*.x4, root.*.*.%s)", y),
+              String.format("min_by(root.*.*.x5, root.*.*.%s)", y),
+              String.format("min_by(root.*.*.x6, root.*.*.%s)", y),
             };
         resultSetEqualTest(
             String.format(
-                "select 
max_by(x1,%s),max_by(x2,%s),max_by(x3,%s),max_by(x4,%s),max_by(x5,%s),max_by(x6,%s)
 from root.db.** group by level = 0",
+                "select 
min_by(x1,%s),min_by(x2,%s),min_by(x3,%s),min_by(x4,%s),min_by(x5,%s),min_by(x6,%s)
 from root.db.** group by level = 0",
                 y, y, y, y, y, y),
             expectedHeader,
             retArray);
@@ -479,7 +478,7 @@ public class IoTDBMaxByIT {
               res.put(
                   y,
                   Arrays.stream(xInput)
-                      .map(x -> maxBy("Time".equals(x) ? x : device + "." + x, 
device + "." + y))
+                      .map(x -> minBy("Time".equals(x) ? x : device + "." + x, 
device + "." + y))
                       .toArray(String[]::new));
             });
     return res;
diff --git 
a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java 
b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java
index 4f193ceff0d..7eb12390d21 100644
--- a/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java
+++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/tool/ExportTsFile.java
@@ -188,7 +188,8 @@ public class ExportTsFile extends AbstractTsFileTool {
         || sqlLower.contains("variance(")
         || sqlLower.contains("var_pop(")
         || sqlLower.contains("var_samp(")
-        || sqlLower.contains("max_by(")) {
+        || sqlLower.contains("max_by(")
+        || sqlLower.contains("min_by(")) {
       ioTPrinter.println("The sql you entered is invalid, please don't use 
aggregate query.");
       System.exit(CODE_ERROR);
     }
diff --git 
a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
 
b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
index e20a007f096..3de83632122 100644
--- 
a/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
+++ 
b/iotdb-client/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
@@ -224,6 +224,7 @@ public class IoTDBDatabaseMetadata implements 
DatabaseMetaData {
       "LATEST",
       "LIKE",
       "MAX_BY",
+      "MIN_BY",
       "METADATA",
       "MERGE",
       "MOVE",
diff --git 
a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java
 
b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java
index f5305180e95..363750f2c81 100644
--- 
a/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java
+++ 
b/iotdb-connector/flink-sql-iotdb-connector/src/main/java/org/apache/iotdb/flink/sql/factory/IoTDBDynamicTableFactory.java
@@ -203,6 +203,7 @@ public class IoTDBDynamicTableFactory
             || sqlLower.contains("max_time(")
             || sqlLower.contains("min_time(")
             || sqlLower.contains("max_by(")
+            || sqlLower.contains("min_by(")
             || sqlLower.contains("stddev(")
             || sqlLower.contains("stddev_pop(")
             || sqlLower.contains("stddev_samp(")
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java
index 99c8f3fe9a2..1f8a8e8154b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorFactory.java
@@ -67,6 +67,7 @@ public class AccumulatorFactory {
   public static boolean isMultiInputAggregation(TAggregationType 
aggregationType) {
     switch (aggregationType) {
       case MAX_BY:
+      case MIN_BY:
         return true;
       default:
         return false;
@@ -79,6 +80,9 @@ public class AccumulatorFactory {
       case MAX_BY:
         checkState(inputDataTypes.size() == 2, "Wrong inputDataTypes size.");
         return new MaxByAccumulator(inputDataTypes.get(0), 
inputDataTypes.get(1));
+      case MIN_BY:
+        checkState(inputDataTypes.size() == 2, "Wrong inputDataTypes size.");
+        return new MinByAccumulator(inputDataTypes.get(0), 
inputDataTypes.get(1));
       default:
         throw new IllegalArgumentException("Invalid Aggregation function: " + 
aggregationType);
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java
index b2226238016..b5d1ee92da5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -19,410 +19,30 @@
 
 package org.apache.iotdb.db.queryengine.execution.aggregation;
 
-import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
-import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.Column;
-import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.utils.Binary;
-import org.apache.iotdb.tsfile.utils.BitMap;
-import org.apache.iotdb.tsfile.utils.BytesUtils;
-import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Collections;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-/** max(x,y) returns the value of x associated with the maximum value of y 
over all input values. */
-public class MaxByAccumulator implements Accumulator {
-
-  private final TSDataType xDataType;
-
-  private final TSDataType yDataType;
-
-  private final TsPrimitiveType yMaxValue;
-
-  private final TsPrimitiveType xResult;
-
-  private boolean xNull = true;
-
-  private boolean initResult;
-
-  private long yTimeStamp = Long.MAX_VALUE;
-
-  private static final String UNSUPPORTED_TYPE_MESSAGE = "Unsupported data 
type in MaxBy: %s";
-
-  public MaxByAccumulator(TSDataType xDataType, TSDataType yDataType) {
-    this.xDataType = xDataType;
-    this.yDataType = yDataType;
-    this.xResult = TsPrimitiveType.getByType(xDataType);
-    this.yMaxValue = TsPrimitiveType.getByType(yDataType);
-  }
-
-  // Column should be like: | Time | x | y |
-  @Override
-  public void addInput(Column[] column, BitMap bitMap) {
-    checkArgument(column.length == 3, "Length of input Column[] for MaxBy 
should be 3");
-    switch (yDataType) {
-      case INT32:
-        addIntInput(column, bitMap);
-        return;
-      case INT64:
-        addLongInput(column, bitMap);
-        return;
-      case FLOAT:
-        addFloatInput(column, bitMap);
-        return;
-      case DOUBLE:
-        addDoubleInput(column, bitMap);
-        return;
-      case TEXT:
-      case BOOLEAN:
-      default:
-        throw new 
UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, 
yDataType));
-    }
-  }
-
-  // partialResult should be like: | partialMaxByBinary |
-  @Override
-  public void addIntermediate(Column[] partialResult) {
-    checkArgument(partialResult.length == 1, "partialResult of MaxBy should be 
1");
-    // Return if y is null.
-    if (partialResult[0].isNull(0)) {
-      return;
-    }
-    byte[] bytes = partialResult[0].getBinary(0).getValues();
-    updateFromBytesIntermediateInput(bytes);
-  }
-
-  @Override
-  public void addStatistics(Statistics statistics) {
-    throw new UnsupportedOperationException(getClass().getName());
-  }
-
-  // finalResult should be single column, like: | finalXValue |
-  @Override
-  public void setFinal(Column finalResult) {
-    if (finalResult.isNull(0)) {
-      return;
-    }
-    initResult = true;
-    updateX(finalResult, 0);
-  }
-
-  // columnBuilders should be like | TextIntermediateColumnBuilder |
-  @Override
-  public void outputIntermediate(ColumnBuilder[] columnBuilders) {
-    checkArgument(columnBuilders.length == 1, "partialResult of MaxValue 
should be 1");
-    if (!initResult) {
-      columnBuilders[0].appendNull();
-      return;
-    }
-    columnBuilders[0].writeBinary(new Binary(serialize()));
-  }
-
-  @Override
-  public void outputFinal(ColumnBuilder columnBuilder) {
-    if (!initResult) {
-      columnBuilder.appendNull();
-      return;
-    }
-    writeX(columnBuilder);
+public class MaxByAccumulator extends MaxMinByBaseAccumulator {
+  protected MaxByAccumulator(TSDataType xDataType, TSDataType yDataType) {
+    super(xDataType, yDataType);
   }
 
   @Override
-  public void reset() {
-    initResult = false;
-    xNull = true;
-    this.xResult.reset();
-    this.yMaxValue.reset();
-    yTimeStamp = Long.MAX_VALUE;
+  protected boolean check(int yValue, int yExtremeValue) {
+    return yValue > yExtremeValue;
   }
 
   @Override
-  public boolean hasFinalResult() {
-    return false;
+  protected boolean check(long yValue, long yExtremeValue) {
+    return yValue > yExtremeValue;
   }
 
   @Override
-  public TSDataType[] getIntermediateType() {
-    return new TSDataType[] {TSDataType.TEXT};
+  protected boolean check(float yValue, float yExtremeValue) {
+    return yValue > yExtremeValue;
   }
 
   @Override
-  public TSDataType getFinalType() {
-    return xDataType;
-  }
-
-  private void addIntInput(Column[] column, BitMap bitMap) {
-    int count = column[0].getPositionCount();
-    for (int i = 0; i < count; i++) {
-      if (bitMap != null && !bitMap.isMarked(i)) {
-        continue;
-      }
-      if (!column[2].isNull(i)) {
-        updateIntResult(column[0].getLong(i), column[2].getInt(i), column[1], 
i);
-      }
-    }
-  }
-
-  private void updateIntResult(long time, int yMaxVal, Column xColumn, int 
xIndex) {
-    if (!initResult
-        || yMaxVal > yMaxValue.getInt()
-        || (yMaxVal == yMaxValue.getInt() && time < yTimeStamp)) {
-      initResult = true;
-      yTimeStamp = time;
-      yMaxValue.setInt(yMaxVal);
-      updateX(xColumn, xIndex);
-    }
-  }
-
-  private void addLongInput(Column[] column, BitMap bitMap) {
-    int count = column[0].getPositionCount();
-    for (int i = 0; i < count; i++) {
-      if (bitMap != null && !bitMap.isMarked(i)) {
-        continue;
-      }
-      if (!column[2].isNull(i)) {
-        updateLongResult(column[0].getLong(i), column[2].getLong(i), 
column[1], i);
-      }
-    }
-  }
-
-  private void updateLongResult(long time, long yMaxVal, Column xColumn, int 
xIndex) {
-    if (!initResult
-        || yMaxVal > yMaxValue.getLong()
-        || (yMaxVal == yMaxValue.getLong() && time < yTimeStamp)) {
-      initResult = true;
-      yTimeStamp = time;
-      yMaxValue.setLong(yMaxVal);
-      updateX(xColumn, xIndex);
-    }
-  }
-
-  private void addFloatInput(Column[] column, BitMap bitMap) {
-    int count = column[0].getPositionCount();
-    for (int i = 0; i < count; i++) {
-      if (bitMap != null && !bitMap.isMarked(i)) {
-        continue;
-      }
-      if (!column[2].isNull(i)) {
-        updateFloatResult(column[0].getLong(i), column[2].getFloat(i), 
column[1], i);
-      }
-    }
-  }
-
-  private void updateFloatResult(long time, float yMaxVal, Column xColumn, int 
xIndex) {
-    if (!initResult
-        || yMaxVal > yMaxValue.getFloat()
-        || (yMaxVal == yMaxValue.getFloat() && time < yTimeStamp)) {
-      initResult = true;
-      yTimeStamp = time;
-      yMaxValue.setFloat(yMaxVal);
-      updateX(xColumn, xIndex);
-    }
-  }
-
-  private void addDoubleInput(Column[] column, BitMap bitMap) {
-    int count = column[0].getPositionCount();
-    for (int i = 0; i < count; i++) {
-      if (bitMap != null && !bitMap.isMarked(i)) {
-        continue;
-      }
-      if (!column[2].isNull(i)) {
-        updateDoubleResult(column[0].getLong(i), column[2].getDouble(i), 
column[1], i);
-      }
-    }
-  }
-
-  private void updateDoubleResult(long time, double yMaxVal, Column xColumn, 
int xIndex) {
-    if (!initResult
-        || yMaxVal > yMaxValue.getDouble()
-        || (yMaxVal == yMaxValue.getDouble() && time < yTimeStamp)) {
-      initResult = true;
-      yTimeStamp = time;
-      yMaxValue.setDouble(yMaxVal);
-      updateX(xColumn, xIndex);
-    }
-  }
-
-  private void writeX(ColumnBuilder columnBuilder) {
-    if (xNull) {
-      columnBuilder.appendNull();
-      return;
-    }
-    switch (xDataType) {
-      case INT32:
-        columnBuilder.writeInt(xResult.getInt());
-        break;
-      case INT64:
-        columnBuilder.writeLong(xResult.getLong());
-        break;
-      case FLOAT:
-        columnBuilder.writeFloat(xResult.getFloat());
-        break;
-      case DOUBLE:
-        columnBuilder.writeDouble(xResult.getDouble());
-        break;
-      case TEXT:
-        columnBuilder.writeBinary(xResult.getBinary());
-        break;
-      case BOOLEAN:
-        columnBuilder.writeBoolean(xResult.getBoolean());
-        break;
-      default:
-        throw new 
UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, 
xDataType));
-    }
-  }
-
-  private void updateX(Column xColumn, int xIndex) {
-    if (xColumn.isNull(xIndex)) {
-      xNull = true;
-    } else {
-      xNull = false;
-      switch (xDataType) {
-        case INT32:
-          xResult.setInt(xColumn.getInt(xIndex));
-          break;
-        case INT64:
-          xResult.setLong(xColumn.getLong(xIndex));
-          break;
-        case FLOAT:
-          xResult.setFloat(xColumn.getFloat(xIndex));
-          break;
-        case DOUBLE:
-          xResult.setDouble(xColumn.getDouble(xIndex));
-          break;
-        case TEXT:
-          xResult.setBinary(xColumn.getBinary(xIndex));
-          break;
-        case BOOLEAN:
-          xResult.setBoolean(xColumn.getBoolean(xIndex));
-          break;
-        default:
-          throw new UnSupportedDataTypeException(
-              String.format(UNSUPPORTED_TYPE_MESSAGE, xDataType));
-      }
-    }
-  }
-
-  private byte[] serialize() {
-    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-    DataOutputStream dataOutputStream = new 
DataOutputStream(byteArrayOutputStream);
-    try {
-      dataOutputStream.writeLong(yTimeStamp);
-      writeIntermediateToStream(yDataType, yMaxValue, dataOutputStream);
-      dataOutputStream.writeBoolean(xNull);
-      if (!xNull) {
-        writeIntermediateToStream(xDataType, xResult, dataOutputStream);
-      }
-    } catch (IOException e) {
-      throw new UnsupportedOperationException(
-          "Failed to serialize intermediate result for MaxByAccumulator.", e);
-    }
-    return byteArrayOutputStream.toByteArray();
-  }
-
-  private void writeIntermediateToStream(
-      TSDataType dataType, TsPrimitiveType value, DataOutputStream 
dataOutputStream)
-      throws IOException {
-    switch (dataType) {
-      case INT32:
-        dataOutputStream.writeInt(value.getInt());
-        break;
-      case INT64:
-        dataOutputStream.writeLong(value.getLong());
-        break;
-      case FLOAT:
-        dataOutputStream.writeFloat(value.getFloat());
-        break;
-      case DOUBLE:
-        dataOutputStream.writeDouble(value.getDouble());
-        break;
-      case TEXT:
-        dataOutputStream.writeBytes(value.getBinary().toString());
-        break;
-      case BOOLEAN:
-        dataOutputStream.writeBoolean(value.getBoolean());
-        break;
-      default:
-        throw new 
UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, dataType));
-    }
-  }
-
-  private void updateFromBytesIntermediateInput(byte[] bytes) {
-    long time = BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES, 0);
-    int offset = Long.BYTES;
-    // Use Column to store x value
-    TsBlockBuilder builder = new 
TsBlockBuilder(Collections.singletonList(xDataType));
-    ColumnBuilder columnBuilder = builder.getValueColumnBuilders()[0];
-    switch (yDataType) {
-      case INT32:
-        int intMaxVal = BytesUtils.bytesToInt(bytes, offset);
-        offset += Integer.BYTES;
-        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
-        updateIntResult(time, intMaxVal, columnBuilder.build(), 0);
-        break;
-      case INT64:
-        long longMaxVal = BytesUtils.bytesToLongFromOffset(bytes, Long.BYTES, 
offset);
-        offset += Long.BYTES;
-        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
-        updateLongResult(time, longMaxVal, columnBuilder.build(), 0);
-        break;
-      case FLOAT:
-        float floatMaxVal = BytesUtils.bytesToFloat(bytes, offset);
-        offset += Float.BYTES;
-        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
-        updateFloatResult(time, floatMaxVal, columnBuilder.build(), 0);
-        break;
-      case DOUBLE:
-        double doubleMaxVal = BytesUtils.bytesToDouble(bytes, offset);
-        offset += Long.BYTES;
-        readXFromBytesIntermediateInput(bytes, offset, columnBuilder);
-        updateDoubleResult(time, doubleMaxVal, columnBuilder.build(), 0);
-        break;
-      case TEXT:
-      case BOOLEAN:
-      default:
-        throw new 
UnSupportedDataTypeException(String.format(UNSUPPORTED_TYPE_MESSAGE, 
yDataType));
-    }
-  }
-
-  private void readXFromBytesIntermediateInput(
-      byte[] bytes, int offset, ColumnBuilder columnBuilder) {
-    boolean isXNull = BytesUtils.bytesToBool(bytes, offset);
-    offset += 1;
-    if (isXNull) {
-      columnBuilder.appendNull();
-    } else {
-      switch (xDataType) {
-        case INT32:
-          columnBuilder.writeInt(BytesUtils.bytesToInt(bytes, offset));
-          break;
-        case INT64:
-          columnBuilder.writeLong(BytesUtils.bytesToLongFromOffset(bytes, 8, 
offset));
-          break;
-        case FLOAT:
-          columnBuilder.writeFloat(BytesUtils.bytesToFloat(bytes, offset));
-          break;
-        case DOUBLE:
-          columnBuilder.writeDouble(BytesUtils.bytesToDouble(bytes, offset));
-          break;
-        case TEXT:
-          columnBuilder.writeBinary(
-              new Binary(BytesUtils.subBytes(bytes, offset, bytes.length - 
offset)));
-          break;
-        case BOOLEAN:
-          columnBuilder.writeBoolean(BytesUtils.bytesToBool(bytes, offset));
-          break;
-        default:
-          throw new UnSupportedDataTypeException(
-              String.format(UNSUPPORTED_TYPE_MESSAGE, xDataType));
-      }
-    }
+  protected boolean check(double yValue, double yExtremeValue) {
+    return yValue > yExtremeValue;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxMinByBaseAccumulator.java
similarity index 86%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxMinByBaseAccumulator.java
index b2226238016..a4ea49be634 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxByAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MaxMinByBaseAccumulator.java
@@ -38,13 +38,13 @@ import java.util.Collections;
 import static com.google.common.base.Preconditions.checkArgument;
 
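-/** max(x,y) returns the value of x associated with the maximum value of y 
over all input values. */
+/** Returns the value of x associated with the extreme (max or min) value of y over all inputs. */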
-public class MaxByAccumulator implements Accumulator {
+public abstract class MaxMinByBaseAccumulator implements Accumulator {
 
   private final TSDataType xDataType;
 
   private final TSDataType yDataType;
 
-  private final TsPrimitiveType yMaxValue;
+  private final TsPrimitiveType yExtremeValue;
 
   private final TsPrimitiveType xResult;
 
@@ -54,19 +54,19 @@ public class MaxByAccumulator implements Accumulator {
 
   private long yTimeStamp = Long.MAX_VALUE;
 
-  private static final String UNSUPPORTED_TYPE_MESSAGE = "Unsupported data 
type in MaxBy: %s";
+  private static final String UNSUPPORTED_TYPE_MESSAGE = "Unsupported data 
type in MaxBy/MinBy: %s";
 
-  public MaxByAccumulator(TSDataType xDataType, TSDataType yDataType) {
+  protected MaxMinByBaseAccumulator(TSDataType xDataType, TSDataType 
yDataType) {
     this.xDataType = xDataType;
     this.yDataType = yDataType;
     this.xResult = TsPrimitiveType.getByType(xDataType);
-    this.yMaxValue = TsPrimitiveType.getByType(yDataType);
+    this.yExtremeValue = TsPrimitiveType.getByType(yDataType);
   }
 
   // Column should be like: | Time | x | y |
   @Override
   public void addInput(Column[] column, BitMap bitMap) {
-    checkArgument(column.length == 3, "Length of input Column[] for MaxBy 
should be 3");
+    checkArgument(column.length == 3, "Length of input Column[] for 
MaxBy/MinBy should be 3");
     switch (yDataType) {
       case INT32:
         addIntInput(column, bitMap);
@@ -90,7 +90,7 @@ public class MaxByAccumulator implements Accumulator {
   // partialResult should be like: | partialMaxByBinary |
   @Override
   public void addIntermediate(Column[] partialResult) {
-    checkArgument(partialResult.length == 1, "partialResult of MaxBy should be 
1");
+    checkArgument(partialResult.length == 1, "partialResult of MaxBy/MinBy 
should be 1");
     // Return if y is null.
     if (partialResult[0].isNull(0)) {
       return;
@@ -139,7 +139,7 @@ public class MaxByAccumulator implements Accumulator {
     initResult = false;
     xNull = true;
     this.xResult.reset();
-    this.yMaxValue.reset();
+    this.yExtremeValue.reset();
     yTimeStamp = Long.MAX_VALUE;
   }
 
@@ -170,13 +170,13 @@ public class MaxByAccumulator implements Accumulator {
     }
   }
 
-  private void updateIntResult(long time, int yMaxVal, Column xColumn, int 
xIndex) {
+  private void updateIntResult(long time, int yValue, Column xColumn, int 
xIndex) {
     if (!initResult
-        || yMaxVal > yMaxValue.getInt()
-        || (yMaxVal == yMaxValue.getInt() && time < yTimeStamp)) {
+        || check(yValue, yExtremeValue.getInt())
+        || (yValue == yExtremeValue.getInt() && time < yTimeStamp)) {
       initResult = true;
       yTimeStamp = time;
-      yMaxValue.setInt(yMaxVal);
+      yExtremeValue.setInt(yValue);
       updateX(xColumn, xIndex);
     }
   }
@@ -193,13 +193,13 @@ public class MaxByAccumulator implements Accumulator {
     }
   }
 
-  private void updateLongResult(long time, long yMaxVal, Column xColumn, int 
xIndex) {
+  private void updateLongResult(long time, long yValue, Column xColumn, int 
xIndex) {
     if (!initResult
-        || yMaxVal > yMaxValue.getLong()
-        || (yMaxVal == yMaxValue.getLong() && time < yTimeStamp)) {
+        || check(yValue, yExtremeValue.getLong())
+        || (yValue == yExtremeValue.getLong() && time < yTimeStamp)) {
       initResult = true;
       yTimeStamp = time;
-      yMaxValue.setLong(yMaxVal);
+      yExtremeValue.setLong(yValue);
       updateX(xColumn, xIndex);
     }
   }
@@ -216,13 +216,13 @@ public class MaxByAccumulator implements Accumulator {
     }
   }
 
-  private void updateFloatResult(long time, float yMaxVal, Column xColumn, int 
xIndex) {
+  private void updateFloatResult(long time, float yValue, Column xColumn, int 
xIndex) {
     if (!initResult
-        || yMaxVal > yMaxValue.getFloat()
-        || (yMaxVal == yMaxValue.getFloat() && time < yTimeStamp)) {
+        || check(yValue, yExtremeValue.getFloat())
+        || (yValue == yExtremeValue.getFloat() && time < yTimeStamp)) {
       initResult = true;
       yTimeStamp = time;
-      yMaxValue.setFloat(yMaxVal);
+      yExtremeValue.setFloat(yValue);
       updateX(xColumn, xIndex);
     }
   }
@@ -239,13 +239,13 @@ public class MaxByAccumulator implements Accumulator {
     }
   }
 
-  private void updateDoubleResult(long time, double yMaxVal, Column xColumn, 
int xIndex) {
+  private void updateDoubleResult(long time, double yValue, Column xColumn, 
int xIndex) {
     if (!initResult
-        || yMaxVal > yMaxValue.getDouble()
-        || (yMaxVal == yMaxValue.getDouble() && time < yTimeStamp)) {
+        || check(yValue, yExtremeValue.getDouble())
+        || (yValue == yExtremeValue.getDouble() && time < yTimeStamp)) {
       initResult = true;
       yTimeStamp = time;
-      yMaxValue.setDouble(yMaxVal);
+      yExtremeValue.setDouble(yValue);
       updateX(xColumn, xIndex);
     }
   }
@@ -315,7 +315,7 @@ public class MaxByAccumulator implements Accumulator {
     DataOutputStream dataOutputStream = new 
DataOutputStream(byteArrayOutputStream);
     try {
       dataOutputStream.writeLong(yTimeStamp);
-      writeIntermediateToStream(yDataType, yMaxValue, dataOutputStream);
+      writeIntermediateToStream(yDataType, yExtremeValue, dataOutputStream);
       dataOutputStream.writeBoolean(xNull);
       if (!xNull) {
         writeIntermediateToStream(xDataType, xResult, dataOutputStream);
@@ -425,4 +425,17 @@ public class MaxByAccumulator implements Accumulator {
       }
     }
   }
+
+  /**
+   * @param yValue Input y.
+   * @param yExtremeValue Current extreme value of y.
+   * @return True if yValue is the new extreme value.
+   */
+  protected abstract boolean check(int yValue, int yExtremeValue);
+
+  protected abstract boolean check(long yValue, long yExtremeValue);
+
+  protected abstract boolean check(float yValue, float yExtremeValue);
+
+  protected abstract boolean check(double yValue, double yExtremeValue);
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MinByAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MinByAccumulator.java
new file mode 100644
index 00000000000..0571efc6a34
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/MinByAccumulator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.aggregation;
+
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+public class MinByAccumulator extends MaxMinByBaseAccumulator {
+  protected MinByAccumulator(TSDataType xDataType, TSDataType yDataType) {
+    super(xDataType, yDataType);
+  }
+
+  @Override
+  protected boolean check(int yValue, int yExtremeValue) {
+    return yValue < yExtremeValue;
+  }
+
+  @Override
+  protected boolean check(long yValue, long yExtremeValue) {
+    return yValue < yExtremeValue;
+  }
+
+  @Override
+  protected boolean check(float yValue, float yExtremeValue) {
+    return yValue < yExtremeValue;
+  }
+
+  @Override
+  protected boolean check(double yValue, double yExtremeValue) {
+    return yValue < yExtremeValue;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
index 5d958cfbe25..2293c3b38ad 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/aggregation/slidingwindow/SlidingWindowAggregatorFactory.java
@@ -51,6 +51,9 @@ public class SlidingWindowAggregatorFactory {
   private static final Map<TSDataType, Comparator<Column>> maxByComparators =
       new EnumMap<>(TSDataType.class);
 
+  private static final Map<TSDataType, Comparator<Column>> minByComparators =
+      new EnumMap<>(TSDataType.class);
+
   static {
     // return a value greater than 0 if o1 is numerically greater than o2
     maxComparators.put(TSDataType.INT32, Comparator.comparingInt(o -> 
o.getInt(0)));
@@ -137,6 +140,34 @@ public class SlidingWindowAggregatorFactory {
         TSDataType.DOUBLE,
         Comparator.comparingDouble(
             o -> BytesUtils.bytesToDouble(o.getBinary(0).getValues(), Long.BYTES)));
+
+    // return a value greater than 0 if o1 is numerically less than o2
+    minByComparators.put(
+        TSDataType.INT32,
+        (o1, o2) ->
+            Integer.compare(
+                BytesUtils.bytesToInt(o2.getBinary(0).getValues(), Long.BYTES),
+                BytesUtils.bytesToInt(o1.getBinary(0).getValues(), Long.BYTES)));
+    minByComparators.put(
+        TSDataType.INT64,
+        (o1, o2) ->
+            Long.compare(
+                BytesUtils.bytesToLongFromOffset(
+                    o2.getBinary(0).getValues(), Long.BYTES, Long.BYTES),
+                BytesUtils.bytesToLongFromOffset(
+                    o1.getBinary(0).getValues(), Long.BYTES, Long.BYTES)));
+    minByComparators.put(
+        TSDataType.FLOAT,
+        (o1, o2) ->
+            Float.compare(
+                BytesUtils.bytesToFloat(o2.getBinary(0).getValues(), Long.BYTES),
+                BytesUtils.bytesToFloat(o1.getBinary(0).getValues(), Long.BYTES)));
+    minByComparators.put(
+        TSDataType.DOUBLE,
+        (o1, o2) ->
+            Double.compare(
+                BytesUtils.bytesToDouble(o2.getBinary(0).getValues(), Long.BYTES),
+                BytesUtils.bytesToDouble(o1.getBinary(0).getValues(), Long.BYTES)));
   }
 
   public static SlidingWindowAggregator createSlidingWindowAggregator(
@@ -192,6 +223,9 @@ public class SlidingWindowAggregatorFactory {
       case MAX_BY:
         return new MonotonicQueueSlidingWindowAggregator(
             accumulator, inputLocationList, step, maxByComparators.get(dataTypes.get(1)));
+      case MIN_BY:
+        return new MonotonicQueueSlidingWindowAggregator(
+            accumulator, inputLocationList, step, minByComparators.get(dataTypes.get(1)));
       case COUNT_IF:
         throw new SemanticException("COUNT_IF with slidingWindow is not supported now");
       case TIME_DURATION:
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java
index 3e3da458b7a..66dcc539851 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java
@@ -443,6 +443,7 @@ public class ExpressionTypeAnalyzer {
       case SqlConstant.VAR_POP:
       case SqlConstant.VAR_SAMP:
       case SqlConstant.MAX_BY:
+      case SqlConstant.MIN_BY:
         return expressionTypes.get(NodeRef.of(inputExpressions.get(0)));
       default:
         throw new IllegalArgumentException(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index d39498a70d7..3b93613c294 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -2920,6 +2920,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
         return;
       case SqlConstant.COUNT_IF:
       case SqlConstant.MAX_BY:
+      case SqlConstant.MIN_BY:
         checkFunctionExpressionInputSize(
             functionExpression.getExpressionString(),
             functionExpression.getExpressions().size(),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
index 806d5ced160..69eb807711e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/AggregationDescriptor.java
@@ -184,6 +184,9 @@ public class AggregationDescriptor {
         case MAX_BY:
           outputAggregationNames.add(addPartialSuffix(SqlConstant.MAX_BY));
           break;
+        case MIN_BY:
+          outputAggregationNames.add(addPartialSuffix(SqlConstant.MIN_BY));
+          break;
         case UDAF:
           outputAggregationNames.add(addPartialSuffix(aggregationFuncName));
           break;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
index 4cd94334460..533e1c7ab34 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/SchemaUtils.java
@@ -141,6 +141,7 @@ public class SchemaUtils {
       case SqlConstant.VAR_POP + "_partial":
       case SqlConstant.VAR_SAMP + "_partial":
       case SqlConstant.MAX_BY + "_partial":
+      case SqlConstant.MIN_BY + "_partial":
         return TSDataType.TEXT;
       case SqlConstant.LAST_VALUE:
       case SqlConstant.FIRST_VALUE:
@@ -148,6 +149,7 @@ public class SchemaUtils {
       case SqlConstant.MAX_VALUE:
       case SqlConstant.MODE:
       case SqlConstant.MAX_BY:
+      case SqlConstant.MIN_BY:
       default:
         return null;
     }
@@ -239,6 +241,7 @@ public class SchemaUtils {
       case VAR_POP:
       case VAR_SAMP:
       case MAX_BY:
+      case MIN_BY:
       case UDAF:
         return true;
       default:
@@ -275,6 +278,8 @@ public class SchemaUtils {
         return Collections.singletonList(addPartialSuffix(SqlConstant.VAR_SAMP));
       case MAX_BY:
         return Collections.singletonList(addPartialSuffix(SqlConstant.MAX_BY));
+      case MIN_BY:
+        return Collections.singletonList(addPartialSuffix(SqlConstant.MIN_BY));
       case AVG:
         return Arrays.asList(SqlConstant.COUNT, SqlConstant.SUM);
       case TIME_DURATION:
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
index b80ae5a7a8f..187225f622a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/TypeInferenceUtils.java
@@ -144,6 +144,7 @@ public class TypeInferenceUtils {
       case SqlConstant.EXTREME:
       case SqlConstant.MODE:
       case SqlConstant.MAX_BY:
+      case SqlConstant.MIN_BY:
         return dataType;
       case SqlConstant.AVG:
       case SqlConstant.SUM:
@@ -191,6 +192,7 @@ public class TypeInferenceUtils {
       case SqlConstant.TIME_DURATION:
       case SqlConstant.MODE:
       case SqlConstant.MAX_BY:
+      case SqlConstant.MIN_BY:
         return;
       case SqlConstant.COUNT_IF:
         if (dataType != TSDataType.BOOLEAN) {
@@ -236,6 +238,7 @@ public class TypeInferenceUtils {
       case SqlConstant.VAR_POP:
       case SqlConstant.VAR_SAMP:
       case SqlConstant.MAX_BY:
+      case SqlConstant.MIN_BY:
         return;
       case SqlConstant.COUNT_IF:
         Expression keepExpression = inputExpressions.get(1);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java
index 686476ff6a0..8521e3ad2d1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/SqlConstant.java
@@ -48,6 +48,7 @@ public class SqlConstant {
   public static final String MAX_VALUE = "max_value";
   public static final String MIN_VALUE = "min_value";
   public static final String MAX_BY = "max_by";
+  public static final String MIN_BY = "min_by";
   public static final String EXTREME = "extreme";
   public static final String FIRST_VALUE = "first_value";
   public static final String LAST_VALUE = "last_value";
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/TestConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/TestConstant.java
index 12d881ad760..265e9184e1b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/TestConstant.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/constant/TestConstant.java
@@ -79,6 +79,10 @@ public class TestConstant {
     return String.format("max_by(%s, %s)", x, y);
   }
 
+  public static String minBy(String x, String y) {
+    return String.format("min_by(%s, %s)", x, y);
+  }
+
   private TestConstant() {}
 
   public static String getTestTsFilePath(
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java
index 68326f23958..ca8d5eb0afd 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/aggregation/AccumulatorTest.java
@@ -909,4 +909,37 @@ public class AccumulatorTest {
     maxByAccumulator.outputFinal(finalResult);
     Assert.assertEquals(-99, finalResult.build().getInt(0));
   }
+
+  @Test
+  public void minByAccumulatorTest() {
+    Accumulator minByAccumulator =
+        AccumulatorFactory.createBuiltinAccumulator(
+            TAggregationType.MIN_BY,
+            Arrays.asList(TSDataType.INT32, TSDataType.DOUBLE),
+            Collections.emptyList(),
+            Collections.emptyMap(),
+            true);
+    Assert.assertEquals(TSDataType.TEXT, minByAccumulator.getIntermediateType()[0]);
+    Assert.assertEquals(TSDataType.INT32, minByAccumulator.getFinalType());
+    // Returns null if there's no data
+    ColumnBuilder[] intermediateResult = new ColumnBuilder[1];
+    intermediateResult[0] = new BinaryColumnBuilder(null, 1);
+    minByAccumulator.outputIntermediate(intermediateResult);
+    Assert.assertTrue(intermediateResult[0].build().isNull(0));
+    ColumnBuilder finalResult = new IntColumnBuilder(null, 1);
+    minByAccumulator.outputFinal(finalResult);
+    Assert.assertTrue(finalResult.build().isNull(0));
+
+    Column[] timeAndValueColumn = getTimeAndTwoValueColumns(1, 0);
+    minByAccumulator.addInput(timeAndValueColumn, null);
+    Assert.assertFalse(minByAccumulator.hasFinalResult());
+    intermediateResult[0] = new BinaryColumnBuilder(null, 1);
+    minByAccumulator.outputIntermediate(intermediateResult);
+
+    // add intermediate result as input
+    minByAccumulator.addIntermediate(new Column[] {intermediateResult[0].build()});
+    finalResult = new IntColumnBuilder(null, 1);
+    minByAccumulator.outputFinal(finalResult);
+    Assert.assertEquals(0, finalResult.build().getInt(0));
+  }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinAggregationFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinAggregationFunction.java
index c551b02bd8e..72bd4ac51af 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinAggregationFunction.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/BuiltinAggregationFunction.java
@@ -45,7 +45,8 @@ public enum BuiltinAggregationFunction {
   VARIANCE("variance"),
   VAR_POP("var_pop"),
   VAR_SAMP("var_samp"),
-  MAX_BY("max_by");
+  MAX_BY("max_by"),
+  MIN_BY("min_by");
 
   private final String functionName;
 
@@ -93,6 +94,7 @@ public enum BuiltinAggregationFunction {
       case "var_pop":
       case "var_samp":
       case "max_by":
+      case "min_by":
         return false;
       default:
         throw new IllegalArgumentException("Invalid Aggregation function: " + name);
@@ -124,6 +126,7 @@ public enum BuiltinAggregationFunction {
       case "var_pop":
       case "var_samp":
       case "max_by":
+      case "min_by":
         return true;
       case "count_if":
       case "count_time":
diff --git a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
index b0ad612bdd1..ba5a2da6b09 100644
--- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
+++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
@@ -199,5 +199,6 @@ enum TAggregationType {
   VAR_POP,
   VAR_SAMP,
   MAX_BY,
+  MIN_BY,
   UDAF
 }
\ No newline at end of file

Reply via email to