This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch fix_last_by_max_time in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3eb07fd3880e9babd35d568093d4d5e249b5b45f Author: Beyyes <[email protected]> AuthorDate: Mon Nov 25 11:04:09 2024 +0800 optimize first, last, first_by, last_by aggregation process --- .../relational/aggregation/FirstAccumulator.java | 19 ++++--- .../relational/aggregation/FirstByAccumulator.java | 19 ++++--- .../aggregation/FirstByDescAccumulator.java | 55 +++++++++++++++++++ .../aggregation/FirstDescAccumulator.java | 55 +++++++++++++++++++ .../relational/aggregation/LastAccumulator.java | 13 +++-- .../relational/aggregation/LastByAccumulator.java | 16 +++--- .../aggregation/LastByDescAccumulator.java | 62 ++++++++++++++++++++++ .../aggregation/LastDescAccumulator.java | 61 +++++++++++++++++++++ 8 files changed, 271 insertions(+), 29 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstAccumulator.java index 2cdd9b8edfc..8b364c58fbe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstAccumulator.java @@ -245,11 +245,11 @@ public class FirstAccumulator implements TableAccumulator { this.firstValue.reset(); } - private void addIntInput(Column valueColumn, Column timeColumn) { - // TODO can add first position optimization if first position is null ? + protected void addIntInput(Column valueColumn, Column timeColumn) { for (int i = 0; i < valueColumn.getPositionCount(); i++) { if (!valueColumn.isNull(i)) { updateIntFirstValue(valueColumn.getInt(i), timeColumn.getLong(i)); + return; } } } @@ -262,10 +262,11 @@ public class FirstAccumulator implements TableAccumulator { } } - private void addLongInput(Column valueColumn, Column timeColumn) { + protected void addLongInput(Column valueColumn, Column timeColumn) { for (int i = 0; i < valueColumn.getPositionCount(); i++) { if (!valueColumn.isNull(i)) { updateLongFirstValue(valueColumn.getLong(i), timeColumn.getLong(i)); + return; } } } @@ -278,10 +279,11 @@ public class FirstAccumulator implements TableAccumulator { } } - private void addFloatInput(Column valueColumn, Column timeColumn) { + protected void addFloatInput(Column valueColumn, Column timeColumn) { for (int i = 0; i < valueColumn.getPositionCount(); i++) { if (!valueColumn.isNull(i)) { updateFloatFirstValue(valueColumn.getFloat(i), timeColumn.getLong(i)); + return; } } } @@ -294,10 +296,11 @@ public class FirstAccumulator implements TableAccumulator { } } - private void addDoubleInput(Column valueColumn, Column timeColumn) { + protected void addDoubleInput(Column valueColumn, Column timeColumn) { for (int i = 0; i < valueColumn.getPositionCount(); i++) { if (!valueColumn.isNull(i)) { updateDoubleFirstValue(valueColumn.getDouble(i), timeColumn.getLong(i)); + return; } } } @@ -310,10 +313,11 @@ public class FirstAccumulator implements TableAccumulator { } } - private void addBinaryInput(Column valueColumn, Column timeColumn) { + protected void addBinaryInput(Column valueColumn, Column timeColumn) { for (int i = 0; i < valueColumn.getPositionCount(); i++) { if (!valueColumn.isNull(i)) { updateBinaryFirstValue(valueColumn.getBinary(i), timeColumn.getLong(i)); + return; } } } @@ -326,10 +330,11 @@ public class FirstAccumulator implements TableAccumulator { } } - private void addBooleanInput(Column valueColumn, Column timeColumn) { + protected void addBooleanInput(Column valueColumn, Column timeColumn) { for (int i = 0; i < valueColumn.getPositionCount(); i++) { if (!valueColumn.isNull(i)) { updateBooleanFirstValue(valueColumn.getBoolean(i), timeColumn.getLong(i)); + return; } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByAccumulator.java index 44eb6553c70..096091ccdb2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByAccumulator.java @@ -310,11 +310,11 @@ public class FirstByAccumulator implements TableAccumulator { this.xResult.reset(); } - // TODO can add first position optimization if first position is null ? - private void addIntInput(Column xColumn, Column yColumn, Column timeColumn) { + protected void addIntInput(Column xColumn, Column yColumn, Column timeColumn) { for (int i = 0; i < yColumn.getPositionCount(); i++) { if (!yColumn.isNull(i)) { updateIntFirstValue(xColumn, i, timeColumn.getLong(i)); + return; } } } @@ -341,10 +341,11 @@ public class FirstByAccumulator implements TableAccumulator { } } - private void addLongInput(Column xColumn, Column yColumn, Column timeColumn) { + protected void addLongInput(Column xColumn, Column yColumn, Column timeColumn) { for (int i = 0; i < yColumn.getPositionCount(); i++) { if (!yColumn.isNull(i)) { updateLongFirstValue(xColumn, i, timeColumn.getLong(i)); + return; } } } @@ -371,10 +372,11 @@ public class FirstByAccumulator implements TableAccumulator { } } - private void addFloatInput(Column xColumn, Column yColumn, Column timeColumn) { + protected void addFloatInput(Column xColumn, Column yColumn, Column timeColumn) { for (int i = 0; i < yColumn.getPositionCount(); i++) { if (!yColumn.isNull(i)) { updateFloatFirstValue(xColumn, i, timeColumn.getLong(i)); + return; } } } @@ -401,10 +403,11 @@ public class FirstByAccumulator implements TableAccumulator { } } - private void addDoubleInput(Column xColumn, Column yColumn, Column timeColumn) { + protected void addDoubleInput(Column xColumn, Column yColumn, Column timeColumn) { for (int i = 0; i < yColumn.getPositionCount(); i++) { if (!yColumn.isNull(i)) { updateDoubleFirstValue(xColumn, i, timeColumn.getLong(i)); + return; } } } @@ -431,10 +434,11 @@ public class FirstByAccumulator implements TableAccumulator { } } - private void addBinaryInput(Column xColumn, Column yColumn, Column timeColumn) { + protected void addBinaryInput(Column xColumn, Column yColumn, Column timeColumn) { for (int i = 0; i < yColumn.getPositionCount(); i++) { if (!yColumn.isNull(i)) { updateBinaryFirstValue(xColumn, i, timeColumn.getLong(i)); + return; } } } @@ -461,10 +465,11 @@ public class FirstByAccumulator implements TableAccumulator { } } - private void addBooleanInput(Column xColumn, Column yColumn, Column timeColumn) { + protected void addBooleanInput(Column xColumn, Column yColumn, Column timeColumn) { for (int i = 0; i < yColumn.getPositionCount(); i++) { if (!yColumn.isNull(i)) { updateBooleanFirstValue(xColumn, i, timeColumn.getLong(i)); + return; } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByDescAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByDescAccumulator.java index 716193da2a0..7f9340e1b3b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByDescAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstByDescAccumulator.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation; +import org.apache.tsfile.block.column.Column; import org.apache.tsfile.enums.TSDataType; public class FirstByDescAccumulator extends FirstByAccumulator { @@ -32,4 +33,58 @@ public class FirstByDescAccumulator extends FirstByAccumulator { public boolean hasFinalResult() { return false; } + + @Override + protected void addIntInput(Column xColumn, Column yColumn, Column timeColumn) { + for (int i = 0; i < yColumn.getPositionCount(); i++) { + if (!yColumn.isNull(i)) { + updateIntFirstValue(xColumn, i, timeColumn.getLong(i)); + } + } + } + + @Override + protected void addLongInput(Column xColumn, Column yColumn, Column timeColumn) { + for (int i = 0; i < yColumn.getPositionCount(); i++) { + if (!yColumn.isNull(i)) { + updateLongFirstValue(xColumn, i, timeColumn.getLong(i)); + } + } + } + + @Override + protected void addFloatInput(Column xColumn, Column yColumn, Column timeColumn) { + for (int i = 0; i < yColumn.getPositionCount(); i++) { + if (!yColumn.isNull(i)) { + updateFloatFirstValue(xColumn, i, timeColumn.getLong(i)); + } + } + } + + @Override + protected void addDoubleInput(Column xColumn, Column yColumn, Column timeColumn) { + for (int i = 0; i < yColumn.getPositionCount(); i++) { + if (!yColumn.isNull(i)) { + updateDoubleFirstValue(xColumn, i, timeColumn.getLong(i)); + } + } + } + + @Override + protected void addBinaryInput(Column xColumn, Column yColumn, Column timeColumn) { + for (int i = 0; i < yColumn.getPositionCount(); i++) { + if (!yColumn.isNull(i)) { + updateBinaryFirstValue(xColumn, i, timeColumn.getLong(i)); + } + } + } + + @Override + protected void addBooleanInput(Column xColumn, Column yColumn, Column timeColumn) { + for (int i = 0; i < yColumn.getPositionCount(); i++) { + if (!yColumn.isNull(i)) { + updateBooleanFirstValue(xColumn, i, timeColumn.getLong(i)); + } + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstDescAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstDescAccumulator.java index d9e41270878..2e2982918e2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstDescAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/FirstDescAccumulator.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation; +import org.apache.tsfile.block.column.Column; import org.apache.tsfile.enums.TSDataType; public class FirstDescAccumulator extends FirstAccumulator { @@ -31,4 +32,58 @@ public class FirstDescAccumulator extends FirstAccumulator { public boolean hasFinalResult() { return false; } + + @Override + protected void addIntInput(Column valueColumn, Column timeColumn) { + for (int i = 0; i < valueColumn.getPositionCount(); i++) { + if (!valueColumn.isNull(i)) { + updateIntFirstValue(valueColumn.getInt(i), timeColumn.getLong(i)); + } + } + } + + @Override + protected void addLongInput(Column valueColumn, Column timeColumn) { + for (int i = 0; i < valueColumn.getPositionCount(); i++) { + if (!valueColumn.isNull(i)) { + updateLongFirstValue(valueColumn.getLong(i), timeColumn.getLong(i)); + } + } + } + + @Override + protected void addFloatInput(Column valueColumn, Column timeColumn) { + for (int i = 0; i < valueColumn.getPositionCount(); i++) { + if (!valueColumn.isNull(i)) { + updateFloatFirstValue(valueColumn.getFloat(i), timeColumn.getLong(i)); + } + } + } + + @Override + protected void addDoubleInput(Column valueColumn, Column timeColumn) { + for (int i = 0; i < valueColumn.getPositionCount(); i++) { + if (!valueColumn.isNull(i)) { + updateDoubleFirstValue(valueColumn.getDouble(i), timeColumn.getLong(i)); + } + } + } + + @Override + protected void addBinaryInput(Column valueColumn, Column timeColumn) { + for (int i = 0; i < valueColumn.getPositionCount(); i++) { + if (!valueColumn.isNull(i)) { + updateBinaryFirstValue(valueColumn.getBinary(i), timeColumn.getLong(i)); + } + } + } + + @Override + protected void addBooleanInput(Column valueColumn, Column timeColumn) { + for (int i = 0; i < valueColumn.getPositionCount(); i++) { + if (!valueColumn.isNull(i)) { + updateBooleanFirstValue(valueColumn.getBoolean(i), timeColumn.getLong(i)); + } + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java index 34eee287ad6..ab9f876d7e3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java @@ -243,8 +243,7 @@ public class LastAccumulator implements TableAccumulator { this.lastValue.reset(); } - private void addIntInput(Column valueColumn, Column timeColumn) { - // TODO can add last position optimization if last position is null ? + protected void addIntInput(Column valueColumn, Column timeColumn) { for (int i = 0; i < valueColumn.getPositionCount(); i++) { if (!valueColumn.isNull(i)) { updateIntLastValue(valueColumn.getInt(i), timeColumn.getLong(i)); @@ -260,7 +259,7 @@ public class LastAccumulator implements TableAccumulator { } } - private void addLongInput(Column valueColumn, Column timeColumn) { + protected void addLongInput(Column valueColumn, Column timeColumn) { for (int i = 0; i < valueColumn.getPositionCount(); i++) { if (!valueColumn.isNull(i)) { updateLongLastValue(valueColumn.getLong(i), timeColumn.getLong(i)); @@ -276,7 +275,7 @@ public class LastAccumulator implements TableAccumulator { } } - private void addFloatInput(Column valueColumn, Column timeColumn) { + protected void addFloatInput(Column valueColumn, Column timeColumn) { for (int i = 0; i < valueColumn.getPositionCount(); i++) { if (!valueColumn.isNull(i)) { updateFloatLastValue(valueColumn.getFloat(i), timeColumn.getLong(i)); @@ -292,7 +291,7 @@ public class LastAccumulator implements TableAccumulator { } } - private void addDoubleInput(Column valueColumn, Column timeColumn) { + protected void addDoubleInput(Column valueColumn, Column timeColumn) { for (int i = 0; i < valueColumn.getPositionCount(); i++) { if (!valueColumn.isNull(i)) { updateDoubleLastValue(valueColumn.getDouble(i), timeColumn.getLong(i)); @@ -308,7 +307,7 @@ public class LastAccumulator implements TableAccumulator { } } - private void addBinaryInput(Column valueColumn, Column timeColumn) { + protected void addBinaryInput(Column valueColumn, Column timeColumn) { for (int i = 0; i < valueColumn.getPositionCount(); i++) { if (!valueColumn.isNull(i)) { updateBinaryLastValue(valueColumn.getBinary(i), timeColumn.getLong(i)); @@ -324,7 +323,7 @@ public class LastAccumulator implements TableAccumulator { } } - private void addBooleanInput(Column valueColumn, Column timeColumn) { + protected void addBooleanInput(Column valueColumn, Column timeColumn) { for (int i = 0; i < valueColumn.getPositionCount(); i++) { if (!valueColumn.isNull(i)) { updateBooleanLastValue(valueColumn.getBoolean(i), timeColumn.getLong(i)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java index f03a3ab6f77..d06f9413933 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java @@ -42,8 +42,8 @@ public class LastByAccumulator implements TableAccumulator { private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(LastByAccumulator.class); - private final TSDataType xDataType; - private final TSDataType yDataType; + protected final TSDataType xDataType; + protected final TSDataType yDataType; private final boolean xIsTimeColumn; private final boolean yIsTimeColumn; @@ -311,7 +311,7 @@ public class LastByAccumulator implements TableAccumulator { } // TODO can add last position optimization if last position is null ? - private void addIntInput(Column xColumn, Column yColumn, Column timeColumn) { + protected void addIntInput(Column xColumn, Column yColumn, Column timeColumn) { for (int i = 0; i < yColumn.getPositionCount(); i++) { if (!yColumn.isNull(i)) { updateIntLastValue(xColumn, i, timeColumn.getLong(i)); @@ -341,7 +341,7 @@ public class LastByAccumulator implements TableAccumulator { } } - private void addLongInput(Column xColumn, Column yColumn, Column timeColumn) { + protected void addLongInput(Column xColumn, Column yColumn, Column timeColumn) { for (int i = 0; i < yColumn.getPositionCount(); i++) { if (!yColumn.isNull(i)) { updateLongLastValue(xColumn, i, timeColumn.getLong(i)); @@ -371,7 +371,7 @@ public class LastByAccumulator implements TableAccumulator { } } - private void addFloatInput(Column xColumn, Column yColumn, Column timeColumn) { + protected void addFloatInput(Column xColumn, Column yColumn, Column timeColumn) { for (int i = 0; i < yColumn.getPositionCount(); i++) { if (!yColumn.isNull(i)) { updateFloatLastValue(xColumn, i, timeColumn.getLong(i)); @@ -401,7 +401,7 @@ public class LastByAccumulator implements TableAccumulator { } } - private void addDoubleInput(Column xColumn, Column yColumn, Column timeColumn) { + protected void addDoubleInput(Column xColumn, Column yColumn, Column timeColumn) { for (int i = 0; i < yColumn.getPositionCount(); i++) { if (!yColumn.isNull(i)) { updateDoubleLastValue(xColumn, i, timeColumn.getLong(i)); @@ -431,7 +431,7 @@ public class LastByAccumulator implements TableAccumulator { } } - private void addBinaryInput(Column xColumn, Column yColumn, Column timeColumn) { + protected void addBinaryInput(Column xColumn, Column yColumn, Column timeColumn) { for (int i = 0; i < yColumn.getPositionCount(); i++) { if (!yColumn.isNull(i)) { updateBinaryLastValue(xColumn, i, timeColumn.getLong(i)); @@ -461,7 +461,7 @@ public class LastByAccumulator implements TableAccumulator { } } - private void addBooleanInput(Column xColumn, Column yColumn, Column timeColumn) { + protected void addBooleanInput(Column xColumn, Column yColumn, Column timeColumn) { for (int i = 0; i < yColumn.getPositionCount(); i++) { if (!yColumn.isNull(i)) { updateBooleanLastValue(xColumn, i, timeColumn.getLong(i)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java index f858d01d764..a44b41d6223 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java @@ -19,9 +19,11 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation; +import org.apache.tsfile.block.column.Column; import org.apache.tsfile.enums.TSDataType; public class LastByDescAccumulator extends LastByAccumulator { + public LastByDescAccumulator( TSDataType xDataType, TSDataType yDataType, boolean xIsTimeColumn, boolean yIsTimeColumn) { super(xDataType, yDataType, xIsTimeColumn, yIsTimeColumn); @@ -31,4 +33,64 @@ public class LastByDescAccumulator extends LastByAccumulator { public boolean hasFinalResult() { return initResult; } + + @Override + protected void addIntInput(Column xColumn, Column yColumn, Column timeColumn) { + for (int i = 0; i < yColumn.getPositionCount(); i++) { + if (!yColumn.isNull(i)) { + updateIntLastValue(xColumn, i, timeColumn.getLong(i)); + return; + } + } + } + + @Override + protected void addLongInput(Column xColumn, Column yColumn, Column timeColumn) { + for (int i = 0; i < yColumn.getPositionCount(); i++) { + if (!yColumn.isNull(i)) { + updateLongLastValue(xColumn, i, timeColumn.getLong(i)); + return; + } + } + } + + @Override + protected void addFloatInput(Column xColumn, Column yColumn, Column timeColumn) { + for (int i = 0; i < yColumn.getPositionCount(); i++) { + if (!yColumn.isNull(i)) { + updateFloatLastValue(xColumn, i, timeColumn.getLong(i)); + return; + } + } + } + + @Override + protected void addDoubleInput(Column xColumn, Column yColumn, Column timeColumn) { + for (int i = 0; i < yColumn.getPositionCount(); i++) { + if (!yColumn.isNull(i)) { + updateDoubleLastValue(xColumn, i, timeColumn.getLong(i)); + return; + } + } + } + + @Override + protected void addBinaryInput(Column xColumn, Column yColumn, Column timeColumn) { + for (int i = 0; i < yColumn.getPositionCount(); i++) { + if (!yColumn.isNull(i)) { + updateBinaryLastValue(xColumn, i, timeColumn.getLong(i)); + return; + } + } + } + + @Override + protected void addBooleanInput(Column xColumn, Column yColumn, Column timeColumn) { + for (int i = 0; i < yColumn.getPositionCount(); i++) { + if (!yColumn.isNull(i)) { + updateBooleanLastValue(xColumn, i, timeColumn.getLong(i)); + return; + } + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java index 3641d5e23f2..bc0e97f9b7b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation; +import org.apache.tsfile.block.column.Column; import org.apache.tsfile.enums.TSDataType; public class LastDescAccumulator extends LastAccumulator { @@ -31,4 +32,64 @@ public class LastDescAccumulator extends LastAccumulator { public boolean hasFinalResult() { return initResult; } + + @Override + protected void addIntInput(Column valueColumn, Column timeColumn) { + for (int i = 0; i < valueColumn.getPositionCount(); i++) { + if (!valueColumn.isNull(i)) { + updateIntLastValue(valueColumn.getInt(i), timeColumn.getLong(i)); + return; + } + } + } + + @Override + protected void addLongInput(Column valueColumn, Column timeColumn) { + for (int i = 0; i < valueColumn.getPositionCount(); i++) { + if (!valueColumn.isNull(i)) { + updateLongLastValue(valueColumn.getLong(i), timeColumn.getLong(i)); + return; + } + } + } + + @Override + protected void addFloatInput(Column valueColumn, Column timeColumn) { + for (int i = 0; i < valueColumn.getPositionCount(); i++) { + if (!valueColumn.isNull(i)) { + updateFloatLastValue(valueColumn.getFloat(i), timeColumn.getLong(i)); + return; + } + } + } + + @Override + protected void addDoubleInput(Column valueColumn, Column timeColumn) { + for (int i = 0; i < valueColumn.getPositionCount(); i++) { + if (!valueColumn.isNull(i)) { + updateDoubleLastValue(valueColumn.getDouble(i), timeColumn.getLong(i)); + return; + } + } + } + + @Override + protected void addBinaryInput(Column valueColumn, Column timeColumn) { + for (int i = 0; i < valueColumn.getPositionCount(); i++) { + if (!valueColumn.isNull(i)) { + updateBinaryLastValue(valueColumn.getBinary(i), timeColumn.getLong(i)); + return; + } + } + } + + @Override + protected void addBooleanInput(Column valueColumn, Column timeColumn) { + for (int i = 0; i < valueColumn.getPositionCount(); i++) { + if (!valueColumn.isNull(i)) { + updateBooleanLastValue(valueColumn.getBoolean(i), timeColumn.getLong(i)); + return; + } + } + } }
