This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch windowing
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 31ae4684e8de718a0869b690f2f217b29b7c83e8
Author: Caideyipi <[email protected]>
AuthorDate: Tue Feb 3 10:31:49 2026 +0800

    count-processor
---
 .../PipeDataRegionProcessorConstructor.java        |  4 +++
 .../window/processor/CountWindowingProcessor.java  | 40 +++++++++++++++++++---
 .../agent/plugin/builtin/BuiltinPipePlugin.java    |  3 ++
 .../aggregate/CountWindowingProcessor.java         | 30 ++++++++++++++++
 4 files changed, 72 insertions(+), 5 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
index 31cc8250ebf..875125e1474 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.agent.plugin.dataregion;
 
 import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
+import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.CountWindowingProcessor;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.donothing.DoNothingProcessor;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.throwing.ThrowingExceptionProcessor;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeProcessorConstructor;
@@ -64,6 +65,9 @@ class PipeDataRegionProcessorConstructor extends 
PipeProcessorConstructor {
     pluginConstructors.put(
         BuiltinPipePlugin.TUMBLING_WINDOWING_PROCESSOR.getPipePluginName(),
         TumblingWindowingProcessor::new);
+    pluginConstructors.put(
+        BuiltinPipePlugin.COUNT_WINDOWING_PROCESSOR.getPipePluginName(),
+        CountWindowingProcessor::new);
     pluginConstructors.put(
         BuiltinPipePlugin.COUNT_POINT_PROCESSOR.getPipePluginName(), 
TwoStageCountProcessor::new);
     pluginConstructors.put(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/window/processor/CountWindowingProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/window/processor/CountWindowingProcessor.java
index 62e04fd4646..ea286d23a0a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/window/processor/CountWindowingProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/window/processor/CountWindowingProcessor.java
@@ -29,36 +29,66 @@ import 
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
 import org.apache.tsfile.utils.Pair;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_COUNT_DEFAULT_VALUE;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_COUNT_KEY;
+
 @TreeModel
 public class CountWindowingProcessor extends 
AbstractSimpleTimeWindowingProcessor {
 
   private long count;
 
   @Override
-  public void validate(final PipeParameterValidator validator) throws 
Exception {}
+  public void validate(final PipeParameterValidator validator) throws 
Exception {
+    final PipeParameters parameters = validator.getParameters();
+    validator.validate(
+        args -> (long) args > 0,
+        String.format("The parameter %s must be greater than 0", 
PROCESSOR_COUNT_KEY),
+        parameters.getLongOrDefault(PROCESSOR_COUNT_KEY, 
PROCESSOR_COUNT_DEFAULT_VALUE));
+  }
 
   @Override
   public void customize(
       final PipeParameters parameters, final PipeProcessorRuntimeConfiguration 
configuration)
-      throws Exception {}
+      throws Exception {
+    count = parameters.getLongOrDefault(PROCESSOR_COUNT_KEY, 
PROCESSOR_COUNT_DEFAULT_VALUE);
+  }
 
   @Override
   public Set<TimeSeriesWindow> mayAddWindow(
       final List<TimeSeriesWindow> windowList, final long timeStamp) {
-    return null;
+    final TimeSeriesWindow result;
+    if (windowList.isEmpty()) {
+      result = new TimeSeriesWindow(this, 0L);
+      result.setTimestamp(timeStamp);
+      windowList.add(result);
+      return Collections.singleton(result);
+    }
+    return Collections.emptySet();
   }
 
   @Override
   public Pair<WindowState, WindowOutput> updateAndMaySetWindowState(
       final TimeSeriesWindow window, final long timeStamp) {
-    return null;
+    if (timeStamp > window.getTimestamp()) {
+      window.setTimestamp(timeStamp);
+    }
+    if ((long) window.getCustomizedRuntimeValue() >= count - 1) {
+      return new Pair<>(
+          WindowState.EMIT_AND_PURGE_WITH_COMPUTE,
+          new 
WindowOutput().setTimestamp(timeStamp).setProgressTime(timeStamp));
+    }
+    window.setCustomizedRuntimeValue((long) window.getCustomizedRuntimeValue() 
+ 1);
+    return new Pair<>(WindowState.COMPUTE, null);
   }
 
   @Override
   public WindowOutput forceOutput(final TimeSeriesWindow window) {
-    return null;
+    return new WindowOutput()
+        .setTimestamp(window.getTimestamp())
+        .setProgressTime(window.getTimestamp());
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
index 74bc0d9815a..beb21f56812 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.commons.pipe.agent.plugin.builtin;
 
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.AggregateProcessor;
+import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.CountWindowingProcessor;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.StandardStatisticsProcessor;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.TumblingWindowingProcessor;
 import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.donothing.DoNothingProcessor;
@@ -73,6 +74,7 @@ public enum BuiltinPipePlugin {
   // Hidden-processors, which are plugins of the processors
   STANDARD_STATISTICS_PROCESSOR("standard-statistics-processor", 
StandardStatisticsProcessor.class),
   TUMBLING_WINDOWING_PROCESSOR("tumbling-windowing-processor", 
TumblingWindowingProcessor.class),
+  COUNT_WINDOWING_PROCESSOR("count-windowing-processor", 
CountWindowingProcessor.class),
   PIPE_CONSENSUS_PROCESSOR("pipe-consensus-processor", 
PipeConsensusProcessor.class),
   RENAME_DATABASE_PROCESSOR("rename-database-processor", 
RenameDatabaseProcessor.class),
 
@@ -155,6 +157,7 @@ public enum BuiltinPipePlugin {
                   AGGREGATE_PROCESSOR.getPipePluginName().toUpperCase(),
                   COUNT_POINT_PROCESSOR.getPipePluginName().toUpperCase(),
                   
STANDARD_STATISTICS_PROCESSOR.getPipePluginName().toUpperCase(),
+                  COUNT_WINDOWING_PROCESSOR.getPipePluginName().toUpperCase(),
                   
TUMBLING_WINDOWING_PROCESSOR.getPipePluginName().toUpperCase(),
                   PIPE_CONSENSUS_PROCESSOR.getPipePluginName().toUpperCase(),
                   RENAME_DATABASE_PROCESSOR.getPipePluginName().toUpperCase(),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/aggregate/CountWindowingProcessor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/aggregate/CountWindowingProcessor.java
new file mode 100644
index 00000000000..6481a3b11ae
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/aggregate/CountWindowingProcessor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate;
+
+import 
org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.PlaceHolderProcessor;
+
+/**
+ * This class is a placeholder and should not be initialized. It represents 
the Count Windowing
+ * processor. There is a real implementation in the server module but it cannot 
be imported here. The
+ * pipe agent in the server module will replace this class with the real 
implementation when
+ * initializing the Count Windowing processor.
+ */
+public class CountWindowingProcessor extends PlaceHolderProcessor {}

Reply via email to