This is an automated email from the ASF dual-hosted git repository. chenyz pushed a commit to branch variation in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9808d6a7156deef941e4da846824526fc30607b8 Author: Chen YZ <[email protected]> AuthorDate: Mon Jun 2 01:16:48 2025 +0800 support ignoreNull --- .../relational/tvf/VariationTableFunction.java | 63 ++++++++++++++++------ 1 file changed, 47 insertions(+), 16 deletions(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java index 284f623f301..7dd15dca9e0 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java @@ -41,8 +41,11 @@ import org.apache.tsfile.block.column.ColumnBuilder; import java.util.Arrays; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Queue; +import java.util.Set; import static org.apache.iotdb.commons.udf.builtin.relational.tvf.WindowTVFUtils.findColumnIndex; @@ -50,6 +53,7 @@ public class VariationTableFunction implements TableFunction { private static final String DATA_PARAMETER_NAME = "DATA"; private static final String COL_PARAMETER_NAME = "COL"; private static final String DELTA_PARAMETER_NAME = "DELTA"; + private static final String IGNORE_NULL_PARAMETER_NAME = "IGNORENULL"; @Override public List<ParameterSpecification> getArgumentsSpecifications() { @@ -61,8 +65,9 @@ public class VariationTableFunction implements TableFunction { ScalarParameterSpecification.builder().name(COL_PARAMETER_NAME).type(Type.STRING).build(), ScalarParameterSpecification.builder() .name(DELTA_PARAMETER_NAME) - .type(Type.DOUBLE) - .build()); + .type(Type.DOUBLE).defaultValue(0.0) + .build(), + ScalarParameterSpecification.builder().name(IGNORE_NULL_PARAMETER_NAME).type(Type.BOOLEAN).defaultValue(false).build()); } @Override @@ -83,6 +88,8 @@ public class VariationTableFunction implements TableFunction { .addProperty( DELTA_PARAMETER_NAME, ((ScalarArgument) arguments.get(DELTA_PARAMETER_NAME)).getValue()) + .addProperty(IGNORE_NULL_PARAMETER_NAME, + ((ScalarArgument) arguments.get(IGNORE_NULL_PARAMETER_NAME)).getValue()) .build(); return TableFunctionAnalysis.builder() .properColumnSchema(properColumnSchema) @@ -102,10 +109,11 @@ public class VariationTableFunction implements TableFunction { TableFunctionHandle tableFunctionHandle) { double delta = (double) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(DELTA_PARAMETER_NAME); + boolean ignoreNull = (boolean) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(IGNORE_NULL_PARAMETER_NAME); return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { - return new VariationDataProcessor(delta); + return new VariationDataProcessor(delta, ignoreNull); } }; } @@ -113,13 +121,18 @@ public class VariationTableFunction implements TableFunction { private static class VariationDataProcessor implements TableFunctionDataProcessor { private final double gap; - private long currentStartIndex = -1; + private final boolean ignoreNull; + private final Queue<Long> skipIndex = new LinkedList<>(); + + private long currentStartIndex = 0; private double baseValue = 0; private long curIndex = 0; private long windowIndex = 0; + private boolean previousIsNull = true; - public VariationDataProcessor(double delta) { + public VariationDataProcessor(double delta, boolean ignoreNull) { this.gap = delta; + this.ignoreNull = ignoreNull; } @Override @@ -127,17 +140,28 @@ public class VariationTableFunction implements TableFunction { Record input, List<ColumnBuilder> properColumnBuilders, ColumnBuilder passThroughIndexBuilder) { - double value = input.getDouble(0); - if (currentStartIndex == -1) { - // init the first window - currentStartIndex = curIndex; - baseValue = value; - } else if (Math.abs(value - baseValue) > gap) { - outputWindow(properColumnBuilders, passThroughIndexBuilder); - currentStartIndex = curIndex; - // use the first value in the window as the base value - baseValue = value; + if(input.isNull(0)){ + // handle null value + if(ignoreNull) { + // skip null values + skipIndex.add(curIndex); + }else if(!previousIsNull) { + // output window and reset currentStartIndex + outputWindow(properColumnBuilders, passThroughIndexBuilder); + currentStartIndex = curIndex; + previousIsNull = true; + } + }else{ + double value = input.getDouble(0); + if (previousIsNull||Math.abs(value - baseValue) > gap) { + outputWindow(properColumnBuilders, passThroughIndexBuilder); + currentStartIndex = curIndex; + // use the first value in the window as the base value + baseValue = value; + } + previousIsNull = false; } + curIndex++; } @@ -149,11 +173,18 @@ public class VariationTableFunction implements TableFunction { private void outputWindow( List<ColumnBuilder> properColumnBuilders, ColumnBuilder passThroughIndexBuilder) { + boolean increaseIndex = false; for (long i = currentStartIndex; i < curIndex; i++) { + if(!skipIndex.isEmpty()&&i==skipIndex.peek()){ + // skip the index if it is in the skip queue + skipIndex.poll(); + continue; + } properColumnBuilders.get(0).writeLong(windowIndex); passThroughIndexBuilder.writeLong(i); + increaseIndex = true; } - windowIndex++; + windowIndex += increaseIndex ? 1 : 0; } } }
