This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new b3052df71db Pipe: Added max / min function to aggregate-processor (#15459) (#15469)
b3052df71db is described below

commit b3052df71db87dd7ea02887da5f293d3c2a84f78
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 8 12:19:18 2025 +0800

    Pipe: Added max / min function to aggregate-processor (#15459) (#15469)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../iotdb/pipe/it/single/IoTDBPipeAggregateIT.java |  5 +-
 .../standardstatistics/MaxValueOperator.java       | 54 ++++++++++++++++++++++
 .../standardstatistics/MinValueOperator.java       | 54 ++++++++++++++++++++++
 .../sametype/numeric/AbsoluteMaxOperator.java      | 16 +++----
 .../{AbsoluteMaxOperator.java => MaxOperator.java} | 32 ++++++-------
 .../{AbsoluteMaxOperator.java => MinOperator.java} | 36 +++++++--------
 .../StandardStatisticsOperatorProcessor.java       | 10 +++-
 7 files changed, 162 insertions(+), 45 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeAggregateIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeAggregateIT.java
index cbd9c08f4e5..78ad6b9bf33 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeAggregateIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeAggregateIT.java
@@ -66,7 +66,8 @@ public class IoTDBPipeAggregateIT extends AbstractPipeSingleIT {
       processorAttributes.put("output.database", "root.testdb");
       processorAttributes.put(
           "output.measurements", "Avg1, peak1, rms1, var1, skew1, kurt1, ff1, 
cf1, pf1");
-      processorAttributes.put("operators", "avg, peak, rms, var, skew, kurt, 
ff, cf, pf, cE");
+      processorAttributes.put(
+          "operators", "avg, peak, rms, var, skew, kurt, ff, cf, pf, cE, max, 
min");
       processorAttributes.put("sliding.seconds", "60");
 
       connectorAttributes.put("sink", "write-back-sink");
@@ -115,7 +116,7 @@ public class IoTDBPipeAggregateIT extends 
AbstractPipeSingleIT {
           env,
           "select count(*) from root.testdb.** group by level=1",
           "count(root.testdb.*.*.*.*),",
-          Collections.singleton("20,"));
+          Collections.singleton("24,"));
 
       // Test manually renamed timeSeries count
       TestUtils.assertDataEventuallyOnEnv(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/aggregatedresult/standardstatistics/MaxValueOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/aggregatedresult/standardstatistics/MaxValueOperator.java
new file mode 100644
index 00000000000..7967771a069
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/aggregatedresult/standardstatistics/MaxValueOperator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.standardstatistics;
+
+import 
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.AggregatedResultOperator;
+import 
org.apache.iotdb.db.pipe.processor.aggregate.operator.intermediateresult.CustomizedReadableIntermediateResults;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Pair;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+public class MaxValueOperator implements AggregatedResultOperator {
+  @Override
+  public String getName() {
+    return "max";
+  }
+
+  @Override
+  public void configureSystemParameters(final Map<String, String> 
systemParams) {
+    // Do nothing
+  }
+
+  @Override
+  public Set<String> getDeclaredIntermediateValueNames() {
+    return Collections.singleton("max");
+  }
+
+  @Override
+  public Pair<TSDataType, Object> terminateWindow(
+      final TSDataType measurementDataType,
+      final CustomizedReadableIntermediateResults intermediateResults) {
+    return new Pair<>(measurementDataType, 
intermediateResults.getObject("max"));
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/aggregatedresult/standardstatistics/MinValueOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/aggregatedresult/standardstatistics/MinValueOperator.java
new file mode 100644
index 00000000000..0bed3a0dee0
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/aggregatedresult/standardstatistics/MinValueOperator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.standardstatistics;
+
+import 
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.AggregatedResultOperator;
+import 
org.apache.iotdb.db.pipe.processor.aggregate.operator.intermediateresult.CustomizedReadableIntermediateResults;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Pair;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+public class MinValueOperator implements AggregatedResultOperator {
+  @Override
+  public String getName() {
+    return "min";
+  }
+
+  @Override
+  public void configureSystemParameters(final Map<String, String> 
systemParams) {
+    // Do nothing
+  }
+
+  @Override
+  public Set<String> getDeclaredIntermediateValueNames() {
+    return Collections.singleton("min");
+  }
+
+  @Override
+  public Pair<TSDataType, Object> terminateWindow(
+      final TSDataType measurementDataType,
+      final CustomizedReadableIntermediateResults intermediateResults) {
+    return new Pair<>(measurementDataType, 
intermediateResults.getObject("min"));
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/AbsoluteMaxOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/AbsoluteMaxOperator.java
index db9e16f84a3..b121e4c9951 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/AbsoluteMaxOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/AbsoluteMaxOperator.java
@@ -26,46 +26,46 @@ public class AbsoluteMaxOperator extends 
AbstractSameTypeNumericOperator {
   }
 
   @Override
-  public boolean initAndGetIsSupport(int initialInput, long initialTimestamp) {
+  public boolean initAndGetIsSupport(final int initialInput, final long 
initialTimestamp) {
     intValue = Math.abs(initialInput);
     return super.initAndGetIsSupport(initialInput, initialTimestamp);
   }
 
   @Override
-  public boolean initAndGetIsSupport(long initialInput, long initialTimestamp) 
{
+  public boolean initAndGetIsSupport(final long initialInput, final long 
initialTimestamp) {
     longValue = Math.abs(initialInput);
     return super.initAndGetIsSupport(initialInput, initialTimestamp);
   }
 
   @Override
-  public boolean initAndGetIsSupport(float initialInput, long 
initialTimestamp) {
+  public boolean initAndGetIsSupport(final float initialInput, final long 
initialTimestamp) {
     floatValue = Math.abs(initialInput);
     return super.initAndGetIsSupport(initialInput, initialTimestamp);
   }
 
   @Override
-  public boolean initAndGetIsSupport(double initialInput, long 
initialTimestamp) {
+  public boolean initAndGetIsSupport(final double initialInput, final long 
initialTimestamp) {
     doubleValue = Math.abs(initialInput);
     return super.initAndGetIsSupport(initialInput, initialTimestamp);
   }
 
   @Override
-  public void updateValue(int input, long timestamp) {
+  public void updateValue(final int input, final long timestamp) {
     intValue = Math.max(intValue, Math.abs(input));
   }
 
   @Override
-  public void updateValue(long input, long timestamp) {
+  public void updateValue(final long input, final long timestamp) {
     longValue = Math.max(longValue, Math.abs(input));
   }
 
   @Override
-  public void updateValue(float input, long timestamp) {
+  public void updateValue(final float input, final long timestamp) {
     floatValue = Math.max(floatValue, Math.abs(input));
   }
 
   @Override
-  public void updateValue(double input, long timestamp) {
+  public void updateValue(final double input, final long timestamp) {
     doubleValue = Math.max(doubleValue, Math.abs(input));
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/AbsoluteMaxOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/MaxOperator.java
similarity index 63%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/AbsoluteMaxOperator.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/MaxOperator.java
index db9e16f84a3..f994a652941 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/AbsoluteMaxOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/MaxOperator.java
@@ -19,53 +19,53 @@
 
 package 
org.apache.iotdb.db.pipe.processor.aggregate.operator.intermediateresult.sametype.numeric;
 
-public class AbsoluteMaxOperator extends AbstractSameTypeNumericOperator {
+public class MaxOperator extends AbstractSameTypeNumericOperator {
   @Override
   public String getName() {
-    return "abs_max";
+    return "max";
   }
 
   @Override
   public boolean initAndGetIsSupport(int initialInput, long initialTimestamp) {
-    intValue = Math.abs(initialInput);
+    intValue = initialInput;
     return super.initAndGetIsSupport(initialInput, initialTimestamp);
   }
 
   @Override
   public boolean initAndGetIsSupport(long initialInput, long initialTimestamp) 
{
-    longValue = Math.abs(initialInput);
+    longValue = initialInput;
     return super.initAndGetIsSupport(initialInput, initialTimestamp);
   }
 
   @Override
-  public boolean initAndGetIsSupport(float initialInput, long 
initialTimestamp) {
-    floatValue = Math.abs(initialInput);
+  public boolean initAndGetIsSupport(final float initialInput, final long 
initialTimestamp) {
+    floatValue = initialInput;
     return super.initAndGetIsSupport(initialInput, initialTimestamp);
   }
 
   @Override
-  public boolean initAndGetIsSupport(double initialInput, long 
initialTimestamp) {
-    doubleValue = Math.abs(initialInput);
+  public boolean initAndGetIsSupport(final double initialInput, final long 
initialTimestamp) {
+    doubleValue = initialInput;
     return super.initAndGetIsSupport(initialInput, initialTimestamp);
   }
 
   @Override
-  public void updateValue(int input, long timestamp) {
-    intValue = Math.max(intValue, Math.abs(input));
+  public void updateValue(final int input, final long timestamp) {
+    intValue = Math.max(intValue, input);
   }
 
   @Override
-  public void updateValue(long input, long timestamp) {
-    longValue = Math.max(longValue, Math.abs(input));
+  public void updateValue(final long input, final long timestamp) {
+    longValue = Math.max(longValue, input);
   }
 
   @Override
-  public void updateValue(float input, long timestamp) {
-    floatValue = Math.max(floatValue, Math.abs(input));
+  public void updateValue(final float input, final long timestamp) {
+    floatValue = Math.max(floatValue, input);
   }
 
   @Override
-  public void updateValue(double input, long timestamp) {
-    doubleValue = Math.max(doubleValue, Math.abs(input));
+  public void updateValue(final double input, final long timestamp) {
+    doubleValue = Math.max(doubleValue, input);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/AbsoluteMaxOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/MinOperator.java
similarity index 57%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/AbsoluteMaxOperator.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/MinOperator.java
index db9e16f84a3..947eca6ff63 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/AbsoluteMaxOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/MinOperator.java
@@ -19,53 +19,53 @@
 
 package 
org.apache.iotdb.db.pipe.processor.aggregate.operator.intermediateresult.sametype.numeric;
 
-public class AbsoluteMaxOperator extends AbstractSameTypeNumericOperator {
+public class MinOperator extends AbstractSameTypeNumericOperator {
   @Override
   public String getName() {
-    return "abs_max";
+    return "min";
   }
 
   @Override
-  public boolean initAndGetIsSupport(int initialInput, long initialTimestamp) {
-    intValue = Math.abs(initialInput);
+  public boolean initAndGetIsSupport(final int initialInput, final long 
initialTimestamp) {
+    intValue = initialInput;
     return super.initAndGetIsSupport(initialInput, initialTimestamp);
   }
 
   @Override
-  public boolean initAndGetIsSupport(long initialInput, long initialTimestamp) 
{
-    longValue = Math.abs(initialInput);
+  public boolean initAndGetIsSupport(final long initialInput, final long 
initialTimestamp) {
+    longValue = initialInput;
     return super.initAndGetIsSupport(initialInput, initialTimestamp);
   }
 
   @Override
-  public boolean initAndGetIsSupport(float initialInput, long 
initialTimestamp) {
-    floatValue = Math.abs(initialInput);
+  public boolean initAndGetIsSupport(final float initialInput, final long 
initialTimestamp) {
+    floatValue = initialInput;
     return super.initAndGetIsSupport(initialInput, initialTimestamp);
   }
 
   @Override
-  public boolean initAndGetIsSupport(double initialInput, long 
initialTimestamp) {
-    doubleValue = Math.abs(initialInput);
+  public boolean initAndGetIsSupport(final double initialInput, final long 
initialTimestamp) {
+    doubleValue = initialInput;
     return super.initAndGetIsSupport(initialInput, initialTimestamp);
   }
 
   @Override
-  public void updateValue(int input, long timestamp) {
-    intValue = Math.max(intValue, Math.abs(input));
+  public void updateValue(final int input, final long timestamp) {
+    intValue = Math.min(intValue, input);
   }
 
   @Override
-  public void updateValue(long input, long timestamp) {
-    longValue = Math.max(longValue, Math.abs(input));
+  public void updateValue(final long input, final long timestamp) {
+    longValue = Math.min(longValue, input);
   }
 
   @Override
-  public void updateValue(float input, long timestamp) {
-    floatValue = Math.max(floatValue, Math.abs(input));
+  public void updateValue(final float input, final long timestamp) {
+    floatValue = Math.min(floatValue, input);
   }
 
   @Override
-  public void updateValue(double input, long timestamp) {
-    doubleValue = Math.max(doubleValue, Math.abs(input));
+  public void updateValue(final double input, final long timestamp) {
+    doubleValue = Math.min(doubleValue, input);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/processor/StandardStatisticsOperatorProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/processor/StandardStatisticsOperatorProcessor.java
index fe010a0a1a1..3d3250088f8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/processor/StandardStatisticsOperatorProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/processor/StandardStatisticsOperatorProcessor.java
@@ -25,6 +25,8 @@ import 
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.st
 import 
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.standardstatistics.CrestFactorOperator;
 import 
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.standardstatistics.FormFactorOperator;
 import 
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.standardstatistics.KurtosisOperator;
+import 
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.standardstatistics.MaxValueOperator;
+import 
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.standardstatistics.MinValueOperator;
 import 
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.standardstatistics.PeakOperator;
 import 
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.standardstatistics.PulseFactorOperator;
 import 
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.standardstatistics.RootMeanSquareOperator;
@@ -33,6 +35,8 @@ import 
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.st
 import 
org.apache.iotdb.db.pipe.processor.aggregate.operator.intermediateresult.IntermediateResultOperator;
 import 
org.apache.iotdb.db.pipe.processor.aggregate.operator.intermediateresult.sametype.numeric.AbsoluteMaxOperator;
 import 
org.apache.iotdb.db.pipe.processor.aggregate.operator.intermediateresult.sametype.numeric.IntegralPoweredSumOperator;
+import 
org.apache.iotdb.db.pipe.processor.aggregate.operator.intermediateresult.sametype.numeric.MaxOperator;
+import 
org.apache.iotdb.db.pipe.processor.aggregate.operator.intermediateresult.sametype.numeric.MinOperator;
 import 
org.apache.iotdb.db.pipe.processor.aggregate.operator.intermediateresult.specifictype.doubletype.FractionPoweredSumOperator;
 import 
org.apache.iotdb.db.pipe.processor.aggregate.operator.intermediateresult.specifictype.integertype.CountOperator;
 
@@ -54,6 +58,8 @@ public class StandardStatisticsOperatorProcessor extends 
AbstractOperatorProcess
                 new FormFactorOperator(),
                 new KurtosisOperator(),
                 new PeakOperator(),
+                new MaxValueOperator(),
+                new MinValueOperator(),
                 new PulseFactorOperator(),
                 new RootMeanSquareOperator(),
                 new SkewnessOperator(),
@@ -71,6 +77,8 @@ public class StandardStatisticsOperatorProcessor extends 
AbstractOperatorProcess
                 () -> new IntegralPoweredSumOperator(1),
                 () -> new IntegralPoweredSumOperator(2),
                 () -> new IntegralPoweredSumOperator(3),
-                () -> new IntegralPoweredSumOperator(4))));
+                () -> new IntegralPoweredSumOperator(4),
+                MaxOperator::new,
+                MinOperator::new)));
   }
 }

Reply via email to