This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4256ea48ad1 Fix BigArray NPE of some aggregation functions (first,
last, sum, extreme) when groups are more than 1024 in aggregation query
4256ea48ad1 is described below
commit 4256ea48ad1d5a01dfa55ab074aa0ca96b9cab69
Author: Weihao Li <[email protected]>
AuthorDate: Mon Mar 31 11:03:37 2025 +0800
Fix BigArray NPE of some aggregation functions (first, last, sum, extreme)
when groups are more than 1024 in aggregation query
---
.../grouped/GroupedExtremeAccumulator.java | 1 +
.../grouped/GroupedFirstAccumulator.java | 1 +
.../grouped/GroupedLastAccumulator.java | 21 ++-
.../aggregation/grouped/GroupedSumAccumulator.java | 1 +
.../analyzer/AggregationCornerCaseTest.java | 179 +++++++++++++++++++++
5 files changed, 200 insertions(+), 3 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedExtremeAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedExtremeAccumulator.java
index 14cb70e6414..d0893c5a833 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedExtremeAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedExtremeAccumulator.java
@@ -105,6 +105,7 @@ public class GroupedExtremeAccumulator implements
GroupedAccumulator {
@Override
public void setGroupCount(long groupCount) {
+ inits.ensureCapacity(groupCount);
switch (seriesDataType) {
case INT32:
case DATE:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedFirstAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedFirstAccumulator.java
index d72cf25ae69..e4277bf640b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedFirstAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedFirstAccumulator.java
@@ -125,6 +125,7 @@ public class GroupedFirstAccumulator implements
GroupedAccumulator {
@Override
public void setGroupCount(long groupCount) {
+ minTimes.ensureCapacity(groupCount);
switch (seriesDataType) {
case INT32:
case DATE:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedLastAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedLastAccumulator.java
index 71e69f7a457..8294f3c529b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedLastAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedLastAccumulator.java
@@ -125,6 +125,7 @@ public class GroupedLastAccumulator implements
GroupedAccumulator {
@Override
public void setGroupCount(long groupCount) {
+ maxTimes.ensureCapacity(groupCount);
switch (seriesDataType) {
case INT32:
case DATE:
@@ -439,9 +440,23 @@ public class GroupedLastAccumulator implements
GroupedAccumulator {
private void addFloatInput(
int[] groupIds, Column valueColumn, Column timeColumn, AggregationMask
mask) {
- for (int i = 0; i < groupIds.length; i++) {
- if (!valueColumn.isNull(i)) {
- updateFloatValue(groupIds[i], valueColumn.getFloat(i),
timeColumn.getLong(i));
+ int positionCount = mask.getSelectedPositionCount();
+
+ if (mask.isSelectAll()) {
+ for (int i = 0; i < positionCount; i++) {
+ if (!valueColumn.isNull(i)) {
+ updateFloatValue(groupIds[i], valueColumn.getFloat(i),
timeColumn.getLong(i));
+ }
+ }
+ } else {
+ int[] selectedPositions = mask.getSelectedPositions();
+ int position;
+ for (int i = 0; i < positionCount; i++) {
+ position = selectedPositions[i];
+ if (!valueColumn.isNull(position)) {
+ updateFloatValue(
+ groupIds[position], valueColumn.getFloat(position),
timeColumn.getLong(position));
+ }
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedSumAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedSumAccumulator.java
index d52dd8088fd..395fddb4f2e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedSumAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedSumAccumulator.java
@@ -48,6 +48,7 @@ public class GroupedSumAccumulator implements
GroupedAccumulator {
@Override
public void setGroupCount(long groupCount) {
+ initResult.ensureCapacity(groupCount);
sumValues.ensureCapacity(groupCount);
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationCornerCaseTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationCornerCaseTest.java
index c83299df92b..6f133757a89 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationCornerCaseTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationCornerCaseTest.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateM
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedAggregator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.HashAggregationOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.StreamingHashAggregationOperator;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -49,10 +50,17 @@ import org.junit.Test;
import java.util.Collections;
import java.util.List;
+import java.util.OptionalInt;
+import static
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction.EXTREME;
+import static
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction.FIRST;
+import static
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction.LAST;
+import static
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction.SUM;
+import static
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction.getAggregationTypeByFuncName;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.MergeSortComparator.getComparatorForTable;
import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE;
+import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AccumulatorFactory.createGroupedAccumulator;
import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.GroupByHash.DEFAULT_GROUP_NUMBER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -320,4 +328,175 @@ public class AggregationCornerCaseTest {
false,
Long.MAX_VALUE);
}
+
+ @Test
+ public void groupMoreThan1024Test() {
+ try (HashAggregationOperator aggregationOperator =
genHashAggregationOperator2()) {
+ ListenableFuture<?> listenableFuture = aggregationOperator.isBlocked();
+ listenableFuture.get();
+ while (!aggregationOperator.isFinished() &&
aggregationOperator.hasNext()) {
+ aggregationOperator.next();
+ listenableFuture = aggregationOperator.isBlocked();
+ listenableFuture.get();
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // Construct a HashAggregationOperator whose input TsBlock contains more than 1024 groups
+ private HashAggregationOperator genHashAggregationOperator2() {
+
+ // Construct operator tree
+ QueryId queryId = new QueryId("stub_query");
+
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0),
"stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(
+ instanceId,
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ 1, "aggregationHashOperator-test-instance-notification"));
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ DriverContext driverContext = new DriverContext(fragmentInstanceContext,
0);
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ driverContext.addOperatorContext(1, planNodeId1,
TableScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId2 = new PlanNodeId("2");
+ driverContext.addOperatorContext(2, planNodeId2,
HashAggregationOperator.class.getSimpleName());
+ Operator childOperator =
+ new Operator() {
+ boolean finished = false;
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return driverContext.getOperatorContexts().get(0);
+ }
+
+ @Override
+ public TsBlock next() {
+ TsBlockBuilder builder =
+ new TsBlockBuilder(ImmutableList.of(TSDataType.TIMESTAMP,
TSDataType.INT32));
+ ColumnBuilder[] columnBuilders = builder.getValueColumnBuilders();
+ for (int i = 0; i < 1025; i++) {
+ columnBuilders[0].writeLong(i);
+ }
+ for (int i = 0; i < 1025; i++) {
+ columnBuilders[1].writeInt(i);
+ }
+ builder.declarePositions(1025);
+ TsBlock result =
+ builder.build(
+ new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE,
builder.getPositionCount()));
+ finished = true;
+ return result;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return !finished;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public boolean isFinished() {
+ return finished;
+ }
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 0;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 0;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
+
+ @Override
+ public long ramBytesUsed() {
+ return 0;
+ }
+ };
+
+ OperatorContext operatorContext =
driverContext.getOperatorContexts().get(1);
+
+ GroupedAggregator firstAggregator =
+ new GroupedAggregator(
+ createGroupedAccumulator(
+ FIRST.getFunctionName(),
+ getAggregationTypeByFuncName(FIRST.getFunctionName()),
+ ImmutableList.of(TSDataType.INT32, TSDataType.TIMESTAMP),
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ true,
+ false),
+ AggregationNode.Step.SINGLE,
+ TSDataType.INT32,
+ ImmutableList.of(1, 0),
+ OptionalInt.empty());
+ GroupedAggregator lastAggregator =
+ new GroupedAggregator(
+ createGroupedAccumulator(
+ LAST.getFunctionName(),
+ getAggregationTypeByFuncName(LAST.getFunctionName()),
+ ImmutableList.of(TSDataType.INT32, TSDataType.TIMESTAMP),
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ true,
+ false),
+ AggregationNode.Step.SINGLE,
+ TSDataType.INT32,
+ ImmutableList.of(1, 0),
+ OptionalInt.empty());
+ GroupedAggregator sumAggregator =
+ new GroupedAggregator(
+ createGroupedAccumulator(
+ SUM.getFunctionName(),
+ getAggregationTypeByFuncName(SUM.getFunctionName()),
+ ImmutableList.of(TSDataType.INT32),
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ true,
+ false),
+ AggregationNode.Step.SINGLE,
+ TSDataType.DOUBLE,
+ ImmutableList.of(1),
+ OptionalInt.empty());
+ GroupedAggregator extremeAggregator =
+ new GroupedAggregator(
+ createGroupedAccumulator(
+ EXTREME.getFunctionName(),
+ getAggregationTypeByFuncName(EXTREME.getFunctionName()),
+ ImmutableList.of(TSDataType.INT32),
+ Collections.emptyList(),
+ Collections.emptyMap(),
+ true,
+ false),
+ AggregationNode.Step.SINGLE,
+ TSDataType.INT32,
+ ImmutableList.of(1),
+ OptionalInt.empty());
+
+ return new HashAggregationOperator(
+ operatorContext,
+ childOperator,
+ ImmutableList.of(IntType.INT32),
+ Collections.singletonList(1),
+ ImmutableList.of(firstAggregator, lastAggregator, sumAggregator,
extremeAggregator),
+ AggregationNode.Step.SINGLE,
+ DEFAULT_GROUP_NUMBER,
+ Long.MAX_VALUE,
+ false,
+ Long.MAX_VALUE);
+ }
}