This is an automated email from the ASF dual-hosted git repository.

chenyz pushed a commit to branch builtin-udtf
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 55cae6b2c6050992d72c4d714cee50baf1f43593
Author: Chen YZ <[email protected]>
AuthorDate: Fri Feb 28 14:04:44 2025 +0800

    Add window built in function
---
 .../function/TableFunctionLeafOperator.java        |   1 +
 .../process/function/TableFunctionOperator.java    |   7 +-
 .../grouped/HashAggregationOperator.java           |   1 -
 .../aggregation/grouped/hash/HashStrategy.java     |   2 +-
 .../planner/optimizations/ParallelizeGrouping.java |  10 +-
 .../TransformAggregationToStreamable.java          |  28 +++---
 .../relational/TableBuiltinTableFunction.java      |  12 +++
 ...PTableFunction.java => CountTableFunction.java} | 107 ++++++++------------
 .../builtin/relational/tvf/HOPTableFunction.java   |  22 +---
 .../relational/tvf/SessionTableFunction.java       |  95 ++++++++----------
 ...bleFunction.java => VarianceTableFunction.java} | 111 ++++++++++-----------
 .../udf/builtin/relational/tvf/WindowTVFUtils.java |  55 ++++++++++
 .../commons/udf/service/UDFManagementService.java  |   3 +
 13 files changed, 225 insertions(+), 229 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionLeafOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionLeafOperator.java
