This is an automated email from the ASF dual-hosted git repository. chenyz pushed a commit to branch builtin-udtf in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 55cae6b2c6050992d72c4d714cee50baf1f43593 Author: Chen YZ <[email protected]> AuthorDate: Fri Feb 28 14:04:44 2025 +0800 Add window built in function --- .../function/TableFunctionLeafOperator.java | 1 + .../process/function/TableFunctionOperator.java | 7 +- .../grouped/HashAggregationOperator.java | 1 - .../aggregation/grouped/hash/HashStrategy.java | 2 +- .../planner/optimizations/ParallelizeGrouping.java | 10 +- .../TransformAggregationToStreamable.java | 28 +++--- .../relational/TableBuiltinTableFunction.java | 12 +++ ...PTableFunction.java => CountTableFunction.java} | 107 ++++++++------------ .../builtin/relational/tvf/HOPTableFunction.java | 22 +--- .../relational/tvf/SessionTableFunction.java | 95 ++++++++---------- ...bleFunction.java => VarianceTableFunction.java} | 111 ++++++++++----------- .../udf/builtin/relational/tvf/WindowTVFUtils.java | 55 ++++++++++ .../commons/udf/service/UDFManagementService.java | 3 + 13 files changed, 225 insertions(+), 229 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionLeafOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionLeafOperator.java index cd9e219c6b6..47f2ec9de35 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionLeafOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionLeafOperator.java @@ -49,6 +49,7 @@ public class TableFunctionLeafOperator implements ProcessOperator { List<TSDataType> outputDataTypes) { this.operatorContext = operatorContext; this.processor = processorProvider.getSplitProcessor(); + this.processor.beforeStart(); this.blockBuilder = new TsBlockBuilder(outputDataTypes); } diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java index 47e5b9041b0..0caa8cbb3a9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java @@ -59,7 +59,6 @@ public class TableFunctionOperator implements ProcessOperator { private PartitionState partitionState; private ListenableFuture<?> isBlocked; private boolean finished = false; - private ColumnBuilder passThroughIndexBuilder; private SliceCache sliceCache; @@ -149,6 +148,7 @@ public class TableFunctionOperator implements ProcessOperator { return tsBlock; } else { processor = processorProvider.getDataProcessor(); + processor.beforeStart(); } } sliceCache.addSlice(slice); @@ -167,10 +167,7 @@ public class TableFunctionOperator implements ProcessOperator { } private ColumnBuilder getPassThroughIndexBuilder() { - if (needPassThrough) { - passThroughIndexBuilder = new LongColumnBuilder(null, 1); - } - return passThroughIndexBuilder; + return new LongColumnBuilder(null, 1); } private TsBlock buildTsBlock( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/HashAggregationOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/HashAggregationOperator.java index ce7e91f8448..48c11e31dae 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/HashAggregationOperator.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/HashAggregationOperator.java @@ -140,7 +140,6 @@ public class HashAggregationOperator extends AbstractOperator { if (block == null) { return null; } - aggregationBuilder.processBlock(block); aggregationBuilder.updateMemory(); updateOccupiedMemorySize(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/hash/HashStrategy.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/hash/HashStrategy.java index bd6a4a52ebe..6a5cf351dd9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/hash/HashStrategy.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/hash/HashStrategy.java @@ -189,7 +189,7 @@ public class HashStrategy implements FlatHashStrategy { @Override public void hashBatched(Column[] columns, long[] hashes, int offset, int length) { for (int i = 0; i < length; i++) { - hashes[i] = hash(columns, i); + hashes[i] = hash(columns, i + offset); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeGrouping.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeGrouping.java index 5777e32fe93..8078e4d0598 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeGrouping.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeGrouping.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations; -import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; @@ -80,14 +79,7 @@ public class ParallelizeGrouping implements PlanOptimizer { if (!(context.getAnalysis().isQuery())) { return plan; } - // TODO: remove println - System.out.println("before optimize ParallelizeGrouping =========================="); - PlanGraphPrinter.print(plan); - PlanNode res = plan.accept(new Rewriter(context.getAnalysis()), new Context(null, 0)); - System.out.println("after optimize ParallelizeGrouping =========================="); - PlanGraphPrinter.print(res); - return res; - // return plan.accept(new Rewriter(context.getAnalysis()), new Context()); + return plan.accept(new Rewriter(context.getAnalysis()), new Context(null, 0)); } private static class Rewriter extends PlanVisitor<PlanNode, Context> { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java index 0c2a1cd4a77..6a7609e907f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java @@ -22,7 +22,6 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; -import 
org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode; @@ -36,7 +35,6 @@ import com.google.common.collect.ImmutableSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -127,20 +125,20 @@ public class TransformAggregationToStreamable implements PlanOptimizer { @Override public List<Symbol> visitTableFunctionProcessor( TableFunctionProcessorNode node, GroupContext context) { - if (node.getChildren().isEmpty()) { - return ImmutableList.of(); - } else if (node.isRowSemantic()) { - return visitPlan(node, context); - } - // return ImmutableList.of(); - Optional<DataOrganizationSpecification> dataOrganizationSpecification = - node.getDataOrganizationSpecification(); - return dataOrganizationSpecification - .<List<Symbol>>map( - organizationSpecification -> - ImmutableList.copyOf(organizationSpecification.getPartitionBy())) - .orElseGet(ImmutableList::of); + return ImmutableList.of(); + // if (node.getChildren().isEmpty()) { + // return ImmutableList.of(); + // } else if (node.isRowSemantic()) { + // return visitPlan(node, context); + // } + // Optional<DataOrganizationSpecification> dataOrganizationSpecification = + // node.getDataOrganizationSpecification(); + // return dataOrganizationSpecification + // .<List<Symbol>>map( + // organizationSpecification -> + // ImmutableList.copyOf(organizationSpecification.getPartitionBy())) + // .orElseGet(ImmutableList::of); } @Override diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinTableFunction.java 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinTableFunction.java index 830ffa8733c..6a6fc0eb9a8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinTableFunction.java @@ -19,8 +19,11 @@ package org.apache.iotdb.commons.udf.builtin.relational; +import org.apache.iotdb.commons.udf.builtin.relational.tvf.CountTableFunction; import org.apache.iotdb.commons.udf.builtin.relational.tvf.HOPTableFunction; import org.apache.iotdb.commons.udf.builtin.relational.tvf.RepeatExample; +import org.apache.iotdb.commons.udf.builtin.relational.tvf.SessionTableFunction; +import org.apache.iotdb.commons.udf.builtin.relational.tvf.VarianceTableFunction; import org.apache.iotdb.udf.api.relational.TableFunction; import java.util.Arrays; @@ -30,6 +33,9 @@ import java.util.stream.Collectors; public enum TableBuiltinTableFunction { HOP("hop"), + SESSION("session"), + VARIANCE("variance"), + COUNT("count"), REPEAT("repeat"), ; @@ -61,6 +67,12 @@ public enum TableBuiltinTableFunction { switch (functionName.toLowerCase()) { case "hop": return new HOPTableFunction(); + case "session": + return new SessionTableFunction(); + case "variance": + return new VarianceTableFunction(); + case "count": + return new CountTableFunction(); case "repeat": return new RepeatExample(); default: diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CountTableFunction.java similarity index 51% copy from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java copy to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CountTableFunction.java index 18527425414..079a6594808 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CountTableFunction.java @@ -27,7 +27,6 @@ import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument; -import org.apache.iotdb.udf.api.relational.table.argument.TableArgument; import org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor; import org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification; import org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification; @@ -36,99 +35,62 @@ import org.apache.iotdb.udf.api.type.Type; import org.apache.tsfile.block.column.ColumnBuilder; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; - -public class HOPTableFunction implements TableFunction { +public class CountTableFunction implements TableFunction { private static final String DATA_PARAMETER_NAME = "DATA"; - private static final String TIMECOL_PARAMETER_NAME = "TIMECOL"; - private static final String SLIDE_PARAMETER_NAME = "SLIDE"; private static final String SIZE_PARAMETER_NAME = "SIZE"; - private static final String START_PARAMETER_NAME = "START"; @Override public List<ParameterSpecification> getArgumentsSpecifications() { return Arrays.asList( TableParameterSpecification.builder() .name(DATA_PARAMETER_NAME) - .rowSemantics() .passThroughColumns() .build(), - 
ScalarParameterSpecification.builder() - .name(TIMECOL_PARAMETER_NAME) - .type(Type.STRING) - .build(), - ScalarParameterSpecification.builder().name(SLIDE_PARAMETER_NAME).type(Type.INT64).build(), - ScalarParameterSpecification.builder().name(SIZE_PARAMETER_NAME).type(Type.INT64).build(), - ScalarParameterSpecification.builder() - .name(START_PARAMETER_NAME) - .type(Type.TIMESTAMP) - .defaultValue(0L) - .build()); - } - - private int findTimeColumnIndex(TableArgument tableArgument, String expectedFieldName) { - int requiredIndex = -1; - for (int i = 0; i < tableArgument.getFieldTypes().size(); i++) { - Optional<String> fieldName = tableArgument.getFieldNames().get(i); - if (fieldName.isPresent() && expectedFieldName.equalsIgnoreCase(fieldName.get())) { - requiredIndex = i; - break; - } - } - return requiredIndex; + ScalarParameterSpecification.builder().name(SIZE_PARAMETER_NAME).type(Type.INT64).build()); } @Override - public TableFunctionAnalysis analyze(Map<String, Argument> arguments) { - TableArgument tableArgument = (TableArgument) arguments.get(DATA_PARAMETER_NAME); - String expectedFieldName = - (String) ((ScalarArgument) arguments.get(TIMECOL_PARAMETER_NAME)).getValue(); - int requiredIndex = findTimeColumnIndex(tableArgument, expectedFieldName); - if (requiredIndex == -1) { - throw new UDFException("The required field is not found in the input table"); + public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws UDFException { + long size = (long) ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue(); + if (size <= 0) { + throw new UDFException("Size must be greater than 0"); } - DescribedSchema properColumnSchema = - new DescribedSchema.Builder() - .addField("window_start", Type.TIMESTAMP) - .addField("window_end", Type.TIMESTAMP) - .build(); - - // outputColumnSchema return TableFunctionAnalysis.builder() - .properColumnSchema(properColumnSchema) - .requiredColumns(DATA_PARAMETER_NAME, 
Collections.singletonList(requiredIndex)) + .properColumnSchema( + new DescribedSchema.Builder() + .addField("window_index", Type.INT64) + .addField("count", Type.INT64) + .build()) + .requiredColumns(DATA_PARAMETER_NAME, Collections.singletonList(0)) .build(); } @Override public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) { + long sz = (long) ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue(); return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { - return new HOPDataProcessor( - (Long) ((ScalarArgument) arguments.get(START_PARAMETER_NAME)).getValue(), - (Long) ((ScalarArgument) arguments.get(SLIDE_PARAMETER_NAME)).getValue(), - (Long) ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue()); + return new CountDataProcessor(sz); } }; } - private static class HOPDataProcessor implements TableFunctionDataProcessor { + private static class CountDataProcessor implements TableFunctionDataProcessor { - private final long slide; private final long size; - private final long start; + private final List<Long> currentRowIndexes = new ArrayList<>(); private long curIndex = 0; + private long windowIndex = 0; - public HOPDataProcessor(long startTime, long slide, long size) { - this.slide = slide; + public CountDataProcessor(long size) { this.size = size; - this.start = startTime; } @Override @@ -136,17 +98,30 @@ public class HOPTableFunction implements TableFunction { Record input, List<ColumnBuilder> properColumnBuilders, ColumnBuilder passThroughIndexBuilder) { - // find the first windows that satisfy the condition: start + n*slide <= time < start + - // n*slide + size - long timeValue = input.getLong(0); - long window_start = (timeValue - start - size + slide) / slide * slide; - while (window_start <= timeValue && window_start + size > timeValue) { - properColumnBuilders.get(0).writeLong(window_start); - properColumnBuilders.get(1).writeLong(window_start + 
size - 1); - passThroughIndexBuilder.writeLong(curIndex); - window_start += slide; + if (currentRowIndexes.size() >= size) { + outputWindow(properColumnBuilders, passThroughIndexBuilder); } + currentRowIndexes.add(curIndex); curIndex++; } + + @Override + public void finish(List<ColumnBuilder> columnBuilders, ColumnBuilder passThroughIndexBuilder) { + if (!currentRowIndexes.isEmpty()) { + outputWindow(columnBuilders, passThroughIndexBuilder); + } + } + + private void outputWindow( + List<ColumnBuilder> properColumnBuilders, ColumnBuilder passThroughIndexBuilder) { + int sz = currentRowIndexes.size(); + for (Long currentRowIndex : currentRowIndexes) { + properColumnBuilders.get(0).writeLong(windowIndex); + properColumnBuilders.get(1).writeLong(sz); + passThroughIndexBuilder.writeLong(currentRowIndex); + } + windowIndex++; + currentRowIndexes.clear(); + } } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java index 18527425414..731f37d0a57 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java @@ -19,7 +19,6 @@ package org.apache.iotdb.commons.udf.builtin.relational.tvf; -import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.access.Record; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; @@ -40,7 +39,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; + +import static org.apache.iotdb.commons.udf.builtin.relational.tvf.WindowTVFUtils.findColumnIndex; public class HOPTableFunction implements 
TableFunction { @@ -71,27 +71,13 @@ public class HOPTableFunction implements TableFunction { .build()); } - private int findTimeColumnIndex(TableArgument tableArgument, String expectedFieldName) { - int requiredIndex = -1; - for (int i = 0; i < tableArgument.getFieldTypes().size(); i++) { - Optional<String> fieldName = tableArgument.getFieldNames().get(i); - if (fieldName.isPresent() && expectedFieldName.equalsIgnoreCase(fieldName.get())) { - requiredIndex = i; - break; - } - } - return requiredIndex; - } - @Override public TableFunctionAnalysis analyze(Map<String, Argument> arguments) { TableArgument tableArgument = (TableArgument) arguments.get(DATA_PARAMETER_NAME); String expectedFieldName = (String) ((ScalarArgument) arguments.get(TIMECOL_PARAMETER_NAME)).getValue(); - int requiredIndex = findTimeColumnIndex(tableArgument, expectedFieldName); - if (requiredIndex == -1) { - throw new UDFException("The required field is not found in the input table"); - } + int requiredIndex = + findColumnIndex(tableArgument, expectedFieldName, Collections.singleton(Type.TIMESTAMP)); DescribedSchema properColumnSchema = new DescribedSchema.Builder() .addField("window_start", Type.TIMESTAMP) diff --git a/example/udf/src/main/java/org/apache/iotdb/udf/table/HOPTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java similarity index 60% rename from example/udf/src/main/java/org/apache/iotdb/udf/table/HOPTableFunction.java rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java index fb9f7beac33..a3f49840734 100644 --- a/example/udf/src/main/java/org/apache/iotdb/udf/table/HOPTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java @@ -17,7 +17,7 @@ * under the License. 
*/ -package org.apache.iotdb.udf.table; +package org.apache.iotdb.commons.udf.builtin.relational.tvf; import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; @@ -36,19 +36,18 @@ import org.apache.iotdb.udf.api.type.Type; import org.apache.tsfile.block.column.ColumnBuilder; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; -public class HOPTableFunction implements TableFunction { +import static org.apache.iotdb.commons.udf.builtin.relational.tvf.WindowTVFUtils.findColumnIndex; +public class SessionTableFunction implements TableFunction { private static final String DATA_PARAMETER_NAME = "DATA"; private static final String TIMECOL_PARAMETER_NAME = "TIMECOL"; - private static final String SLIDE_PARAMETER_NAME = "SLIDE"; - private static final String SIZE_PARAMETER_NAME = "SIZE"; - private static final String START_PARAMETER_NAME = "START"; + private static final String GAP_PARAMETER_NAME = "GAP"; @Override public List<ParameterSpecification> getArgumentsSpecifications() { @@ -56,42 +55,21 @@ public class HOPTableFunction implements TableFunction { TableParameterSpecification.builder() .name(DATA_PARAMETER_NAME) .passThroughColumns() - .keepWhenEmpty() .build(), ScalarParameterSpecification.builder() .name(TIMECOL_PARAMETER_NAME) .type(Type.STRING) .build(), - ScalarParameterSpecification.builder().name(SLIDE_PARAMETER_NAME).type(Type.INT64).build(), - ScalarParameterSpecification.builder().name(SIZE_PARAMETER_NAME).type(Type.INT64).build(), - ScalarParameterSpecification.builder() - .name(START_PARAMETER_NAME) - .type(Type.TIMESTAMP) - .defaultValue(Long.MIN_VALUE) - .build()); - } - - private int findTimeColumnIndex(TableArgument tableArgument, String expectedFieldName) { - int requiredIndex = -1; - for (int i = 0; i < tableArgument.getFieldTypes().size(); i++) { - Optional<String> fieldName = 
tableArgument.getFieldNames().get(i); - if (fieldName.isPresent() && expectedFieldName.equalsIgnoreCase(fieldName.get())) { - requiredIndex = i; - break; - } - } - return requiredIndex; + ScalarParameterSpecification.builder().name(GAP_PARAMETER_NAME).type(Type.INT64).build()); } @Override - public TableFunctionAnalysis analyze(Map<String, Argument> arguments) { + public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws UDFException { TableArgument tableArgument = (TableArgument) arguments.get(DATA_PARAMETER_NAME); String expectedFieldName = (String) ((ScalarArgument) arguments.get(TIMECOL_PARAMETER_NAME)).getValue(); - int requiredIndex = findTimeColumnIndex(tableArgument, expectedFieldName); - if (requiredIndex == -1) { - throw new UDFException("The required field is not found in the input table"); - } + int requiredIndex = + findColumnIndex(tableArgument, expectedFieldName, Collections.singleton(Type.TIMESTAMP)); DescribedSchema properColumnSchema = new DescribedSchema.Builder() .addField("window_start", Type.TIMESTAMP) @@ -107,28 +85,25 @@ public class HOPTableFunction implements TableFunction { @Override public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) { + long gap = (long) ((ScalarArgument) arguments.get(GAP_PARAMETER_NAME)).getValue(); return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { - return new HOPDataProcessor( - (Long) ((ScalarArgument) arguments.get(START_PARAMETER_NAME)).getValue(), - (Long) ((ScalarArgument) arguments.get(SLIDE_PARAMETER_NAME)).getValue(), - (Long) ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue()); + return new SessionDataProcessor(gap); } }; } - private static class HOPDataProcessor implements TableFunctionDataProcessor { + private static class SessionDataProcessor implements TableFunctionDataProcessor { - private final long slide; - private final long size; - private long curTime; + private 
final long gap; + private final List<Long> currentRowIndexes = new ArrayList<>(); private long curIndex = 0; + private long windowStart = -1; + private long windowEnd = -1; - public HOPDataProcessor(long startTime, long slide, long size) { - this.slide = slide; - this.size = size; - this.curTime = startTime; + public SessionDataProcessor(long gap) { + this.gap = gap; } @Override @@ -137,22 +112,30 @@ public class HOPTableFunction implements TableFunction { List<ColumnBuilder> properColumnBuilders, ColumnBuilder passThroughIndexBuilder) { long timeValue = input.getLong(0); - if (curTime == Long.MIN_VALUE) { - curTime = timeValue; + if (!currentRowIndexes.isEmpty() && timeValue > windowEnd) { + outputWindow(properColumnBuilders, passThroughIndexBuilder); } - if (curTime + size <= timeValue) { - // jump to appropriate window - long move = (timeValue - curTime - size) / slide + 1; - curTime += move * slide; + currentRowIndexes.add(curIndex); + windowStart = timeValue; + windowEnd = timeValue + gap; + curIndex++; + } + + @Override + public void finish(List<ColumnBuilder> columnBuilders, ColumnBuilder passThroughIndexBuilder) { + if (!currentRowIndexes.isEmpty()) { + outputWindow(columnBuilders, passThroughIndexBuilder); } - long slideTime = curTime; - while (slideTime <= timeValue && slideTime + size > timeValue) { - properColumnBuilders.get(0).writeLong(slideTime); - properColumnBuilders.get(1).writeLong(slideTime + size); - passThroughIndexBuilder.writeLong(curIndex); - slideTime += slide; + } + + private void outputWindow( + List<ColumnBuilder> properColumnBuilders, ColumnBuilder passThroughIndexBuilder) { + for (Long currentRowIndex : currentRowIndexes) { + properColumnBuilders.get(0).writeLong(windowStart); + properColumnBuilders.get(1).writeLong(windowEnd - 1); + passThroughIndexBuilder.writeLong(currentRowIndex); } - curIndex++; + currentRowIndexes.clear(); } } } diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VarianceTableFunction.java similarity index 55% copy from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java copy to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VarianceTableFunction.java index 18527425414..56979d374f2 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VarianceTableFunction.java @@ -34,70 +34,51 @@ import org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSp import org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification; import org.apache.iotdb.udf.api.type.Type; +import com.google.common.collect.ImmutableSet; import org.apache.tsfile.block.column.ColumnBuilder; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; -public class HOPTableFunction implements TableFunction { +import static org.apache.iotdb.commons.udf.builtin.relational.tvf.WindowTVFUtils.findColumnIndex; +public class VarianceTableFunction implements TableFunction { private static final String DATA_PARAMETER_NAME = "DATA"; - private static final String TIMECOL_PARAMETER_NAME = "TIMECOL"; - private static final String SLIDE_PARAMETER_NAME = "SLIDE"; - private static final String SIZE_PARAMETER_NAME = "SIZE"; - private static final String START_PARAMETER_NAME = "START"; + private static final String COL_PARAMETER_NAME = "COL"; + private static final String DELTA_PARAMETER_NAME = "DELTA"; @Override public List<ParameterSpecification> getArgumentsSpecifications() { 
return Arrays.asList( TableParameterSpecification.builder() .name(DATA_PARAMETER_NAME) - .rowSemantics() .passThroughColumns() .build(), + ScalarParameterSpecification.builder().name(COL_PARAMETER_NAME).type(Type.STRING).build(), ScalarParameterSpecification.builder() - .name(TIMECOL_PARAMETER_NAME) - .type(Type.STRING) - .build(), - ScalarParameterSpecification.builder().name(SLIDE_PARAMETER_NAME).type(Type.INT64).build(), - ScalarParameterSpecification.builder().name(SIZE_PARAMETER_NAME).type(Type.INT64).build(), - ScalarParameterSpecification.builder() - .name(START_PARAMETER_NAME) - .type(Type.TIMESTAMP) - .defaultValue(0L) + .name(DELTA_PARAMETER_NAME) + .type(Type.DOUBLE) .build()); } - private int findTimeColumnIndex(TableArgument tableArgument, String expectedFieldName) { - int requiredIndex = -1; - for (int i = 0; i < tableArgument.getFieldTypes().size(); i++) { - Optional<String> fieldName = tableArgument.getFieldNames().get(i); - if (fieldName.isPresent() && expectedFieldName.equalsIgnoreCase(fieldName.get())) { - requiredIndex = i; - break; - } - } - return requiredIndex; - } - @Override - public TableFunctionAnalysis analyze(Map<String, Argument> arguments) { + public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws UDFException { TableArgument tableArgument = (TableArgument) arguments.get(DATA_PARAMETER_NAME); String expectedFieldName = - (String) ((ScalarArgument) arguments.get(TIMECOL_PARAMETER_NAME)).getValue(); - int requiredIndex = findTimeColumnIndex(tableArgument, expectedFieldName); - if (requiredIndex == -1) { - throw new UDFException("The required field is not found in the input table"); - } + (String) ((ScalarArgument) arguments.get(COL_PARAMETER_NAME)).getValue(); + int requiredIndex = + findColumnIndex( + tableArgument, + expectedFieldName, + ImmutableSet.of(Type.INT32, Type.INT64, Type.FLOAT, Type.DOUBLE)); DescribedSchema properColumnSchema = new DescribedSchema.Builder() - .addField("window_start", Type.TIMESTAMP) 
- .addField("window_end", Type.TIMESTAMP) + .addField("window_index", Type.INT64) + .addField("base_value", Type.DOUBLE) .build(); - // outputColumnSchema return TableFunctionAnalysis.builder() .properColumnSchema(properColumnSchema) @@ -107,28 +88,25 @@ public class HOPTableFunction implements TableFunction { @Override public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) { + double delta = (double) ((ScalarArgument) arguments.get(DELTA_PARAMETER_NAME)).getValue(); return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { - return new HOPDataProcessor( - (Long) ((ScalarArgument) arguments.get(START_PARAMETER_NAME)).getValue(), - (Long) ((ScalarArgument) arguments.get(SLIDE_PARAMETER_NAME)).getValue(), - (Long) ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue()); + return new VarianceDataProcessor(delta); } }; } - private static class HOPDataProcessor implements TableFunctionDataProcessor { + private static class VarianceDataProcessor implements TableFunctionDataProcessor { - private final long slide; - private final long size; - private final long start; + private final double gap; + private final List<Long> currentRowIndexes = new ArrayList<>(); + private double baseValue = 0; private long curIndex = 0; + private long windowIndex = 0; - public HOPDataProcessor(long startTime, long slide, long size) { - this.slide = slide; - this.size = size; - this.start = startTime; + public VarianceDataProcessor(double delta) { + this.gap = delta; } @Override @@ -136,17 +114,34 @@ public class HOPTableFunction implements TableFunction { Record input, List<ColumnBuilder> properColumnBuilders, ColumnBuilder passThroughIndexBuilder) { - // find the first windows that satisfy the condition: start + n*slide <= time < start + - // n*slide + size - long timeValue = input.getLong(0); - long window_start = (timeValue - start - size + slide) / slide * slide; - while (window_start <= 
timeValue && window_start + size > timeValue) { - properColumnBuilders.get(0).writeLong(window_start); - properColumnBuilders.get(1).writeLong(window_start + size - 1); - passThroughIndexBuilder.writeLong(curIndex); - window_start += slide; + double value = input.getDouble(0); + if (!currentRowIndexes.isEmpty() && Math.abs(value - baseValue) > gap) { + outputWindow(properColumnBuilders, passThroughIndexBuilder); } + if (currentRowIndexes.isEmpty()) { + // use the first value in the window as the base value + baseValue = value; + } + currentRowIndexes.add(curIndex); curIndex++; } + + @Override + public void finish(List<ColumnBuilder> columnBuilders, ColumnBuilder passThroughIndexBuilder) { + if (!currentRowIndexes.isEmpty()) { + outputWindow(columnBuilders, passThroughIndexBuilder); + } + } + + private void outputWindow( + List<ColumnBuilder> properColumnBuilders, ColumnBuilder passThroughIndexBuilder) { + for (Long currentRowIndex : currentRowIndexes) { + properColumnBuilders.get(0).writeLong(windowIndex); + properColumnBuilders.get(1).writeDouble(baseValue); + passThroughIndexBuilder.writeLong(currentRowIndex); + } + currentRowIndexes.clear(); + windowIndex++; + } } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/WindowTVFUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/WindowTVFUtils.java new file mode 100644 index 00000000000..9c9be03fbba --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/WindowTVFUtils.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.udf.builtin.relational.tvf; + +import org.apache.iotdb.udf.api.exception.UDFException; +import org.apache.iotdb.udf.api.relational.table.argument.TableArgument; +import org.apache.iotdb.udf.api.type.Type; + +import java.util.Optional; +import java.util.Set; + +public class WindowTVFUtils { + /** + * Find the index of the column in the table argument. + * + * @param tableArgument the table argument + * @param expectedFieldName the expected field name + * @param expectedTypes the expected types + * @return the index of the column; throws UDFException if it is absent or has an unexpected type + */ + public static int findColumnIndex( + TableArgument tableArgument, String expectedFieldName, Set<Type> expectedTypes) + throws UDFException { + for (int i = 0; i < tableArgument.getFieldTypes().size(); i++) { + Optional<String> fieldName = tableArgument.getFieldNames().get(i); + if (fieldName.isPresent() && expectedFieldName.equalsIgnoreCase(fieldName.get())) { + if (!expectedTypes.contains(tableArgument.getFieldTypes().get(i))) { + throw new UDFException( + String.format("The type of the column [%s] is not as expected.", expectedFieldName)); + } + return i; + } + } + throw new UDFException( + String.format( + "Required column [%s] not found in the source table argument.", expectedFieldName)); + } +} diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java index fe62cb4d291..762944c02c4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java @@ -28,6 +28,7 @@ import org.apache.iotdb.commons.udf.builtin.BuiltinScalarFunction; import org.apache.iotdb.commons.udf.builtin.BuiltinTimeSeriesGeneratingFunction; import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction; import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction; +import org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinTableFunction; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.udf.api.UDF; import org.apache.iotdb.udf.api.exception.UDFException; @@ -120,6 +121,8 @@ public class UDFManagementService { return TableBuiltinScalarFunction.getBuiltInScalarFunctionName() .contains(functionName.toLowerCase()) || TableBuiltinAggregationFunction.getBuiltInAggregateFunctionName() + .contains(functionName.toLowerCase()) + || TableBuiltinTableFunction.getBuiltInTableFunctionName() .contains(functionName.toLowerCase()); } }
