This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 63dfac842f7 Fix window function state reset across batches (#17813)
63dfac842f7 is described below
commit 63dfac842f7aaaac47a416699b06291f15a9e2fe
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jun 5 13:52:20 2026 +0800
Fix window function state reset across batches (#17813)
---
.../db/it/IoTDBWindowFunctionBatchedResultIT.java | 94 ++++++++++++++++++++++
.../process/window/TableWindowOperator.java | 2 +-
.../window/partition/PartitionExecutor.java | 11 ++-
3 files changed, 105 insertions(+), 2 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunctionBatchedResultIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunctionBatchedResultIT.java
new file mode 100644
index 00000000000..ad7bc73fb8f
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunctionBatchedResultIT.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.db.it;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.Statement;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBWindowFunctionBatchedResultIT {
+ private static final String DATABASE_NAME = "test";
+
+ private static final String[] SQLS =
+ new String[] {
+ "CREATE DATABASE " + DATABASE_NAME,
+ "USE " + DATABASE_NAME,
+ "CREATE TABLE batched_rank (device STRING TAG, value INT32 FIELD)",
+ "INSERT INTO batched_rank VALUES (2021-01-01T00:00:01, 'd1', 1)",
+ "INSERT INTO batched_rank VALUES (2021-01-01T00:00:02, 'd1', 2)",
+ "INSERT INTO batched_rank VALUES (2021-01-01T00:00:03, 'd1', 3)",
+ "INSERT INTO batched_rank VALUES (2021-01-01T00:00:04, 'd1', 4)",
+ "INSERT INTO batched_rank VALUES (2021-01-01T00:00:05, 'd1', 5)",
+ "FLUSH",
+ "CLEAR ATTRIBUTE CACHE",
+ };
+
+ @BeforeClass
+ public static void setUp() {
+
EnvFactory.getEnv().getConfig().getCommonConfig().setMaxTsBlockLineNumber(2);
+ EnvFactory.getEnv().initClusterEnvironment();
+ insertData();
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testRankStateAcrossOutputBatches() {
+ String[] expectedHeader = new String[] {"value", "rk"};
+ String[] retArray =
+ new String[] {
+ "1,1,", "2,2,", "3,3,", "4,4,", "5,5,",
+ };
+ tableResultSetEqualTest(
+ "SELECT value, rank() OVER (PARTITION BY device ORDER BY value) AS rk
FROM batched_rank ORDER BY value",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
+ private static void insertData() {
+ try (Connection connection = EnvFactory.getEnv().getTableConnection();
+ Statement statement = connection.createStatement()) {
+ for (String sql : SQLS) {
+ statement.execute(sql);
+ }
+ } catch (Exception e) {
+ fail("insertData failed.");
+ }
+ }
+}
diff --git
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/window/TableWindowOperator.java
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/window/TableWindowOperator.java
index 8dde1145696..bdbabcefa3d 100644
---
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/window/TableWindowOperator.java
+++
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/window/TableWindowOperator.java
@@ -218,7 +218,7 @@ public class TableWindowOperator implements ProcessOperator
{
private TsBlock transform(long startTime) {
while (!cachedPartitionExecutors.isEmpty()) {
PartitionExecutor partitionExecutor =
cachedPartitionExecutors.getFirst();
- partitionExecutor.resetWindowFunctions();
+ partitionExecutor.initializeWindowFunctions();
while (System.nanoTime() - startTime < maxRuntime
&& !tsBlockBuilder.isFull()
diff --git
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/window/partition/PartitionExecutor.java
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/window/partition/PartitionExecutor.java
index 673e971a671..5cfcf3ac173 100644
---
a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/window/partition/PartitionExecutor.java
+++
b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/window/partition/PartitionExecutor.java
@@ -59,6 +59,7 @@ public final class PartitionExecutor {
private final List<Frame> frames;
private final boolean needPeerGroup;
+ private boolean windowFunctionsInitialized;
public PartitionExecutor(
List<TsBlock> tsBlocks,
@@ -114,6 +115,7 @@ public final class PartitionExecutor {
sortedColumns = partition.getSortedColumnList(sortChannels);
currentPosition = partitionStart;
+ windowFunctionsInitialized = false;
needPeerGroup =
windowFunctions.stream().anyMatch(WindowFunction::needPeerGroup)
|| frameInfoList.stream()
@@ -212,9 +214,16 @@ public final class PartitionExecutor {
}
}
- public void resetWindowFunctions() {
+ private void resetWindowFunctions() {
for (WindowFunction windowFunction : windowFunctions) {
windowFunction.reset();
}
}
+
+ public void initializeWindowFunctions() {
+ if (!windowFunctionsInitialized) {
+ resetWindowFunctions();
+ windowFunctionsInitialized = true;
+ }
+ }
}