This is an automated email from the ASF dual-hosted git repository.

JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 63dfac842f7 Fix window function state reset across batches (#17813)
63dfac842f7 is described below

commit 63dfac842f7aaaac47a416699b06291f15a9e2fe
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jun 5 13:52:20 2026 +0800

    Fix window function state reset across batches (#17813)
---
 .../db/it/IoTDBWindowFunctionBatchedResultIT.java  | 94 ++++++++++++++++++++++
 .../process/window/TableWindowOperator.java        |  2 +-
 .../window/partition/PartitionExecutor.java        | 11 ++-
 3 files changed, 105 insertions(+), 2 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunctionBatchedResultIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunctionBatchedResultIT.java
new file mode 100644
index 00000000000..ad7bc73fb8f
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBWindowFunctionBatchedResultIT.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.db.it;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.Statement;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBWindowFunctionBatchedResultIT {
+  private static final String DATABASE_NAME = "test";
+
+  private static final String[] SQLS =
+      new String[] {
+        "CREATE DATABASE " + DATABASE_NAME,
+        "USE " + DATABASE_NAME,
+        "CREATE TABLE batched_rank (device STRING TAG, value INT32 FIELD)",
+        "INSERT INTO batched_rank VALUES (2021-01-01T00:00:01, 'd1', 1)",
+        "INSERT INTO batched_rank VALUES (2021-01-01T00:00:02, 'd1', 2)",
+        "INSERT INTO batched_rank VALUES (2021-01-01T00:00:03, 'd1', 3)",
+        "INSERT INTO batched_rank VALUES (2021-01-01T00:00:04, 'd1', 4)",
+        "INSERT INTO batched_rank VALUES (2021-01-01T00:00:05, 'd1', 5)",
+        "FLUSH",
+        "CLEAR ATTRIBUTE CACHE",
+      };
+
+  @BeforeClass
+  public static void setUp() {
+    EnvFactory.getEnv().getConfig().getCommonConfig().setMaxTsBlockLineNumber(2);
+    EnvFactory.getEnv().initClusterEnvironment();
+    insertData();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testRankStateAcrossOutputBatches() {
+    String[] expectedHeader = new String[] {"value", "rk"};
+    String[] retArray =
+        new String[] {
+          "1,1,", "2,2,", "3,3,", "4,4,", "5,5,",
+        };
+    tableResultSetEqualTest(
+        "SELECT value, rank() OVER (PARTITION BY device ORDER BY value) AS rk FROM batched_rank ORDER BY value",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
+  private static void insertData() {
+    try (Connection connection = EnvFactory.getEnv().getTableConnection();
+        Statement statement = connection.createStatement()) {
+      for (String sql : SQLS) {
+        statement.execute(sql);
+      }
+    } catch (Exception e) {
+      fail("insertData failed.");
+    }
+  }
+}
diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/window/TableWindowOperator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/window/TableWindowOperator.java
index 8dde1145696..bdbabcefa3d 100644
--- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/window/TableWindowOperator.java
+++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/window/TableWindowOperator.java
@@ -218,7 +218,7 @@ public class TableWindowOperator implements ProcessOperator {
   private TsBlock transform(long startTime) {
     while (!cachedPartitionExecutors.isEmpty()) {
       PartitionExecutor partitionExecutor = cachedPartitionExecutors.getFirst();
-      partitionExecutor.resetWindowFunctions();
+      partitionExecutor.initializeWindowFunctions();
 
       while (System.nanoTime() - startTime < maxRuntime
           && !tsBlockBuilder.isFull()
diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/window/partition/PartitionExecutor.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/window/partition/PartitionExecutor.java
index 673e971a671..5cfcf3ac173 100644
--- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/window/partition/PartitionExecutor.java
+++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/execution/operator/process/window/partition/PartitionExecutor.java
@@ -59,6 +59,7 @@ public final class PartitionExecutor {
   private final List<Frame> frames;
 
   private final boolean needPeerGroup;
+  private boolean windowFunctionsInitialized;
 
   public PartitionExecutor(
       List<TsBlock> tsBlocks,
@@ -114,6 +115,7 @@ public final class PartitionExecutor {
     sortedColumns = partition.getSortedColumnList(sortChannels);
 
     currentPosition = partitionStart;
+    windowFunctionsInitialized = false;
     needPeerGroup =
         windowFunctions.stream().anyMatch(WindowFunction::needPeerGroup)
             || frameInfoList.stream()
@@ -212,9 +214,16 @@ public final class PartitionExecutor {
     }
   }
 
-  public void resetWindowFunctions() {
+  private void resetWindowFunctions() {
     for (WindowFunction windowFunction : windowFunctions) {
       windowFunction.reset();
     }
   }
+
+  public void initializeWindowFunctions() {
+    if (!windowFunctionsInitialized) {
+      resetWindowFunctions();
+      windowFunctionsInitialized = true;
+    }
+  }
 }

Reply via email to