This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch fix_last_by_max_time
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3eb07fd3880e9babd35d568093d4d5e249b5b45f
Author: Beyyes <[email protected]>
AuthorDate: Mon Nov 25 11:04:09 2024 +0800

    optimize first, last, first_by, last_by aggregation process
---
 .../relational/aggregation/FirstAccumulator.java   | 19 ++++---
 .../relational/aggregation/FirstByAccumulator.java | 19 ++++---
 .../aggregation/FirstByDescAccumulator.java        | 55 +++++++++++++++++++
 .../aggregation/FirstDescAccumulator.java          | 55 +++++++++++++++++++
 .../relational/aggregation/LastAccumulator.java    | 13 +++--
 .../relational/aggregation/LastByAccumulator.java  | 16 +++---
 .../aggregation/LastByDescAccumulator.java         | 62 ++++++++++++++++++++++
 .../aggregation/LastDescAccumulator.java           | 61 +++++++++++++++++++++
 8 files changed, 271 insertions(+), 29 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstAccumulator.java
index 2cdd9b8edfc..8b364c58fbe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstAccumulator.java
@@ -245,11 +245,11 @@ public class FirstAccumulator implements TableAccumulator 
{
     this.firstValue.reset();
   }
 
-  private void addIntInput(Column valueColumn, Column timeColumn) {
-    // TODO can add first position optimization if first position is null ?
+  protected void addIntInput(Column valueColumn, Column timeColumn) {
     for (int i = 0; i < valueColumn.getPositionCount(); i++) {
       if (!valueColumn.isNull(i)) {
         updateIntFirstValue(valueColumn.getInt(i), timeColumn.getLong(i));
+        return;
       }
     }
   }
@@ -262,10 +262,11 @@ public class FirstAccumulator implements TableAccumulator 
{
     }
   }
 
-  private void addLongInput(Column valueColumn, Column timeColumn) {
+  protected void addLongInput(Column valueColumn, Column timeColumn) {
     for (int i = 0; i < valueColumn.getPositionCount(); i++) {
       if (!valueColumn.isNull(i)) {
         updateLongFirstValue(valueColumn.getLong(i), timeColumn.getLong(i));
+        return;
       }
     }
   }
@@ -278,10 +279,11 @@ public class FirstAccumulator implements TableAccumulator 
{
     }
   }
 
-  private void addFloatInput(Column valueColumn, Column timeColumn) {
+  protected void addFloatInput(Column valueColumn, Column timeColumn) {
     for (int i = 0; i < valueColumn.getPositionCount(); i++) {
       if (!valueColumn.isNull(i)) {
         updateFloatFirstValue(valueColumn.getFloat(i), timeColumn.getLong(i));
+        return;
       }
     }
   }
@@ -294,10 +296,11 @@ public class FirstAccumulator implements TableAccumulator 
{
     }
   }
 
-  private void addDoubleInput(Column valueColumn, Column timeColumn) {
+  protected void addDoubleInput(Column valueColumn, Column timeColumn) {
     for (int i = 0; i < valueColumn.getPositionCount(); i++) {
       if (!valueColumn.isNull(i)) {
         updateDoubleFirstValue(valueColumn.getDouble(i), 
timeColumn.getLong(i));
+        return;
       }
     }
   }
@@ -310,10 +313,11 @@ public class FirstAccumulator implements TableAccumulator 
{
     }
   }
 
-  private void addBinaryInput(Column valueColumn, Column timeColumn) {
+  protected void addBinaryInput(Column valueColumn, Column timeColumn) {
     for (int i = 0; i < valueColumn.getPositionCount(); i++) {
       if (!valueColumn.isNull(i)) {
         updateBinaryFirstValue(valueColumn.getBinary(i), 
timeColumn.getLong(i));
+        return;
       }
     }
   }
@@ -326,10 +330,11 @@ public class FirstAccumulator implements TableAccumulator 
{
     }
   }
 
