This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch windowing
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/windowing by this push:
new d58bb2255aa fix
d58bb2255aa is described below
commit d58bb2255aa94b7a9836a25658008cc0c1549d81
Author: Caideyipi <[email protected]>
AuthorDate: Tue Feb 3 11:59:03 2026 +0800
fix
---
.../iotdb/pipe/it/single/IoTDBPipeAggregateIT.java | 40 ++++++++++++++++++++--
.../window/processor/CountWindowingProcessor.java | 4 ++-
2 files changed, 40 insertions(+), 4 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeAggregateIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeAggregateIT.java
index 9430e0adbfa..1400926dca2 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeAggregateIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeAggregateIT.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.itbase.category.MultiClusterIT1;
import org.apache.iotdb.rpc.TSStatusCode;
import org.junit.Assert;
-import org.junit.Ignore;
+import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -36,14 +36,22 @@ import org.junit.runner.RunWith;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Locale;
import java.util.Map;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT1.class})
public class IoTDBPipeAggregateIT extends AbstractPipeSingleIT {
+
+ @Before
+ public void setUp() {
+ Locale.setDefault(Locale.ENGLISH);
+ super.setUp();
+ }
+
@Test
- @Ignore
- public void testAggregator() throws Exception {
+ public void testWindowingAggregator() throws Exception {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) {
// Test the mixture of historical and realtime data
@@ -132,4 +140,30 @@ public class IoTDBPipeAggregateIT extends AbstractPipeSingleIT {
Collections.singleton("2,"));
}
}
+
+ @Test
+ public void testCountAggregator() {
+ TestUtils.executeNonQueries(
+ env,
+ Arrays.asList(
+ "create pipe factory with source('path'='root.test.**') with processor ('processor'='aggregate-processor', 'operators'='min', 'output.database'='root.aggregate', 'windowing-strategy'='count', 'count'='5') with sink ('sink'='write-back-sink')",
+ "insert into root.test.factory.vehicle (time, temperature) values(1, 1)",
+ "insert into root.test.factory.vehicle (time, temperature) values(2, 2)",
+ "insert into root.test.factory.vehicle (time, temperature) values(3, 3)",
+ "insert into root.test.factory.vehicle (time, temperature) values(4, 4)",
+ "insert into root.test.factory.vehicle (time, temperature) values(5, 5)",
+ "insert into root.test.factory.vehicle (time, temperature) values(6, 6)",
+ "insert into root.test.factory.vehicle (time, temperature) values(7, 7)",
+ "insert into root.test.factory.vehicle (time, temperature) values(8, 8)",
+ "insert into root.test.factory.vehicle (time, temperature) values(10, 10)",
+ "insert into root.test.factory.vehicle (time, temperature) values(9, 9)",
+ "flush"));
+
+ TestUtils.assertDataEventuallyOnEnv(
+ env,
+ "select min from root.aggregate.factory.vehicle.temperature",
+ "Time,root.aggregate.factory.vehicle.temperature.min,",
+ new HashSet<>(Arrays.asList("5,1.0", "9,6.0")),
+ 10);
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/window/processor/CountWindowingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/window/processor/CountWindowingProcessor.java
index ea286d23a0a..655744a9c30 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/window/processor/CountWindowingProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/window/processor/CountWindowingProcessor.java
@@ -79,7 +79,9 @@ public class CountWindowingProcessor extends AbstractSimpleTimeWindowingProcesso
if ((long) window.getCustomizedRuntimeValue() >= count - 1) {
return new Pair<>(
WindowState.EMIT_AND_PURGE_WITH_COMPUTE,
- new WindowOutput().setTimestamp(timeStamp).setProgressTime(timeStamp));
+ new WindowOutput()
+ .setTimestamp(window.getTimestamp())
+ .setProgressTime(window.getTimestamp()));
}
window.setCustomizedRuntimeValue((long) window.getCustomizedRuntimeValue() + 1);
return new Pair<>(WindowState.COMPUTE, null);