index cd9e219c6b6..47f2ec9de35 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionLeafOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionLeafOperator.java
@@ -49,6 +49,7 @@ public class TableFunctionLeafOperator implements 
ProcessOperator {
       List<TSDataType> outputDataTypes) {
     this.operatorContext = operatorContext;
     this.processor = processorProvider.getSplitProcessor();
+    this.processor.beforeStart();
     this.blockBuilder = new TsBlockBuilder(outputDataTypes);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
index 47e5b9041b0..0caa8cbb3a9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
@@ -59,7 +59,6 @@ public class TableFunctionOperator implements ProcessOperator 
{
   private PartitionState partitionState;
   private ListenableFuture<?> isBlocked;
   private boolean finished = false;
-  private ColumnBuilder passThroughIndexBuilder;
 
   private SliceCache sliceCache;
 
@@ -149,6 +148,7 @@ public class TableFunctionOperator implements 
ProcessOperator {
           return tsBlock;
         } else {
           processor = processorProvider.getDataProcessor();
+          processor.beforeStart();
         }
       }
       sliceCache.addSlice(slice);
@@ -167,10 +167,7 @@ public class TableFunctionOperator implements 
ProcessOperator {
   }
 
   private ColumnBuilder getPassThroughIndexBuilder() {
-    if (needPassThrough) {
-      passThroughIndexBuilder = new LongColumnBuilder(null, 1);
-    }
-    return passThroughIndexBuilder;
+    return new LongColumnBuilder(null, 1);
   }
 
   private TsBlock buildTsBlock(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/HashAggregationOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/HashAggregationOperator.java
index ce7e91f8448..48c11e31dae 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/HashAggregationOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/HashAggregationOperator.java
@@ -140,7 +140,6 @@ public class HashAggregationOperator extends 
AbstractOperator {
       if (block == null) {
         return null;
       }
-
       aggregationBuilder.processBlock(block);
       aggregationBuilder.updateMemory();
       updateOccupiedMemorySize();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/hash/HashStrategy.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/hash/HashStrategy.java
index bd6a4a52ebe..6a5cf351dd9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/hash/HashStrategy.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/hash/HashStrategy.java
@@ -189,7 +189,7 @@ public class HashStrategy implements FlatHashStrategy {
   @Override
   public void hashBatched(Column[] columns, long[] hashes, int offset, int 
length) {
     for (int i = 0; i < length; i++) {
-      hashes[i] = hash(columns, i);
+      hashes[i] = hash(columns, i + offset);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeGrouping.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeGrouping.java
index 5777e32fe93..8078e4d0598 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeGrouping.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeGrouping.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
 
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanGraphPrinter;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
@@ -80,14 +79,7 @@ public class ParallelizeGrouping implements PlanOptimizer {
     if (!(context.getAnalysis().isQuery())) {
       return plan;
     }
-    // TODO: remove println
-    System.out.println("before optimize ParallelizeGrouping 
==========================");
-    PlanGraphPrinter.print(plan);
-    PlanNode res = plan.accept(new Rewriter(context.getAnalysis()), new 
Context(null, 0));
-    System.out.println("after optimize ParallelizeGrouping 
==========================");
-    PlanGraphPrinter.print(res);
-    return res;
-    //        return plan.accept(new Rewriter(context.getAnalysis()), new 
Context());
+    return plan.accept(new Rewriter(context.getAnalysis()), new Context(null, 
0));
   }
 
   private static class Rewriter extends PlanVisitor<PlanNode, Context> {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java
index 0c2a1cd4a77..6a7609e907f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java
@@ -22,7 +22,6 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
-import 
org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
@@ -36,7 +35,6 @@ import com.google.common.collect.ImmutableSet;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -127,20 +125,20 @@ public class TransformAggregationToStreamable implements 
PlanOptimizer {
     @Override
     public List<Symbol> visitTableFunctionProcessor(
         TableFunctionProcessorNode node, GroupContext context) {
-      if (node.getChildren().isEmpty()) {
-        return ImmutableList.of();
-      } else if (node.isRowSemantic()) {
-        return visitPlan(node, context);
-      }
 
-      //      return ImmutableList.of();
-      Optional<DataOrganizationSpecification> dataOrganizationSpecification =
-          node.getDataOrganizationSpecification();
-      return dataOrganizationSpecification
-          .<List<Symbol>>map(
-              organizationSpecification ->
-                  
ImmutableList.copyOf(organizationSpecification.getPartitionBy()))
-          .orElseGet(ImmutableList::of);
+      return ImmutableList.of();
+      //      if (node.getChildren().isEmpty()) {
+      //        return ImmutableList.of();
+      //      } else if (node.isRowSemantic()) {
+      //        return visitPlan(node, context);
+      //      }
+      //      Optional<DataOrganizationSpecification> 
dataOrganizationSpecification =
+      //          node.getDataOrganizationSpecification();
+      //      return dataOrganizationSpecification
+      //          .<List<Symbol>>map(
+      //              organizationSpecification ->
+      //                  
ImmutableList.copyOf(organizationSpecification.getPartitionBy()))
+      //          .orElseGet(ImmutableList::of);
     }
 
     @Override
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinTableFunction.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinTableFunction.java
index 830ffa8733c..6a6fc0eb9a8 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinTableFunction.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinTableFunction.java
@@ -19,8 +19,11 @@
 
 package org.apache.iotdb.commons.udf.builtin.relational;
 
+import org.apache.iotdb.commons.udf.builtin.relational.tvf.CountTableFunction;
 import org.apache.iotdb.commons.udf.builtin.relational.tvf.HOPTableFunction;
 import org.apache.iotdb.commons.udf.builtin.relational.tvf.RepeatExample;
+import 
org.apache.iotdb.commons.udf.builtin.relational.tvf.SessionTableFunction;
+import 
org.apache.iotdb.commons.udf.builtin.relational.tvf.VarianceTableFunction;
 import org.apache.iotdb.udf.api.relational.TableFunction;
 
 import java.util.Arrays;
@@ -30,6 +33,9 @@ import java.util.stream.Collectors;
 
 public enum TableBuiltinTableFunction {
   HOP("hop"),
+  SESSION("session"),
+  VARIANCE("variance"),
+  COUNT("count"),
   REPEAT("repeat"),
   ;
 
@@ -61,6 +67,12 @@ public enum TableBuiltinTableFunction {
     switch (functionName.toLowerCase()) {
       case "hop":
         return new HOPTableFunction();
+      case "session":
+        return new SessionTableFunction();
+      case "variance":
+        return new VarianceTableFunction();
+      case "count":
+        return new CountTableFunction();
       case "repeat":
         return new RepeatExample();
       default:
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CountTableFunction.java
similarity index 51%
copy from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java
copy to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CountTableFunction.java
index 18527425414..079a6594808 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CountTableFunction.java
@@ -27,7 +27,6 @@ import 
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
 import org.apache.iotdb.udf.api.relational.table.argument.Argument;
 import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
 import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
-import org.apache.iotdb.udf.api.relational.table.argument.TableArgument;
 import 
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor;
 import 
org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification;
 import 
org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification;
@@ -36,99 +35,62 @@ import org.apache.iotdb.udf.api.type.Type;
 
 import org.apache.tsfile.block.column.ColumnBuilder;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
-
-public class HOPTableFunction implements TableFunction {
 
+public class CountTableFunction implements TableFunction {
   private static final String DATA_PARAMETER_NAME = "DATA";
-  private static final String TIMECOL_PARAMETER_NAME = "TIMECOL";
-  private static final String SLIDE_PARAMETER_NAME = "SLIDE";
   private static final String SIZE_PARAMETER_NAME = "SIZE";
-  private static final String START_PARAMETER_NAME = "START";
 
   @Override
   public List<ParameterSpecification> getArgumentsSpecifications() {
     return Arrays.asList(
         TableParameterSpecification.builder()
             .name(DATA_PARAMETER_NAME)
-            .rowSemantics()
             .passThroughColumns()
             .build(),
-        ScalarParameterSpecification.builder()
-            .name(TIMECOL_PARAMETER_NAME)
-            .type(Type.STRING)
-            .build(),
-        
ScalarParameterSpecification.builder().name(SLIDE_PARAMETER_NAME).type(Type.INT64).build(),
-        
ScalarParameterSpecification.builder().name(SIZE_PARAMETER_NAME).type(Type.INT64).build(),
-        ScalarParameterSpecification.builder()
-            .name(START_PARAMETER_NAME)
-            .type(Type.TIMESTAMP)
-            .defaultValue(0L)
-            .build());
-  }
-
-  private int findTimeColumnIndex(TableArgument tableArgument, String 
expectedFieldName) {
-    int requiredIndex = -1;
-    for (int i = 0; i < tableArgument.getFieldTypes().size(); i++) {
-      Optional<String> fieldName = tableArgument.getFieldNames().get(i);
-      if (fieldName.isPresent() && 
expectedFieldName.equalsIgnoreCase(fieldName.get())) {
-        requiredIndex = i;
-        break;
-      }
-    }
-    return requiredIndex;
+        
ScalarParameterSpecification.builder().name(SIZE_PARAMETER_NAME).type(Type.INT64).build());
   }
 
   @Override
-  public TableFunctionAnalysis analyze(Map<String, Argument> arguments) {
-    TableArgument tableArgument = (TableArgument) 
arguments.get(DATA_PARAMETER_NAME);
-    String expectedFieldName =
-        (String) ((ScalarArgument) 
arguments.get(TIMECOL_PARAMETER_NAME)).getValue();
-    int requiredIndex = findTimeColumnIndex(tableArgument, expectedFieldName);
-    if (requiredIndex == -1) {
-      throw new UDFException("The required field is not found in the input 
table");
+  public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws 
UDFException {
+    long size = (long) ((ScalarArgument) 
arguments.get(SIZE_PARAMETER_NAME)).getValue();
+    if (size <= 0) {
+      throw new UDFException("Size must be greater than 0");
     }
-    DescribedSchema properColumnSchema =
-        new DescribedSchema.Builder()
-            .addField("window_start", Type.TIMESTAMP)
-            .addField("window_end", Type.TIMESTAMP)
-            .build();
-
-    // outputColumnSchema
     return TableFunctionAnalysis.builder()
-        .properColumnSchema(properColumnSchema)
-        .requiredColumns(DATA_PARAMETER_NAME, 
Collections.singletonList(requiredIndex))
+        .properColumnSchema(
+            new DescribedSchema.Builder()
+                .addField("window_index", Type.INT64)
+                .addField("count", Type.INT64)
+                .build())
+        .requiredColumns(DATA_PARAMETER_NAME, Collections.singletonList(0))
         .build();
   }
 
   @Override
   public TableFunctionProcessorProvider getProcessorProvider(Map<String, 
Argument> arguments) {
+    long sz = (long) ((ScalarArgument) 
arguments.get(SIZE_PARAMETER_NAME)).getValue();
     return new TableFunctionProcessorProvider() {
       @Override
       public TableFunctionDataProcessor getDataProcessor() {
-        return new HOPDataProcessor(
-            (Long) ((ScalarArgument) 
arguments.get(START_PARAMETER_NAME)).getValue(),
-            (Long) ((ScalarArgument) 
arguments.get(SLIDE_PARAMETER_NAME)).getValue(),
-            (Long) ((ScalarArgument) 
arguments.get(SIZE_PARAMETER_NAME)).getValue());
+        return new CountDataProcessor(sz);
       }
     };
   }
 
-  private static class HOPDataProcessor implements TableFunctionDataProcessor {
+  private static class CountDataProcessor implements 
TableFunctionDataProcessor {
 
-    private final long slide;
     private final long size;
-    private final long start;
+    private final List<Long> currentRowIndexes = new ArrayList<>();
     private long curIndex = 0;
+    private long windowIndex = 0;
 
-    public HOPDataProcessor(long startTime, long slide, long size) {
-      this.slide = slide;
+    public CountDataProcessor(long size) {
       this.size = size;
-      this.start = startTime;
     }
 
     @Override
@@ -136,17 +98,30 @@ public class HOPTableFunction implements TableFunction {
         Record input,
         List<ColumnBuilder> properColumnBuilders,
         ColumnBuilder passThroughIndexBuilder) {
-      // find the first windows that satisfy the condition: start + n*slide <= 
time < start +
-      // n*slide + size
-      long timeValue = input.getLong(0);
-      long window_start = (timeValue - start - size + slide) / slide * slide;
-      while (window_start <= timeValue && window_start + size > timeValue) {
-        properColumnBuilders.get(0).writeLong(window_start);
-        properColumnBuilders.get(1).writeLong(window_start + size - 1);
-        passThroughIndexBuilder.writeLong(curIndex);
-        window_start += slide;
+      if (currentRowIndexes.size() >= size) {
+        outputWindow(properColumnBuilders, passThroughIndexBuilder);
       }
+      currentRowIndexes.add(curIndex);
       curIndex++;
     }
+
+    @Override
+    public void finish(List<ColumnBuilder> columnBuilders, ColumnBuilder 
passThroughIndexBuilder) {
+      if (!currentRowIndexes.isEmpty()) {
+        outputWindow(columnBuilders, passThroughIndexBuilder);
+      }
+    }
+
+    private void outputWindow(
+        List<ColumnBuilder> properColumnBuilders, ColumnBuilder 
passThroughIndexBuilder) {
+      int sz = currentRowIndexes.size();
+      for (Long currentRowIndex : currentRowIndexes) {
+        properColumnBuilders.get(0).writeLong(windowIndex);
+        properColumnBuilders.get(1).writeLong(sz);
+        passThroughIndexBuilder.writeLong(currentRowIndex);
+      }
+      windowIndex++;
+      currentRowIndexes.clear();
+    }
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java
index 18527425414..731f37d0a57 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.commons.udf.builtin.relational.tvf;
 
-import org.apache.iotdb.udf.api.exception.UDFException;
 import org.apache.iotdb.udf.api.relational.TableFunction;
 import org.apache.iotdb.udf.api.relational.access.Record;
 import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
@@ -40,7 +39,8 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
+
+import static 
org.apache.iotdb.commons.udf.builtin.relational.tvf.WindowTVFUtils.findColumnIndex;
 
 public class HOPTableFunction implements TableFunction {
 
@@ -71,27 +71,13 @@ public class HOPTableFunction implements TableFunction {
             .build());
   }
 
-  private int findTimeColumnIndex(TableArgument tableArgument, String 
expectedFieldName) {
-    int requiredIndex = -1;
-    for (int i = 0; i < tableArgument.getFieldTypes().size(); i++) {
-      Optional<String> fieldName = tableArgument.getFieldNames().get(i);
-      if (fieldName.isPresent() && 
expectedFieldName.equalsIgnoreCase(fieldName.get())) {
-        requiredIndex = i;
-        break;
-      }
-    }
-    return requiredIndex;
-  }
-
   @Override
   public TableFunctionAnalysis analyze(Map<String, Argument> arguments) {
     TableArgument tableArgument = (TableArgument) 
arguments.get(DATA_PARAMETER_NAME);
     String expectedFieldName =
         (String) ((ScalarArgument) 
arguments.get(TIMECOL_PARAMETER_NAME)).getValue();
-    int requiredIndex = findTimeColumnIndex(tableArgument, expectedFieldName);
-    if (requiredIndex == -1) {
-      throw new UDFException("The required field is not found in the input 
table");
-    }
+    int requiredIndex =
+        findColumnIndex(tableArgument, expectedFieldName, 
Collections.singleton(Type.TIMESTAMP));
     DescribedSchema properColumnSchema =
         new DescribedSchema.Builder()
             .addField("window_start", Type.TIMESTAMP)
diff --git 
a/example/udf/src/main/java/org/apache/iotdb/udf/table/HOPTableFunction.java 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java
similarity index 60%
rename from 
example/udf/src/main/java/org/apache/iotdb/udf/table/HOPTableFunction.java
rename to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java
index fb9f7beac33..a3f49840734 100644
--- a/example/udf/src/main/java/org/apache/iotdb/udf/table/HOPTableFunction.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.udf.table;
+package org.apache.iotdb.commons.udf.builtin.relational.tvf;
 
 import org.apache.iotdb.udf.api.exception.UDFException;
 import org.apache.iotdb.udf.api.relational.TableFunction;
@@ -36,19 +36,18 @@ import org.apache.iotdb.udf.api.type.Type;
 
 import org.apache.tsfile.block.column.ColumnBuilder;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
-public class HOPTableFunction implements TableFunction {
+import static 
org.apache.iotdb.commons.udf.builtin.relational.tvf.WindowTVFUtils.findColumnIndex;
 
+public class SessionTableFunction implements TableFunction {
   private static final String DATA_PARAMETER_NAME = "DATA";
   private static final String TIMECOL_PARAMETER_NAME = "TIMECOL";
-  private static final String SLIDE_PARAMETER_NAME = "SLIDE";
-  private static final String SIZE_PARAMETER_NAME = "SIZE";
-  private static final String START_PARAMETER_NAME = "START";
+  private static final String GAP_PARAMETER_NAME = "GAP";
 
   @Override
   public List<ParameterSpecification> getArgumentsSpecifications() {
@@ -56,42 +55,21 @@ public class HOPTableFunction implements TableFunction {
         TableParameterSpecification.builder()
             .name(DATA_PARAMETER_NAME)
             .passThroughColumns()
-            .keepWhenEmpty()
             .build(),
         ScalarParameterSpecification.builder()
             .name(TIMECOL_PARAMETER_NAME)
             .type(Type.STRING)
             .build(),
-        
ScalarParameterSpecification.builder().name(SLIDE_PARAMETER_NAME).type(Type.INT64).build(),
-        
ScalarParameterSpecification.builder().name(SIZE_PARAMETER_NAME).type(Type.INT64).build(),
-        ScalarParameterSpecification.builder()
-            .name(START_PARAMETER_NAME)
-            .type(Type.TIMESTAMP)
-            .defaultValue(Long.MIN_VALUE)
-            .build());
-  }
-
-  private int findTimeColumnIndex(TableArgument tableArgument, String 
expectedFieldName) {
-    int requiredIndex = -1;
-    for (int i = 0; i < tableArgument.getFieldTypes().size(); i++) {
-      Optional<String> fieldName = tableArgument.getFieldNames().get(i);
-      if (fieldName.isPresent() && 
expectedFieldName.equalsIgnoreCase(fieldName.get())) {
-        requiredIndex = i;
-        break;
-      }
-    }
-    return requiredIndex;
+        
ScalarParameterSpecification.builder().name(GAP_PARAMETER_NAME).type(Type.INT64).build());
   }
 
   @Override
-  public TableFunctionAnalysis analyze(Map<String, Argument> arguments) {
+  public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws 
UDFException {
     TableArgument tableArgument = (TableArgument) 
arguments.get(DATA_PARAMETER_NAME);
     String expectedFieldName =
         (String) ((ScalarArgument) 
arguments.get(TIMECOL_PARAMETER_NAME)).getValue();
-    int requiredIndex = findTimeColumnIndex(tableArgument, expectedFieldName);
-    if (requiredIndex == -1) {
-      throw new UDFException("The required field is not found in the input 
table");
-    }
+    int requiredIndex =
+        findColumnIndex(tableArgument, expectedFieldName, 
Collections.singleton(Type.TIMESTAMP));
     DescribedSchema properColumnSchema =
         new DescribedSchema.Builder()
             .addField("window_start", Type.TIMESTAMP)
@@ -107,28 +85,25 @@ public class HOPTableFunction implements TableFunction {
 
   @Override
   public TableFunctionProcessorProvider getProcessorProvider(Map<String, 
Argument> arguments) {
+    long gap = (long) ((ScalarArgument) 
arguments.get(GAP_PARAMETER_NAME)).getValue();
     return new TableFunctionProcessorProvider() {
       @Override
       public TableFunctionDataProcessor getDataProcessor() {
-        return new HOPDataProcessor(
-            (Long) ((ScalarArgument) 
arguments.get(START_PARAMETER_NAME)).getValue(),
-            (Long) ((ScalarArgument) 
arguments.get(SLIDE_PARAMETER_NAME)).getValue(),
-            (Long) ((ScalarArgument) 
arguments.get(SIZE_PARAMETER_NAME)).getValue());
+        return new SessionDataProcessor(gap);
       }
     };
   }
 
-  private static class HOPDataProcessor implements TableFunctionDataProcessor {
+  private static class SessionDataProcessor implements 
TableFunctionDataProcessor {
 
-    private final long slide;
-    private final long size;
-    private long curTime;
+    private final long gap;
+    private final List<Long> currentRowIndexes = new ArrayList<>();
     private long curIndex = 0;
+    private long windowStart = -1;
+    private long windowEnd = -1;
 
-    public HOPDataProcessor(long startTime, long slide, long size) {
-      this.slide = slide;
-      this.size = size;
-      this.curTime = startTime;
+    public SessionDataProcessor(long gap) {
+      this.gap = gap;
     }
 
     @Override
@@ -137,22 +112,30 @@ public class HOPTableFunction implements TableFunction {
         List<ColumnBuilder> properColumnBuilders,
         ColumnBuilder passThroughIndexBuilder) {
       long timeValue = input.getLong(0);
-      if (curTime == Long.MIN_VALUE) {
-        curTime = timeValue;
+      if (!currentRowIndexes.isEmpty() && timeValue > windowEnd) {
+        outputWindow(properColumnBuilders, passThroughIndexBuilder);
       }
-      if (curTime + size <= timeValue) {
-        // jump to appropriate window
-        long move = (timeValue - curTime - size) / slide + 1;
-        curTime += move * slide;
+      currentRowIndexes.add(curIndex);
+      windowStart = timeValue;
+      windowEnd = timeValue + gap;
+      curIndex++;
+    }
+
+    @Override
+    public void finish(List<ColumnBuilder> columnBuilders, ColumnBuilder 
passThroughIndexBuilder) {
+      if (!currentRowIndexes.isEmpty()) {
+        outputWindow(columnBuilders, passThroughIndexBuilder);
       }
-      long slideTime = curTime;
-      while (slideTime <= timeValue && slideTime + size > timeValue) {
-        properColumnBuilders.get(0).writeLong(slideTime);
-        properColumnBuilders.get(1).writeLong(slideTime + size);
-        passThroughIndexBuilder.writeLong(curIndex);
-        slideTime += slide;
+    }
+
+    private void outputWindow(
+        List<ColumnBuilder> properColumnBuilders, ColumnBuilder 
passThroughIndexBuilder) {
+      for (Long currentRowIndex : currentRowIndexes) {
+        properColumnBuilders.get(0).writeLong(windowStart);
+        properColumnBuilders.get(1).writeLong(windowEnd - 1);
+        passThroughIndexBuilder.writeLong(currentRowIndex);
       }
-      curIndex++;
+      currentRowIndexes.clear();
     }
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VarianceTableFunction.java
similarity index 55%
copy from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java
copy to 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VarianceTableFunction.java
index 18527425414..56979d374f2 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VarianceTableFunction.java
@@ -34,70 +34,51 @@ import 
org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSp
 import 
org.apache.iotdb.udf.api.relational.table.specification.TableParameterSpecification;
 import org.apache.iotdb.udf.api.type.Type;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.tsfile.block.column.ColumnBuilder;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
-public class HOPTableFunction implements TableFunction {
+import static 
org.apache.iotdb.commons.udf.builtin.relational.tvf.WindowTVFUtils.findColumnIndex;
 
+public class VarianceTableFunction implements TableFunction {
   private static final String DATA_PARAMETER_NAME = "DATA";
-  private static final String TIMECOL_PARAMETER_NAME = "TIMECOL";
-  private static final String SLIDE_PARAMETER_NAME = "SLIDE";
-  private static final String SIZE_PARAMETER_NAME = "SIZE";
-  private static final String START_PARAMETER_NAME = "START";
+  private static final String COL_PARAMETER_NAME = "COL";
+  private static final String DELTA_PARAMETER_NAME = "DELTA";
 
   @Override
   public List<ParameterSpecification> getArgumentsSpecifications() {
     return Arrays.asList(
         TableParameterSpecification.builder()
             .name(DATA_PARAMETER_NAME)
-            .rowSemantics()
             .passThroughColumns()
             .build(),
+        
ScalarParameterSpecification.builder().name(COL_PARAMETER_NAME).type(Type.STRING).build(),
         ScalarParameterSpecification.builder()
-            .name(TIMECOL_PARAMETER_NAME)
-            .type(Type.STRING)
-            .build(),
-        
ScalarParameterSpecification.builder().name(SLIDE_PARAMETER_NAME).type(Type.INT64).build(),
-        
ScalarParameterSpecification.builder().name(SIZE_PARAMETER_NAME).type(Type.INT64).build(),
-        ScalarParameterSpecification.builder()
-            .name(START_PARAMETER_NAME)
-            .type(Type.TIMESTAMP)
-            .defaultValue(0L)
+            .name(DELTA_PARAMETER_NAME)
+            .type(Type.DOUBLE)
             .build());
   }
 
-  private int findTimeColumnIndex(TableArgument tableArgument, String 
expectedFieldName) {
-    int requiredIndex = -1;
-    for (int i = 0; i < tableArgument.getFieldTypes().size(); i++) {
-      Optional<String> fieldName = tableArgument.getFieldNames().get(i);
-      if (fieldName.isPresent() && 
expectedFieldName.equalsIgnoreCase(fieldName.get())) {
-        requiredIndex = i;
-        break;
-      }
-    }
-    return requiredIndex;
-  }
-
   @Override
-  public TableFunctionAnalysis analyze(Map<String, Argument> arguments) {
+  public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws 
UDFException {
     TableArgument tableArgument = (TableArgument) 
arguments.get(DATA_PARAMETER_NAME);
     String expectedFieldName =
-        (String) ((ScalarArgument) 
arguments.get(TIMECOL_PARAMETER_NAME)).getValue();
-    int requiredIndex = findTimeColumnIndex(tableArgument, expectedFieldName);
-    if (requiredIndex == -1) {
-      throw new UDFException("The required field is not found in the input 
table");
-    }
+        (String) ((ScalarArgument) 
arguments.get(COL_PARAMETER_NAME)).getValue();
+    int requiredIndex =
+        findColumnIndex(
+            tableArgument,
+            expectedFieldName,
+            ImmutableSet.of(Type.INT32, Type.INT64, Type.FLOAT, Type.DOUBLE));
     DescribedSchema properColumnSchema =
         new DescribedSchema.Builder()
-            .addField("window_start", Type.TIMESTAMP)
-            .addField("window_end", Type.TIMESTAMP)
+            .addField("window_index", Type.INT64)
+            .addField("base_value", Type.DOUBLE)
             .build();
-
     // outputColumnSchema
     return TableFunctionAnalysis.builder()
         .properColumnSchema(properColumnSchema)
@@ -107,28 +88,25 @@ public class HOPTableFunction implements TableFunction {
 
   @Override
   public TableFunctionProcessorProvider getProcessorProvider(Map<String, 
Argument> arguments) {
+    double delta = (double) ((ScalarArgument) 
arguments.get(DELTA_PARAMETER_NAME)).getValue();
     return new TableFunctionProcessorProvider() {
       @Override
       public TableFunctionDataProcessor getDataProcessor() {
-        return new HOPDataProcessor(
-            (Long) ((ScalarArgument) 
arguments.get(START_PARAMETER_NAME)).getValue(),
-            (Long) ((ScalarArgument) 
arguments.get(SLIDE_PARAMETER_NAME)).getValue(),
-            (Long) ((ScalarArgument) 
arguments.get(SIZE_PARAMETER_NAME)).getValue());
+        return new VarianceDataProcessor(delta);
       }
     };
   }
 
-  private static class HOPDataProcessor implements TableFunctionDataProcessor {
+  private static class VarianceDataProcessor implements 
TableFunctionDataProcessor {
 
-    private final long slide;
-    private final long size;
-    private final long start;
+    private final double gap;
+    private final List<Long> currentRowIndexes = new ArrayList<>();
+    private double baseValue = 0;
     private long curIndex = 0;
+    private long windowIndex = 0;
 
-    public HOPDataProcessor(long startTime, long slide, long size) {
-      this.slide = slide;
-      this.size = size;
-      this.start = startTime;
+    public VarianceDataProcessor(double delta) {
+      this.gap = delta;
     }
 
     @Override
@@ -136,17 +114,34 @@ public class HOPTableFunction implements TableFunction {
         Record input,
         List<ColumnBuilder> properColumnBuilders,
         ColumnBuilder passThroughIndexBuilder) {
-      // find the first windows that satisfy the condition: start + n*slide <= 
time < start +
-      // n*slide + size
-      long timeValue = input.getLong(0);
-      long window_start = (timeValue - start - size + slide) / slide * slide;
-      while (window_start <= timeValue && window_start + size > timeValue) {
-        properColumnBuilders.get(0).writeLong(window_start);
-        properColumnBuilders.get(1).writeLong(window_start + size - 1);
-        passThroughIndexBuilder.writeLong(curIndex);
-        window_start += slide;
+      double value = input.getDouble(0);
+      if (!currentRowIndexes.isEmpty() && Math.abs(value - baseValue) > gap) {
+        outputWindow(properColumnBuilders, passThroughIndexBuilder);
       }
+      if (currentRowIndexes.isEmpty()) {
+        // use the first value in the window as the base value
+        baseValue = value;
+      }
+      currentRowIndexes.add(curIndex);
       curIndex++;
     }
+
+    @Override
+    public void finish(List<ColumnBuilder> columnBuilders, ColumnBuilder 
passThroughIndexBuilder) {
+      if (!currentRowIndexes.isEmpty()) {
+        outputWindow(columnBuilders, passThroughIndexBuilder);
+      }
+    }
+
+    private void outputWindow(
+        List<ColumnBuilder> properColumnBuilders, ColumnBuilder 
passThroughIndexBuilder) {
+      for (Long currentRowIndex : currentRowIndexes) {
+        properColumnBuilders.get(0).writeLong(windowIndex);
+        properColumnBuilders.get(1).writeDouble(baseValue);
+        passThroughIndexBuilder.writeLong(currentRowIndex);
+      }
+      currentRowIndexes.clear();
+      windowIndex++;
+    }
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/WindowTVFUtils.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/WindowTVFUtils.java
new file mode 100644
index 00000000000..9c9be03fbba
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/WindowTVFUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.udf.builtin.relational.tvf;
+
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.table.argument.TableArgument;
+import org.apache.iotdb.udf.api.type.Type;
+
+import java.util.Optional;
+import java.util.Set;
+
+public class WindowTVFUtils {
+  /**
+   * Find the index of the column in the table argument.
+   *
+   * @param tableArgument the table argument
+   * @param expectedFieldName the expected field name
+   * @param expectedTypes the expected types
+   * @return the index of the time column, -1 if not found
+   */
+  public static int findColumnIndex(
+      TableArgument tableArgument, String expectedFieldName, Set<Type> 
expectedTypes)
+      throws UDFException {
+    for (int i = 0; i < tableArgument.getFieldTypes().size(); i++) {
+      Optional<String> fieldName = tableArgument.getFieldNames().get(i);
+      if (fieldName.isPresent() && 
expectedFieldName.equalsIgnoreCase(fieldName.get())) {
+        if (!expectedTypes.contains(tableArgument.getFieldTypes().get(i))) {
+          throw new UDFException(
+              String.format("The type of the column [%s] is not as expected.", 
expectedFieldName));
+        }
+        return i;
+      }
+    }
+    throw new UDFException(
+        String.format(
+            "Required column [%s] not found in the source table argument.", 
expectedFieldName));
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java
index fe62cb4d291..762944c02c4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/service/UDFManagementService.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.commons.udf.builtin.BuiltinScalarFunction;
 import 
org.apache.iotdb.commons.udf.builtin.BuiltinTimeSeriesGeneratingFunction;
 import 
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction;
 import 
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinScalarFunction;
+import 
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinTableFunction;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.udf.api.UDF;
 import org.apache.iotdb.udf.api.exception.UDFException;
@@ -120,6 +121,8 @@ public class UDFManagementService {
       return TableBuiltinScalarFunction.getBuiltInScalarFunctionName()
               .contains(functionName.toLowerCase())
           || TableBuiltinAggregationFunction.getBuiltInAggregateFunctionName()
+              .contains(functionName.toLowerCase())
+          || TableBuiltinTableFunction.getBuiltInTableFunctionName()
               .contains(functionName.toLowerCase());
     }
   }


Reply via email to