This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new b3052df71db Pipe: Added max / min function to aggregate-processor
(#15459) (#15469)
b3052df71db is described below
commit b3052df71db87dd7ea02887da5f293d3c2a84f78
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 8 12:19:18 2025 +0800
Pipe: Added max / min function to aggregate-processor (#15459) (#15469)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../iotdb/pipe/it/single/IoTDBPipeAggregateIT.java | 5 +-
.../standardstatistics/MaxValueOperator.java | 54 ++++++++++++++++++++++
.../standardstatistics/MinValueOperator.java | 54 ++++++++++++++++++++++
.../sametype/numeric/AbsoluteMaxOperator.java | 16 +++----
.../{AbsoluteMaxOperator.java => MaxOperator.java} | 32 ++++++-------
.../{AbsoluteMaxOperator.java => MinOperator.java} | 36 +++++++--------
.../StandardStatisticsOperatorProcessor.java | 10 +++-
7 files changed, 162 insertions(+), 45 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeAggregateIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeAggregateIT.java
index cbd9c08f4e5..78ad6b9bf33 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeAggregateIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeAggregateIT.java
@@ -66,7 +66,8 @@ public class IoTDBPipeAggregateIT extends
AbstractPipeSingleIT {
processorAttributes.put("output.database", "root.testdb");
processorAttributes.put(
"output.measurements", "Avg1, peak1, rms1, var1, skew1, kurt1, ff1,
cf1, pf1");
- processorAttributes.put("operators", "avg, peak, rms, var, skew, kurt,
ff, cf, pf, cE");
+ processorAttributes.put(
+ "operators", "avg, peak, rms, var, skew, kurt, ff, cf, pf, cE, max,
min");
processorAttributes.put("sliding.seconds", "60");
connectorAttributes.put("sink", "write-back-sink");
@@ -115,7 +116,7 @@ public class IoTDBPipeAggregateIT extends
AbstractPipeSingleIT {
env,
"select count(*) from root.testdb.** group by level=1",
"count(root.testdb.*.*.*.*),",
- Collections.singleton("20,"));
+ Collections.singleton("24,"));
// Test manually renamed timeSeries count
TestUtils.assertDataEventuallyOnEnv(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/aggregatedresult/standardstatistics/MaxValueOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/aggregatedresult/standardstatistics/MaxValueOperator.java
new file mode 100644
index 00000000000..7967771a069
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/aggregatedresult/standardstatistics/MaxValueOperator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.standardstatistics;
+
+import
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.AggregatedResultOperator;
+import
org.apache.iotdb.db.pipe.processor.aggregate.operator.intermediateresult.CustomizedReadableIntermediateResults;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Pair;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+public class MaxValueOperator implements AggregatedResultOperator {
+ @Override
+ public String getName() {
+ return "max";
+ }
+
+ @Override
+ public void configureSystemParameters(final Map<String, String>
systemParams) {
+ // Do nothing
+ }
+
+ @Override
+ public Set<String> getDeclaredIntermediateValueNames() {
+ return Collections.singleton("max");
+ }
+
+ @Override
+ public Pair<TSDataType, Object> terminateWindow(
+ final TSDataType measurementDataType,
+ final CustomizedReadableIntermediateResults intermediateResults) {
+ return new Pair<>(measurementDataType,
intermediateResults.getObject("max"));
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/aggregatedresult/standardstatistics/MinValueOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/aggregatedresult/standardstatistics/MinValueOperator.java
new file mode 100644
index 00000000000..0bed3a0dee0
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/aggregatedresult/standardstatistics/MinValueOperator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.standardstatistics;
+
+import
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.AggregatedResultOperator;
+import
org.apache.iotdb.db.pipe.processor.aggregate.operator.intermediateresult.CustomizedReadableIntermediateResults;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Pair;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+public class MinValueOperator implements AggregatedResultOperator {
+ @Override
+ public String getName() {
+ return "min";
+ }
+
+ @Override
+ public void configureSystemParameters(final Map<String, String>
systemParams) {
+ // Do nothing
+ }
+
+ @Override
+ public Set<String> getDeclaredIntermediateValueNames() {
+ return Collections.singleton("min");
+ }
+
+ @Override
+ public Pair<TSDataType, Object> terminateWindow(
+ final TSDataType measurementDataType,
+ final CustomizedReadableIntermediateResults intermediateResults) {
+ return new Pair<>(measurementDataType,
intermediateResults.getObject("min"));
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/AbsoluteMaxOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/AbsoluteMaxOperator.java
index db9e16f84a3..b121e4c9951 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/AbsoluteMaxOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/AbsoluteMaxOperator.java
@@ -26,46 +26,46 @@ public class AbsoluteMaxOperator extends
AbstractSameTypeNumericOperator {
}
@Override
- public boolean initAndGetIsSupport(int initialInput, long initialTimestamp) {
+ public boolean initAndGetIsSupport(final int initialInput, final long
initialTimestamp) {
intValue = Math.abs(initialInput);
return super.initAndGetIsSupport(initialInput, initialTimestamp);
}
@Override
- public boolean initAndGetIsSupport(long initialInput, long initialTimestamp)
{
+ public boolean initAndGetIsSupport(final long initialInput, final long
initialTimestamp) {
longValue = Math.abs(initialInput);
return super.initAndGetIsSupport(initialInput, initialTimestamp);
}
@Override
- public boolean initAndGetIsSupport(float initialInput, long
initialTimestamp) {
+ public boolean initAndGetIsSupport(final float initialInput, final long
initialTimestamp) {
floatValue = Math.abs(initialInput);
return super.initAndGetIsSupport(initialInput, initialTimestamp);
}
@Override
- public boolean initAndGetIsSupport(double initialInput, long
initialTimestamp) {
+ public boolean initAndGetIsSupport(final double initialInput, final long
initialTimestamp) {
doubleValue = Math.abs(initialInput);
return super.initAndGetIsSupport(initialInput, initialTimestamp);
}
@Override
- public void updateValue(int input, long timestamp) {
+ public void updateValue(final int input, final long timestamp) {
intValue = Math.max(intValue, Math.abs(input));
}
@Override
- public void updateValue(long input, long timestamp) {
+ public void updateValue(final long input, final long timestamp) {
longValue = Math.max(longValue, Math.abs(input));
}
@Override
- public void updateValue(float input, long timestamp) {
+ public void updateValue(final float input, final long timestamp) {
floatValue = Math.max(floatValue, Math.abs(input));
}
@Override
- public void updateValue(double input, long timestamp) {
+ public void updateValue(final double input, final long timestamp) {
doubleValue = Math.max(doubleValue, Math.abs(input));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/AbsoluteMaxOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/MaxOperator.java
similarity index 63%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/AbsoluteMaxOperator.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/MaxOperator.java
index db9e16f84a3..f994a652941 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/AbsoluteMaxOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/MaxOperator.java
@@ -19,53 +19,53 @@
package
org.apache.iotdb.db.pipe.processor.aggregate.operator.intermediateresult.sametype.numeric;
-public class AbsoluteMaxOperator extends AbstractSameTypeNumericOperator {
+public class MaxOperator extends AbstractSameTypeNumericOperator {
@Override
public String getName() {
- return "abs_max";
+ return "max";
}
@Override
public boolean initAndGetIsSupport(int initialInput, long initialTimestamp) {
- intValue = Math.abs(initialInput);
+ intValue = initialInput;
return super.initAndGetIsSupport(initialInput, initialTimestamp);
}
@Override
public boolean initAndGetIsSupport(long initialInput, long initialTimestamp)
{
- longValue = Math.abs(initialInput);
+ longValue = initialInput;
return super.initAndGetIsSupport(initialInput, initialTimestamp);
}
@Override
- public boolean initAndGetIsSupport(float initialInput, long
initialTimestamp) {
- floatValue = Math.abs(initialInput);
+ public boolean initAndGetIsSupport(final float initialInput, final long
initialTimestamp) {
+ floatValue = initialInput;
return super.initAndGetIsSupport(initialInput, initialTimestamp);
}
@Override
- public boolean initAndGetIsSupport(double initialInput, long
initialTimestamp) {
- doubleValue = Math.abs(initialInput);
+ public boolean initAndGetIsSupport(final double initialInput, final long
initialTimestamp) {
+ doubleValue = initialInput;
return super.initAndGetIsSupport(initialInput, initialTimestamp);
}
@Override
- public void updateValue(int input, long timestamp) {
- intValue = Math.max(intValue, Math.abs(input));
+ public void updateValue(final int input, final long timestamp) {
+ intValue = Math.max(intValue, input);
}
@Override
- public void updateValue(long input, long timestamp) {
- longValue = Math.max(longValue, Math.abs(input));
+ public void updateValue(final long input, final long timestamp) {
+ longValue = Math.max(longValue, input);
}
@Override
- public void updateValue(float input, long timestamp) {
- floatValue = Math.max(floatValue, Math.abs(input));
+ public void updateValue(final float input, final long timestamp) {
+ floatValue = Math.max(floatValue, input);
}
@Override
- public void updateValue(double input, long timestamp) {
- doubleValue = Math.max(doubleValue, Math.abs(input));
+ public void updateValue(final double input, final long timestamp) {
+ doubleValue = Math.max(doubleValue, input);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/AbsoluteMaxOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/MinOperator.java
similarity index 57%
copy from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/AbsoluteMaxOperator.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/MinOperator.java
index db9e16f84a3..947eca6ff63 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/AbsoluteMaxOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/intermediateresult/sametype/numeric/MinOperator.java
@@ -19,53 +19,53 @@
package
org.apache.iotdb.db.pipe.processor.aggregate.operator.intermediateresult.sametype.numeric;
-public class AbsoluteMaxOperator extends AbstractSameTypeNumericOperator {
+public class MinOperator extends AbstractSameTypeNumericOperator {
@Override
public String getName() {
- return "abs_max";
+ return "min";
}
@Override
- public boolean initAndGetIsSupport(int initialInput, long initialTimestamp) {
- intValue = Math.abs(initialInput);
+ public boolean initAndGetIsSupport(final int initialInput, final long
initialTimestamp) {
+ intValue = initialInput;
return super.initAndGetIsSupport(initialInput, initialTimestamp);
}
@Override
- public boolean initAndGetIsSupport(long initialInput, long initialTimestamp)
{
- longValue = Math.abs(initialInput);
+ public boolean initAndGetIsSupport(final long initialInput, final long
initialTimestamp) {
+ longValue = initialInput;
return super.initAndGetIsSupport(initialInput, initialTimestamp);
}
@Override
- public boolean initAndGetIsSupport(float initialInput, long
initialTimestamp) {
- floatValue = Math.abs(initialInput);
+ public boolean initAndGetIsSupport(final float initialInput, final long
initialTimestamp) {
+ floatValue = initialInput;
return super.initAndGetIsSupport(initialInput, initialTimestamp);
}
@Override
- public boolean initAndGetIsSupport(double initialInput, long
initialTimestamp) {
- doubleValue = Math.abs(initialInput);
+ public boolean initAndGetIsSupport(final double initialInput, final long
initialTimestamp) {
+ doubleValue = initialInput;
return super.initAndGetIsSupport(initialInput, initialTimestamp);
}
@Override
- public void updateValue(int input, long timestamp) {
- intValue = Math.max(intValue, Math.abs(input));
+ public void updateValue(final int input, final long timestamp) {
+ intValue = Math.min(intValue, input);
}
@Override
- public void updateValue(long input, long timestamp) {
- longValue = Math.max(longValue, Math.abs(input));
+ public void updateValue(final long input, final long timestamp) {
+ longValue = Math.min(longValue, input);
}
@Override
- public void updateValue(float input, long timestamp) {
- floatValue = Math.max(floatValue, Math.abs(input));
+ public void updateValue(final float input, final long timestamp) {
+ floatValue = Math.min(floatValue, input);
}
@Override
- public void updateValue(double input, long timestamp) {
- doubleValue = Math.max(doubleValue, Math.abs(input));
+ public void updateValue(final double input, final long timestamp) {
+ doubleValue = Math.min(doubleValue, input);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/processor/StandardStatisticsOperatorProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/processor/StandardStatisticsOperatorProcessor.java
index fe010a0a1a1..3d3250088f8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/processor/StandardStatisticsOperatorProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/operator/processor/StandardStatisticsOperatorProcessor.java
@@ -25,6 +25,8 @@ import
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.st
import
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.standardstatistics.CrestFactorOperator;
import
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.standardstatistics.FormFactorOperator;
import
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.standardstatistics.KurtosisOperator;
+import
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.standardstatistics.MaxValueOperator;
+import
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.standardstatistics.MinValueOperator;
import
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.standardstatistics.PeakOperator;
import
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.standardstatistics.PulseFactorOperator;
import
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.standardstatistics.RootMeanSquareOperator;
@@ -33,6 +35,8 @@ import
org.apache.iotdb.db.pipe.processor.aggregate.operator.aggregatedresult.st
import
org.apache.iotdb.db.pipe.processor.aggregate.operator.intermediateresult.IntermediateResultOperator;
import
org.apache.iotdb.db.pipe.processor.aggregate.operator.intermediateresult.sametype.numeric.AbsoluteMaxOperator;
import
org.apache.iotdb.db.pipe.processor.aggregate.operator.intermediateresult.sametype.numeric.IntegralPoweredSumOperator;
+import
org.apache.iotdb.db.pipe.processor.aggregate.operator.intermediateresult.sametype.numeric.MaxOperator;
+import
org.apache.iotdb.db.pipe.processor.aggregate.operator.intermediateresult.sametype.numeric.MinOperator;
import
org.apache.iotdb.db.pipe.processor.aggregate.operator.intermediateresult.specifictype.doubletype.FractionPoweredSumOperator;
import
org.apache.iotdb.db.pipe.processor.aggregate.operator.intermediateresult.specifictype.integertype.CountOperator;
@@ -54,6 +58,8 @@ public class StandardStatisticsOperatorProcessor extends
AbstractOperatorProcess
new FormFactorOperator(),
new KurtosisOperator(),
new PeakOperator(),
+ new MaxValueOperator(),
+ new MinValueOperator(),
new PulseFactorOperator(),
new RootMeanSquareOperator(),
new SkewnessOperator(),
@@ -71,6 +77,8 @@ public class StandardStatisticsOperatorProcessor extends
AbstractOperatorProcess
() -> new IntegralPoweredSumOperator(1),
() -> new IntegralPoweredSumOperator(2),
() -> new IntegralPoweredSumOperator(3),
- () -> new IntegralPoweredSumOperator(4))));
+ () -> new IntegralPoweredSumOperator(4),
+ MaxOperator::new,
+ MinOperator::new)));
}
}