This is an automated email from the ASF dual-hosted git repository. justinchen pushed a commit to branch windowing in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 31ae4684e8de718a0869b690f2f217b29b7c83e8 Author: Caideyipi <[email protected]> AuthorDate: Tue Feb 3 10:31:49 2026 +0800 count-processor --- .../PipeDataRegionProcessorConstructor.java | 4 +++ .../window/processor/CountWindowingProcessor.java | 40 +++++++++++++++++++--- .../agent/plugin/builtin/BuiltinPipePlugin.java | 3 ++ .../aggregate/CountWindowingProcessor.java | 30 ++++++++++++++++ 4 files changed, 72 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java index 31cc8250ebf..875125e1474 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.agent.plugin.dataregion; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; +import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.CountWindowingProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.donothing.DoNothingProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.throwing.ThrowingExceptionProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.constructor.PipeProcessorConstructor; @@ -64,6 +65,9 @@ class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor { pluginConstructors.put( BuiltinPipePlugin.TUMBLING_WINDOWING_PROCESSOR.getPipePluginName(), TumblingWindowingProcessor::new); + pluginConstructors.put( + BuiltinPipePlugin.COUNT_WINDOWING_PROCESSOR.getPipePluginName(), + CountWindowingProcessor::new); pluginConstructors.put( BuiltinPipePlugin.COUNT_POINT_PROCESSOR.getPipePluginName(), TwoStageCountProcessor::new); pluginConstructors.put( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/window/processor/CountWindowingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/window/processor/CountWindowingProcessor.java index 62e04fd4646..ea286d23a0a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/window/processor/CountWindowingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/window/processor/CountWindowingProcessor.java @@ -29,36 +29,66 @@ import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.tsfile.utils.Pair; +import java.util.Collections; import java.util.List; import java.util.Set; +import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_COUNT_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_COUNT_KEY; + @TreeModel public class CountWindowingProcessor extends AbstractSimpleTimeWindowingProcessor { private long count; @Override - public void validate(final PipeParameterValidator validator) throws Exception {} + public void validate(final PipeParameterValidator validator) throws Exception { + final PipeParameters parameters = validator.getParameters(); + validator.validate( + args -> (long) args > 0, + String.format("The parameter %s must be greater than 0", PROCESSOR_COUNT_KEY), + parameters.getLongOrDefault(PROCESSOR_COUNT_KEY, PROCESSOR_COUNT_DEFAULT_VALUE)); + } @Override public void customize( final PipeParameters parameters, final PipeProcessorRuntimeConfiguration configuration) - throws Exception {} + throws Exception { + count = parameters.getLongOrDefault(PROCESSOR_COUNT_KEY, PROCESSOR_COUNT_DEFAULT_VALUE); + } @Override public Set<TimeSeriesWindow> mayAddWindow( final List<TimeSeriesWindow> windowList, final long timeStamp) { - return null; + final TimeSeriesWindow result; + if (windowList.isEmpty()) { + result = new TimeSeriesWindow(this, 0L); + result.setTimestamp(timeStamp); + windowList.add(result); + return Collections.singleton(result); + } + return Collections.emptySet(); } @Override public Pair<WindowState, WindowOutput> updateAndMaySetWindowState( final TimeSeriesWindow window, final long timeStamp) { - return null; + if (timeStamp > window.getTimestamp()) { + window.setTimestamp(timeStamp); + } + if ((long) window.getCustomizedRuntimeValue() >= count - 1) { + return new Pair<>( + WindowState.EMIT_AND_PURGE_WITH_COMPUTE, + new WindowOutput().setTimestamp(timeStamp).setProgressTime(timeStamp)); + } + window.setCustomizedRuntimeValue((long) window.getCustomizedRuntimeValue() + 1); + return new Pair<>(WindowState.COMPUTE, null); } @Override public WindowOutput forceOutput(final TimeSeriesWindow window) { - return null; + return new WindowOutput() + .setTimestamp(window.getTimestamp()) + .setProgressTime(window.getTimestamp()); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java index 74bc0d9815a..beb21f56812 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java @@ -20,6 +20,7 @@ package org.apache.iotdb.commons.pipe.agent.plugin.builtin; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.AggregateProcessor; +import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.CountWindowingProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.StandardStatisticsProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate.TumblingWindowingProcessor; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.donothing.DoNothingProcessor; @@ -73,6 +74,7 @@ public enum BuiltinPipePlugin { // Hidden-processors, which are plugins of the processors STANDARD_STATISTICS_PROCESSOR("standard-statistics-processor", StandardStatisticsProcessor.class), TUMBLING_WINDOWING_PROCESSOR("tumbling-windowing-processor", TumblingWindowingProcessor.class), + COUNT_WINDOWING_PROCESSOR("count-windowing-processor", CountWindowingProcessor.class), PIPE_CONSENSUS_PROCESSOR("pipe-consensus-processor", PipeConsensusProcessor.class), RENAME_DATABASE_PROCESSOR("rename-database-processor", RenameDatabaseProcessor.class), @@ -155,6 +157,7 @@ public enum BuiltinPipePlugin { AGGREGATE_PROCESSOR.getPipePluginName().toUpperCase(), COUNT_POINT_PROCESSOR.getPipePluginName().toUpperCase(), STANDARD_STATISTICS_PROCESSOR.getPipePluginName().toUpperCase(), + COUNT_WINDOWING_PROCESSOR.getPipePluginName().toUpperCase(), TUMBLING_WINDOWING_PROCESSOR.getPipePluginName().toUpperCase(), PIPE_CONSENSUS_PROCESSOR.getPipePluginName().toUpperCase(), RENAME_DATABASE_PROCESSOR.getPipePluginName().toUpperCase(), diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/aggregate/CountWindowingProcessor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/aggregate/CountWindowingProcessor.java new file mode 100644 index 00000000000..6481a3b11ae --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/processor/aggregate/CountWindowingProcessor.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.aggregate; + +import org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.PlaceHolderProcessor; + +/** + * This class is a placeholder and should not be initialized. It represents the Standard Statistics + * processor. There is a real implementation in the server module but cannot be imported here. The + * pipe agent in the server module will replace this class with the real implementation when + * initializing the Standard Statistics processor. + */ +public class CountWindowingProcessor extends PlaceHolderProcessor {}
