This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch windowing
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/windowing by this push:
     new d58bb2255aa fix
d58bb2255aa is described below

commit d58bb2255aa94b7a9836a25658008cc0c1549d81
Author: Caideyipi <[email protected]>
AuthorDate: Tue Feb 3 11:59:03 2026 +0800

    fix
---
 .../iotdb/pipe/it/single/IoTDBPipeAggregateIT.java | 40 ++++++++++++++++++++--
 .../window/processor/CountWindowingProcessor.java  |  4 ++-
 2 files changed, 40 insertions(+), 4 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeAggregateIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeAggregateIT.java
index 9430e0adbfa..1400926dca2 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeAggregateIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeAggregateIT.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.itbase.category.MultiClusterIT1;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.junit.Assert;
-import org.junit.Ignore;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
@@ -36,14 +36,22 @@ import org.junit.runner.RunWith;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Locale;
 import java.util.Map;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT1.class})
 public class IoTDBPipeAggregateIT extends AbstractPipeSingleIT {
+
+  @Before
+  public void setUp() {
+    Locale.setDefault(Locale.ENGLISH);
+    super.setUp();
+  }
+
   @Test
-  @Ignore
-  public void testAggregator() throws Exception {
+  public void testWindowingAggregator() throws Exception {
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) {
       // Test the mixture of historical and realtime data
@@ -132,4 +140,30 @@ public class IoTDBPipeAggregateIT extends 
AbstractPipeSingleIT {
           Collections.singleton("2,"));
     }
   }
+
+  @Test
+  public void testCountAggregator() {
+    TestUtils.executeNonQueries(
+        env,
+        Arrays.asList(
+            "create pipe factory with source('path'='root.test.**') with 
processor ('processor'='aggregate-processor', 'operators'='min', 
'output.database'='root.aggregate', 'windowing-strategy'='count', 'count'='5') 
with sink ('sink'='write-back-sink')",
+            "insert into root.test.factory.vehicle (time, temperature) 
values(1, 1)",
+            "insert into root.test.factory.vehicle (time, temperature) 
values(2, 2)",
+            "insert into root.test.factory.vehicle (time, temperature) 
values(3, 3)",
+            "insert into root.test.factory.vehicle (time, temperature) 
values(4, 4)",
+            "insert into root.test.factory.vehicle (time, temperature) 
values(5, 5)",
+            "insert into root.test.factory.vehicle (time, temperature) 
values(6, 6)",
+            "insert into root.test.factory.vehicle (time, temperature) 
values(7, 7)",
+            "insert into root.test.factory.vehicle (time, temperature) 
values(8, 8)",
+            "insert into root.test.factory.vehicle (time, temperature) 
values(10, 10)",
+            "insert into root.test.factory.vehicle (time, temperature) 
values(9, 9)",
+            "flush"));
+
+    TestUtils.assertDataEventuallyOnEnv(
+        env,
+        "select min from root.aggregate.factory.vehicle.temperature",
+        "Time,root.aggregate.factory.vehicle.temperature.min,",
+        new HashSet<>(Arrays.asList("5,1.0", "9,6.0")),
+        10);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/window/processor/CountWindowingProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/window/processor/CountWindowingProcessor.java
index ea286d23a0a..655744a9c30 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/window/processor/CountWindowingProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/window/processor/CountWindowingProcessor.java
@@ -79,7 +79,9 @@ public class CountWindowingProcessor extends 
AbstractSimpleTimeWindowingProcesso
     if ((long) window.getCustomizedRuntimeValue() >= count - 1) {
       return new Pair<>(
           WindowState.EMIT_AND_PURGE_WITH_COMPUTE,
-          new 
WindowOutput().setTimestamp(timeStamp).setProgressTime(timeStamp));
+          new WindowOutput()
+              .setTimestamp(window.getTimestamp())
+              .setProgressTime(window.getTimestamp()));
     }
     window.setCustomizedRuntimeValue((long) window.getCustomizedRuntimeValue() 
+ 1);
     return new Pair<>(WindowState.COMPUTE, null);

Reply via email to