-  private void addBooleanInput(Column valueColumn, Column timeColumn) {
+  protected void addBooleanInput(Column valueColumn, Column timeColumn) {
     for (int i = 0; i < valueColumn.getPositionCount(); i++) {
       if (!valueColumn.isNull(i)) {
         updateBooleanFirstValue(valueColumn.getBoolean(i), 
timeColumn.getLong(i));
+        return;
       }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByAccumulator.java
index 44eb6553c70..096091ccdb2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByAccumulator.java
@@ -310,11 +310,11 @@ public class FirstByAccumulator implements 
TableAccumulator {
     this.xResult.reset();
   }
 
-  // TODO can add first position optimization if first position is null ?
-  private void addIntInput(Column xColumn, Column yColumn, Column timeColumn) {
+  protected void addIntInput(Column xColumn, Column yColumn, Column 
timeColumn) {
     for (int i = 0; i < yColumn.getPositionCount(); i++) {
       if (!yColumn.isNull(i)) {
         updateIntFirstValue(xColumn, i, timeColumn.getLong(i));
+        return;
       }
     }
   }
@@ -341,10 +341,11 @@ public class FirstByAccumulator implements 
TableAccumulator {
     }
   }
 
-  private void addLongInput(Column xColumn, Column yColumn, Column timeColumn) 
{
+  protected void addLongInput(Column xColumn, Column yColumn, Column 
timeColumn) {
     for (int i = 0; i < yColumn.getPositionCount(); i++) {
       if (!yColumn.isNull(i)) {
         updateLongFirstValue(xColumn, i, timeColumn.getLong(i));
+        return;
       }
     }
   }
@@ -371,10 +372,11 @@ public class FirstByAccumulator implements 
TableAccumulator {
     }
   }
 
-  private void addFloatInput(Column xColumn, Column yColumn, Column 
timeColumn) {
+  protected void addFloatInput(Column xColumn, Column yColumn, Column 
timeColumn) {
     for (int i = 0; i < yColumn.getPositionCount(); i++) {
       if (!yColumn.isNull(i)) {
         updateFloatFirstValue(xColumn, i, timeColumn.getLong(i));
+        return;
       }
     }
   }
@@ -401,10 +403,11 @@ public class FirstByAccumulator implements 
TableAccumulator {
     }
   }
 
-  private void addDoubleInput(Column xColumn, Column yColumn, Column 
timeColumn) {
+  protected void addDoubleInput(Column xColumn, Column yColumn, Column 
timeColumn) {
     for (int i = 0; i < yColumn.getPositionCount(); i++) {
       if (!yColumn.isNull(i)) {
         updateDoubleFirstValue(xColumn, i, timeColumn.getLong(i));
+        return;
       }
     }
   }
@@ -431,10 +434,11 @@ public class FirstByAccumulator implements 
TableAccumulator {
     }
   }
 
-  private void addBinaryInput(Column xColumn, Column yColumn, Column 
timeColumn) {
+  protected void addBinaryInput(Column xColumn, Column yColumn, Column 
timeColumn) {
     for (int i = 0; i < yColumn.getPositionCount(); i++) {
       if (!yColumn.isNull(i)) {
         updateBinaryFirstValue(xColumn, i, timeColumn.getLong(i));
+        return;
       }
     }
   }
@@ -461,10 +465,11 @@ public class FirstByAccumulator implements 
TableAccumulator {
     }
   }
 
-  private void addBooleanInput(Column xColumn, Column yColumn, Column 
timeColumn) {
+  protected void addBooleanInput(Column xColumn, Column yColumn, Column 
timeColumn) {
     for (int i = 0; i < yColumn.getPositionCount(); i++) {
       if (!yColumn.isNull(i)) {
         updateBooleanFirstValue(xColumn, i, timeColumn.getLong(i));
+        return;
       }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByDescAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByDescAccumulator.java
index 716193da2a0..7f9340e1b3b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByDescAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByDescAccumulator.java
@@ -19,6 +19,7 @@
 
 package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
 
+import org.apache.tsfile.block.column.Column;
 import org.apache.tsfile.enums.TSDataType;
 
 public class FirstByDescAccumulator extends FirstByAccumulator {
@@ -32,4 +33,58 @@ public class FirstByDescAccumulator extends 
FirstByAccumulator {
   public boolean hasFinalResult() {
     return false;
   }
+
+  @Override
+  protected void addIntInput(Column xColumn, Column yColumn, Column 
timeColumn) {
+    for (int i = 0; i < yColumn.getPositionCount(); i++) {
+      if (!yColumn.isNull(i)) {
+        updateIntFirstValue(xColumn, i, timeColumn.getLong(i));
+      }
+    }
+  }
+
+  @Override
+  protected void addLongInput(Column xColumn, Column yColumn, Column 
timeColumn) {
+    for (int i = 0; i < yColumn.getPositionCount(); i++) {
+      if (!yColumn.isNull(i)) {
+        updateLongFirstValue(xColumn, i, timeColumn.getLong(i));
+      }
+    }
+  }
+
+  @Override
+  protected void addFloatInput(Column xColumn, Column yColumn, Column 
timeColumn) {
+    for (int i = 0; i < yColumn.getPositionCount(); i++) {
+      if (!yColumn.isNull(i)) {
+        updateFloatFirstValue(xColumn, i, timeColumn.getLong(i));
+      }
+    }
+  }
+
+  @Override
+  protected void addDoubleInput(Column xColumn, Column yColumn, Column 
timeColumn) {
+    for (int i = 0; i < yColumn.getPositionCount(); i++) {
+      if (!yColumn.isNull(i)) {
+        updateDoubleFirstValue(xColumn, i, timeColumn.getLong(i));
+      }
+    }
+  }
+
+  @Override
+  protected void addBinaryInput(Column xColumn, Column yColumn, Column 
timeColumn) {
+    for (int i = 0; i < yColumn.getPositionCount(); i++) {
+      if (!yColumn.isNull(i)) {
+        updateBinaryFirstValue(xColumn, i, timeColumn.getLong(i));
+      }
+    }
+  }
+
+  @Override
+  protected void addBooleanInput(Column xColumn, Column yColumn, Column 
timeColumn) {
+    for (int i = 0; i < yColumn.getPositionCount(); i++) {
+      if (!yColumn.isNull(i)) {
+        updateBooleanFirstValue(xColumn, i, timeColumn.getLong(i));
+      }
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstDescAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstDescAccumulator.java
index d9e41270878..2e2982918e2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstDescAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstDescAccumulator.java
@@ -19,6 +19,7 @@
 
 package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
 
+import org.apache.tsfile.block.column.Column;
 import org.apache.tsfile.enums.TSDataType;
 
 public class FirstDescAccumulator extends FirstAccumulator {
@@ -31,4 +32,58 @@ public class FirstDescAccumulator extends FirstAccumulator {
   public boolean hasFinalResult() {
     return false;
   }
+
+  @Override
+  protected void addIntInput(Column valueColumn, Column timeColumn) {
+    for (int i = 0; i < valueColumn.getPositionCount(); i++) {
+      if (!valueColumn.isNull(i)) {
+        updateIntFirstValue(valueColumn.getInt(i), timeColumn.getLong(i));
+      }
+    }
+  }
+
+  @Override
+  protected void addLongInput(Column valueColumn, Column timeColumn) {
+    for (int i = 0; i < valueColumn.getPositionCount(); i++) {
+      if (!valueColumn.isNull(i)) {
+        updateLongFirstValue(valueColumn.getLong(i), timeColumn.getLong(i));
+      }
+    }
+  }
+
+  @Override
+  protected void addFloatInput(Column valueColumn, Column timeColumn) {
+    for (int i = 0; i < valueColumn.getPositionCount(); i++) {
+      if (!valueColumn.isNull(i)) {
+        updateFloatFirstValue(valueColumn.getFloat(i), timeColumn.getLong(i));
+      }
+    }
+  }
+
+  @Override
+  protected void addDoubleInput(Column valueColumn, Column timeColumn) {
+    for (int i = 0; i < valueColumn.getPositionCount(); i++) {
+      if (!valueColumn.isNull(i)) {
+        updateDoubleFirstValue(valueColumn.getDouble(i), 
timeColumn.getLong(i));
+      }
+    }
+  }
+
+  @Override
+  protected void addBinaryInput(Column valueColumn, Column timeColumn) {
+    for (int i = 0; i < valueColumn.getPositionCount(); i++) {
+      if (!valueColumn.isNull(i)) {
+        updateBinaryFirstValue(valueColumn.getBinary(i), 
timeColumn.getLong(i));
+      }
+    }
+  }
+
+  @Override
+  protected void addBooleanInput(Column valueColumn, Column timeColumn) {
+    for (int i = 0; i < valueColumn.getPositionCount(); i++) {
+      if (!valueColumn.isNull(i)) {
+        updateBooleanFirstValue(valueColumn.getBoolean(i), 
timeColumn.getLong(i));
+      }
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java
index 34eee287ad6..ab9f876d7e3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java
@@ -243,8 +243,7 @@ public class LastAccumulator implements TableAccumulator {
     this.lastValue.reset();
   }
 
-  private void addIntInput(Column valueColumn, Column timeColumn) {
-    // TODO can add last position optimization if last position is null ?
+  protected void addIntInput(Column valueColumn, Column timeColumn) {
     for (int i = 0; i < valueColumn.getPositionCount(); i++) {
       if (!valueColumn.isNull(i)) {
         updateIntLastValue(valueColumn.getInt(i), timeColumn.getLong(i));
@@ -260,7 +259,7 @@ public class LastAccumulator implements TableAccumulator {
     }
   }
 
-  private void addLongInput(Column valueColumn, Column timeColumn) {
+  protected void addLongInput(Column valueColumn, Column timeColumn) {
     for (int i = 0; i < valueColumn.getPositionCount(); i++) {
       if (!valueColumn.isNull(i)) {
         updateLongLastValue(valueColumn.getLong(i), timeColumn.getLong(i));
@@ -276,7 +275,7 @@ public class LastAccumulator implements TableAccumulator {
     }
   }
 
-  private void addFloatInput(Column valueColumn, Column timeColumn) {
+  protected void addFloatInput(Column valueColumn, Column timeColumn) {
     for (int i = 0; i < valueColumn.getPositionCount(); i++) {
       if (!valueColumn.isNull(i)) {
         updateFloatLastValue(valueColumn.getFloat(i), timeColumn.getLong(i));
@@ -292,7 +291,7 @@ public class LastAccumulator implements TableAccumulator {
     }
   }
 
-  private void addDoubleInput(Column valueColumn, Column timeColumn) {
+  protected void addDoubleInput(Column valueColumn, Column timeColumn) {
     for (int i = 0; i < valueColumn.getPositionCount(); i++) {
       if (!valueColumn.isNull(i)) {
         updateDoubleLastValue(valueColumn.getDouble(i), timeColumn.getLong(i));
@@ -308,7 +307,7 @@ public class LastAccumulator implements TableAccumulator {
     }
   }
 
-  private void addBinaryInput(Column valueColumn, Column timeColumn) {
+  protected void addBinaryInput(Column valueColumn, Column timeColumn) {
     for (int i = 0; i < valueColumn.getPositionCount(); i++) {
       if (!valueColumn.isNull(i)) {
         updateBinaryLastValue(valueColumn.getBinary(i), timeColumn.getLong(i));
@@ -324,7 +323,7 @@ public class LastAccumulator implements TableAccumulator {
     }
   }
 
-  private void addBooleanInput(Column valueColumn, Column timeColumn) {
+  protected void addBooleanInput(Column valueColumn, Column timeColumn) {
     for (int i = 0; i < valueColumn.getPositionCount(); i++) {
       if (!valueColumn.isNull(i)) {
         updateBooleanLastValue(valueColumn.getBoolean(i), 
timeColumn.getLong(i));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java
index f03a3ab6f77..d06f9413933 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java
@@ -42,8 +42,8 @@ public class LastByAccumulator implements TableAccumulator {
   private static final long INSTANCE_SIZE =
       RamUsageEstimator.shallowSizeOfInstance(LastByAccumulator.class);
 
-  private final TSDataType xDataType;
-  private final TSDataType yDataType;
+  protected final TSDataType xDataType;
+  protected final TSDataType yDataType;
 
   private final boolean xIsTimeColumn;
   private final boolean yIsTimeColumn;
@@ -311,7 +311,7 @@ public class LastByAccumulator implements TableAccumulator {
   }
 
   // TODO can add last position optimization if last position is null ?
-  private void addIntInput(Column xColumn, Column yColumn, Column timeColumn) {
+  protected void addIntInput(Column xColumn, Column yColumn, Column 
timeColumn) {
     for (int i = 0; i < yColumn.getPositionCount(); i++) {
       if (!yColumn.isNull(i)) {
         updateIntLastValue(xColumn, i, timeColumn.getLong(i));
@@ -341,7 +341,7 @@ public class LastByAccumulator implements TableAccumulator {
     }
   }
 
-  private void addLongInput(Column xColumn, Column yColumn, Column timeColumn) 
{
+  protected void addLongInput(Column xColumn, Column yColumn, Column 
timeColumn) {
     for (int i = 0; i < yColumn.getPositionCount(); i++) {
       if (!yColumn.isNull(i)) {
         updateLongLastValue(xColumn, i, timeColumn.getLong(i));
@@ -371,7 +371,7 @@ public class LastByAccumulator implements TableAccumulator {
     }
   }
 
-  private void addFloatInput(Column xColumn, Column yColumn, Column 
timeColumn) {
+  protected void addFloatInput(Column xColumn, Column yColumn, Column 
timeColumn) {
     for (int i = 0; i < yColumn.getPositionCount(); i++) {
       if (!yColumn.isNull(i)) {
         updateFloatLastValue(xColumn, i, timeColumn.getLong(i));
@@ -401,7 +401,7 @@ public class LastByAccumulator implements TableAccumulator {
     }
   }
 
-  private void addDoubleInput(Column xColumn, Column yColumn, Column 
timeColumn) {
+  protected void addDoubleInput(Column xColumn, Column yColumn, Column 
timeColumn) {
     for (int i = 0; i < yColumn.getPositionCount(); i++) {
       if (!yColumn.isNull(i)) {
         updateDoubleLastValue(xColumn, i, timeColumn.getLong(i));
@@ -431,7 +431,7 @@ public class LastByAccumulator implements TableAccumulator {
     }
   }
 
-  private void addBinaryInput(Column xColumn, Column yColumn, Column 
timeColumn) {
+  protected void addBinaryInput(Column xColumn, Column yColumn, Column 
timeColumn) {
     for (int i = 0; i < yColumn.getPositionCount(); i++) {
       if (!yColumn.isNull(i)) {
         updateBinaryLastValue(xColumn, i, timeColumn.getLong(i));
@@ -461,7 +461,7 @@ public class LastByAccumulator implements TableAccumulator {
     }
   }
 
-  private void addBooleanInput(Column xColumn, Column yColumn, Column 
timeColumn) {
+  protected void addBooleanInput(Column xColumn, Column yColumn, Column 
timeColumn) {
     for (int i = 0; i < yColumn.getPositionCount(); i++) {
       if (!yColumn.isNull(i)) {
         updateBooleanLastValue(xColumn, i, timeColumn.getLong(i));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java
index f858d01d764..a44b41d6223 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java
@@ -19,9 +19,11 @@
 
 package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
 
+import org.apache.tsfile.block.column.Column;
 import org.apache.tsfile.enums.TSDataType;
 
 public class LastByDescAccumulator extends LastByAccumulator {
+
   public LastByDescAccumulator(
       TSDataType xDataType, TSDataType yDataType, boolean xIsTimeColumn, 
boolean yIsTimeColumn) {
     super(xDataType, yDataType, xIsTimeColumn, yIsTimeColumn);
@@ -31,4 +33,64 @@ public class LastByDescAccumulator extends LastByAccumulator 
{
   public boolean hasFinalResult() {
     return initResult;
   }
+
+  @Override
+  protected void addIntInput(Column xColumn, Column yColumn, Column 
timeColumn) {
+    for (int i = 0; i < yColumn.getPositionCount(); i++) {
+      if (!yColumn.isNull(i)) {
+        updateIntLastValue(xColumn, i, timeColumn.getLong(i));
+        return;
+      }
+    }
+  }
+
+  @Override
+  protected void addLongInput(Column xColumn, Column yColumn, Column 
timeColumn) {
+    for (int i = 0; i < yColumn.getPositionCount(); i++) {
+      if (!yColumn.isNull(i)) {
+        updateLongLastValue(xColumn, i, timeColumn.getLong(i));
+        return;
+      }
+    }
+  }
+
+  @Override
+  protected void addFloatInput(Column xColumn, Column yColumn, Column 
timeColumn) {
+    for (int i = 0; i < yColumn.getPositionCount(); i++) {
+      if (!yColumn.isNull(i)) {
+        updateFloatLastValue(xColumn, i, timeColumn.getLong(i));
+        return;
+      }
+    }
+  }
+
+  @Override
+  protected void addDoubleInput(Column xColumn, Column yColumn, Column 
timeColumn) {
+    for (int i = 0; i < yColumn.getPositionCount(); i++) {
+      if (!yColumn.isNull(i)) {
+        updateDoubleLastValue(xColumn, i, timeColumn.getLong(i));
+        return;
+      }
+    }
+  }
+
+  @Override
+  protected void addBinaryInput(Column xColumn, Column yColumn, Column 
timeColumn) {
+    for (int i = 0; i < yColumn.getPositionCount(); i++) {
+      if (!yColumn.isNull(i)) {
+        updateBinaryLastValue(xColumn, i, timeColumn.getLong(i));
+        return;
+      }
+    }
+  }
+
+  @Override
+  protected void addBooleanInput(Column xColumn, Column yColumn, Column 
timeColumn) {
+    for (int i = 0; i < yColumn.getPositionCount(); i++) {
+      if (!yColumn.isNull(i)) {
+        updateBooleanLastValue(xColumn, i, timeColumn.getLong(i));
+        return;
+      }
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java
index 3641d5e23f2..bc0e97f9b7b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java
@@ -19,6 +19,7 @@
 
 package 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
 
+import org.apache.tsfile.block.column.Column;
 import org.apache.tsfile.enums.TSDataType;
 
 public class LastDescAccumulator extends LastAccumulator {
@@ -31,4 +32,64 @@ public class LastDescAccumulator extends LastAccumulator {
   public boolean hasFinalResult() {
     return initResult;
   }
+
+  @Override
+  protected void addIntInput(Column valueColumn, Column timeColumn) {
+    for (int i = 0; i < valueColumn.getPositionCount(); i++) {
+      if (!valueColumn.isNull(i)) {
+        updateIntLastValue(valueColumn.getInt(i), timeColumn.getLong(i));
+        return;
+      }
+    }
+  }
+
+  @Override
+  protected void addLongInput(Column valueColumn, Column timeColumn) {
+    for (int i = 0; i < valueColumn.getPositionCount(); i++) {
+      if (!valueColumn.isNull(i)) {
+        updateLongLastValue(valueColumn.getLong(i), timeColumn.getLong(i));
+        return;
+      }
+    }
+  }
+
+  @Override
+  protected void addFloatInput(Column valueColumn, Column timeColumn) {
+    for (int i = 0; i < valueColumn.getPositionCount(); i++) {
+      if (!valueColumn.isNull(i)) {
+        updateFloatLastValue(valueColumn.getFloat(i), timeColumn.getLong(i));
+        return;
+      }
+    }
+  }
+
+  @Override
+  protected void addDoubleInput(Column valueColumn, Column timeColumn) {
+    for (int i = 0; i < valueColumn.getPositionCount(); i++) {
+      if (!valueColumn.isNull(i)) {
+        updateDoubleLastValue(valueColumn.getDouble(i), timeColumn.getLong(i));
+        return;
+      }
+    }
+  }
+
+  @Override
+  protected void addBinaryInput(Column valueColumn, Column timeColumn) {
+    for (int i = 0; i < valueColumn.getPositionCount(); i++) {
+      if (!valueColumn.isNull(i)) {
+        updateBinaryLastValue(valueColumn.getBinary(i), timeColumn.getLong(i));
+        return;
+      }
+    }
+  }
+
+  @Override
+  protected void addBooleanInput(Column valueColumn, Column timeColumn) {
+    for (int i = 0; i < valueColumn.getPositionCount(); i++) {
+      if (!valueColumn.isNull(i)) {
+        updateBooleanLastValue(valueColumn.getBoolean(i), 
timeColumn.getLong(i));
+        return;
+      }
+    }
+  }
 }

Reply via email to