Caideyipi commented on code in PR #14168:
URL: https://github.com/apache/iotdb/pull/14168#discussion_r1893429756
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointFilter.java:
##########
@@ -79,23 +76,28 @@ private boolean tryFilter(final long timestamp, final T
value) {
return false;
}
- reset(timestamp, value);
+ reset(arrivalTime, timestamp, value);
return true;
}
// For other numerical types, we compare the value difference
if (Math.abs(
Double.parseDouble(lastStoredValue.toString()) -
Double.parseDouble(value.toString()))
- > processor.getCompressionDeviation()) {
- reset(timestamp, value);
+ > compressionDeviation) {
+ reset(arrivalTime, timestamp, value);
return true;
}
return false;
}
- private void reset(final long timestamp, final T value) {
- lastStoredTimestamp = timestamp;
+ public void reset(final long arrivalTime, final long timestamp, final Object
value) {
+ lastPointArrivalTime = arrivalTime;
+ lastPointEventTime = timestamp;
Review Comment:
It seems that this method is identical to "init" except for the name...
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java:
##########
@@ -77,6 +87,22 @@ public class PipeProcessorConstant {
public static final long
PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_DEFAULT_VALUE =
Long.MAX_VALUE;
+ public static final String
PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MIN_INTERVAL =
+ "processor.changing-point.arrival-time.min-interval";
+ public static final String
PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MAX_INTERVAL =
+ "processor.changing-point.arrival-time.max-interval";
+ public static final String PROCESSOR_CHANGING_POINT_EVENT_TIME_MIN_INTERVAL =
+ "processor.changing-point.event-time.min-interval";
Review Comment:
PTAL at the Feishu doc. Maybe the "changing-point" and "sdt" prefixes should
not be used here for "arrival-time" and "event-time"... Also, should the
customization (currently the validation) be placed in the super class
"DownSamplingProcessor"?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java:
##########
@@ -71,6 +93,50 @@ public void validate(PipeParameterValidator validator)
throws Exception {
memoryLimitInBytes);
}
+ public void validatorTimeInterval(final PipeParameterValidator validator)
throws Exception {
Review Comment:
Better to use "validate" rather than "validator" in the method name.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointFilter.java:
##########
@@ -19,57 +19,54 @@
package org.apache.iotdb.db.pipe.processor.downsampling.changing;
+import org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingFilter;
import org.apache.iotdb.pipe.api.type.Binary;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
import java.time.LocalDate;
import java.util.Objects;
-public class ChangingValueFilter<T> {
+public class ChangingPointFilter extends DownSamplingFilter {
- private final ChangingValueSamplingProcessor processor;
+ private static final long estimatedMemory =
+ RamUsageEstimator.shallowSizeOfInstance(ChangingPointFilter.class);
/**
- * The last stored time and value we compare current point against
lastReadTimestamp and
- * lastReadValue
+ * The maximum absolute difference the user set if the data's value is within
+ * compressionDeviation, it will be compressed and discarded after
compression
*/
- private long lastStoredTimestamp;
+ private final double compressionDeviation;
- private T lastStoredValue;
+ private Object lastStoredValue;
- public ChangingValueFilter(
- final ChangingValueSamplingProcessor processor,
+ public ChangingPointFilter(
+ final long arrivalTime,
final long firstTimestamp,
- final T firstValue) {
- this.processor = processor;
- init(firstTimestamp, firstValue);
+ final Object firstValue,
+ final double compressionDeviation,
+ final boolean isFilterArrivalTime) {
+ super(arrivalTime, firstTimestamp, isFilterArrivalTime);
+ lastStoredValue = firstValue;
+ this.compressionDeviation = compressionDeviation;
}
- private void init(final long firstTimestamp, final T firstValue) {
- lastStoredTimestamp = firstTimestamp;
+ private void init(final long arrivalTime, long firstTimestamp, final Object
firstValue) {
+ lastPointArrivalTime = arrivalTime;
+ lastPointEventTime = firstTimestamp;
Review Comment:
Better call "super();"?
##########
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeProcessorConstant.java:
##########
@@ -77,6 +87,22 @@ public class PipeProcessorConstant {
public static final long
PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_DEFAULT_VALUE =
Long.MAX_VALUE;
+ public static final String
PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MIN_INTERVAL =
+ "processor.changing-point.arrival-time.min-interval";
+ public static final String
PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MAX_INTERVAL =
+ "processor.changing-point.arrival-time.max-interval";
+ public static final String PROCESSOR_CHANGING_POINT_EVENT_TIME_MIN_INTERVAL =
+ "processor.changing-point.event-time.min-interval";
+ public static final String PROCESSOR_CHANGING_POINT_EVENT_TIME_MAX_INTERVAL =
+ "processor.changing-point.event-time.max-interval";
+ public static final String PROCESSOR_CHANGING_POINT_VALUE_INTERVAL =
+ "processor.changing-point.value-interval";
Review Comment:
"value-variation"?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/changing/ChangingPointSamplingProcessor.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.processor.downsampling.changing;
+
+import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
+import org.apache.iotdb.db.pipe.event.common.row.PipeRemarkableRow;
+import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
+import org.apache.iotdb.db.pipe.processor.downsampling.DownSamplingProcessor;
+import
org.apache.iotdb.db.pipe.processor.downsampling.PartialPathLastObjectCache;
+import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.collector.RowCollector;
+import
org.apache.iotdb.pipe.api.customizer.configuration.PipeProcessorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+
+import org.apache.tsfile.common.constant.TsFileConstant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class ChangingPointSamplingProcessor extends DownSamplingProcessor {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ChangingPointSamplingProcessor.class);
+
+ /**
+ * The maximum absolute difference the user set if the data's value is within
+ * compressionDeviation, it will be compressed and discarded after
compression
+ */
+ private double compressionDeviation;
+
+ private boolean isFilteredByArrivalTime = true;
+
+ private PartialPathLastObjectCache<ChangingPointFilter> pathLastObjectCache;
+
+ @Override
+ public void validate(PipeParameterValidator validator) throws Exception {
+ super.validate(validator);
+
+ final PipeParameters parameters = validator.getParameters();
+ compressionDeviation =
+ parameters.getDoubleOrDefault(
+
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION,
+
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION_DEFAULT_VALUE);
+
+ final boolean isChangingPointProcessor =
+ BuiltinPipePlugin.CHANGING_POINT_SAMPLING_PROCESSOR
+ .getPipePluginName()
+ .equals(parameters.getString("processor"));
+
+ if (isChangingPointProcessor) {
+ isFilteredByArrivalTime = true;
+ compressionDeviation =
+ parameters.getDoubleOrDefault(
+ PipeProcessorConstant.PROCESSOR_CHANGING_POINT_VALUE_INTERVAL,
+
PipeProcessorConstant.PROCESSOR_CHANGING_POINT_VALUE_INTERVAL_DEFAULT_VALUE);
+ eventTimeMinInterval =
+ parameters.getLongOrDefault(
+
PipeProcessorConstant.PROCESSOR_CHANGING_POINT_EVENT_TIME_MIN_INTERVAL,
+
PipeProcessorConstant.PROCESSOR_CHANGING_POINT_EVENT_TIME_MIN_INTERVAL_DEFAULT_VALUE);
+ eventTimeMaxInterval =
+ parameters.getLongOrDefault(
+
PipeProcessorConstant.PROCESSOR_CHANGING_POINT_EVENT_TIME_MAX_INTERVAL,
+
PipeProcessorConstant.PROCESSOR_CHANGING_POINT_EVENT_TIME_MAX_INTERVAL_DEFAULT_VALUE);
+ arrivalTimeMinInterval =
+ parameters.getLongOrDefault(
+
PipeProcessorConstant.PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MIN_INTERVAL,
+ PipeProcessorConstant
+
.PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MIN_INTERVAL_DEFAULT_VALUE);
+ arrivalTimeMaxInterval =
+ parameters.getLongOrDefault(
+
PipeProcessorConstant.PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MAX_INTERVAL,
+ PipeProcessorConstant
+
.PROCESSOR_CHANGING_POINT_ARRIVAL_TIME_MAX_INTERVAL_DEFAULT_VALUE);
+ } else {
+ isFilteredByArrivalTime = false;
+ compressionDeviation =
+ parameters.getDoubleOrDefault(
+
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION,
+
PipeProcessorConstant.PROCESSOR_CHANGING_POINT_VALUE_INTERVAL_DEFAULT_VALUE);
+ eventTimeMinInterval =
+ parameters.getLongOrDefault(
+
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_KEY,
+
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MIN_TIME_INTERVAL_DEFAULT_VALUE);
+ eventTimeMaxInterval =
+ parameters.getLongOrDefault(
+
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_KEY,
+
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_MAX_TIME_INTERVAL_DEFAULT_VALUE);
+ // will not be used
+ arrivalTimeMinInterval = 0;
+ arrivalTimeMaxInterval = Long.MAX_VALUE;
+ }
+
+ validatorTimeInterval(validator);
+ validator.validate(
+ compressionDeviation -> (Double) compressionDeviation >= 0,
+ String.format(
+ "%s must be >= 0, but got %s",
+
PipeProcessorConstant.PROCESSOR_CHANGING_VALUE_COMPRESSION_DEVIATION,
+ compressionDeviation),
+ compressionDeviation);
+ }
+
+ @Override
+ public void customize(
+ PipeParameters parameters, PipeProcessorRuntimeConfiguration
configuration) {
+ super.customize(parameters, configuration);
+
+ final boolean isChangingPointProcessor =
+ BuiltinPipePlugin.CHANGING_POINT_SAMPLING_PROCESSOR
+ .getPipePluginName()
+ .equals(parameters.getString("processor"));
+
+ if (isChangingPointProcessor) {
Review Comment:
Do we need to separate the logger? It seems that the info log is mostly for
ourselves...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]