This is an automated email from the ASF dual-hosted git repository.

chenyz pushed a commit to branch udtf
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/udtf by this push:
     new cb66448a1d7 add leaf operator
cb66448a1d7 is described below

commit cb66448a1d7f2985e3cb6a560fc32e65ef749dcc
Author: Chen YZ <[email protected]>
AuthorDate: Thu Jan 16 17:35:26 2025 +0800

    add leaf operator
---
 .../table/TableFunctionProcessorProvider.java      |   8 +-
 ...cessor.java => TableFunctionLeafProcessor.java} |  17 +++-
 .../processor/TableFunctionProcessorState.java     |   2 +-
 .../execution/function/table/HOPTableFunction.java |  45 ++++++---
 .../execution/function/table/SplitFunction.java    | 100 +++++++++++++++++++
 .../function/TableFunctionLeafOperator.java        | 110 +++++++++++++++++++++
 .../process/function/TableFunctionOperator.java    |  11 ++-
 .../plan/planner/TableOperatorGenerator.java       |  16 ++-
 .../relational/metadata/TableMetadataImpl.java     |   3 +
 .../distribute/TableDistributedPlanGenerator.java  |   3 +
 10 files changed, 287 insertions(+), 28 deletions(-)

diff --git 
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionProcessorProvider.java
 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionProcessorProvider.java
index 180d8bb0e51..fcbc1a198c1 100644
--- 
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionProcessorProvider.java
+++ 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionProcessorProvider.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.udf.api.relational.table;
 
 import 
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor;
-import 
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionSplitProcessor;
+import 
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionLeafProcessor;
 
 public interface TableFunctionProcessorProvider {
   /**
@@ -33,11 +33,11 @@ public interface TableFunctionProcessorProvider {
   }
 
   /**
-   * This method returns a {@linkplain TableFunctionSplitProcessor}. All the 
necessary information
+   * This method returns a {@linkplain TableFunctionLeafProcessor}. All the 
necessary information
    * collected during analysis is available in the implementation of 
TableFunctionProcessorProvider.
    * It is called once per each split processed by the table function.
    */
-  default TableFunctionSplitProcessor getSplitProcessor() {
-    throw new UnsupportedOperationException("this table function does not 
process splits");
+  default TableFunctionLeafProcessor getSplitProcessor() {
+    throw new UnsupportedOperationException("this table function does not 
process leaf data");
   }
 }
diff --git 
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionSplitProcessor.java
 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionLeafProcessor.java
similarity index 62%
rename from 
iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionSplitProcessor.java
rename to 
iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionLeafProcessor.java
index 49f747c3db1..a8a49af3fa8 100644
--- 
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionSplitProcessor.java
+++ 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionLeafProcessor.java
@@ -19,4 +19,19 @@
 
 package org.apache.iotdb.udf.api.relational.table.processor;
 
-public interface TableFunctionSplitProcessor {}
+import org.apache.tsfile.block.column.ColumnBuilder;
+
+import java.util.List;
+
+public interface TableFunctionLeafProcessor {
+  /**
+   * This method processes a portion of data. It is called multiple times 
until the processor is
+   * fully processed.
+   *
+   * @param columnBuilders a list of {@link ColumnBuilder} for each column in 
the output table.
+   */
+  void process(List<ColumnBuilder> columnBuilders);
+
+  /** This method is called to determine if the processor has finished 
processing all data. */
+  boolean isFinish();
+}
diff --git 
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionProcessorState.java
 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionProcessorState.java
index 94502d930a6..e471319570c 100644
--- 
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionProcessorState.java
+++ 
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionProcessorState.java
@@ -11,7 +11,7 @@ import static java.util.Objects.requireNonNull;
 
 /**
  * The result of processing input by {@link TableFunctionDataProcessor} or 
{@link
- * TableFunctionSplitProcessor}. It can optionally include a portion of output 
data in the form of
+ * TableFunctionLeafProcessor}. It can optionally include a portion of output 
data in the form of
  * {@link List<Column>}
  *
  * <ul>
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/HOPTableFunction.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/HOPTableFunction.java
index e7b11e26ed1..ce4021a0771 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/HOPTableFunction.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/HOPTableFunction.java
@@ -50,6 +50,7 @@ public class HOPTableFunction implements TableFunction {
   private static final String TIMECOL_PARAMETER_NAME = "TIMECOL";
   private static final String SLIDE_PARAMETER_NAME = "SLIDE";
   private static final String SIZE_PARAMETER_NAME = "SIZE";
+  private static final String START_PARAMETER_NAME = "START";
 
   @Override
   public List<ParameterSpecification> getArgumentsSpecification() {
@@ -64,7 +65,12 @@ public class HOPTableFunction implements TableFunction {
             .type(Type.STRING)
             .build(),
         
ScalarParameterSpecification.builder().name(SLIDE_PARAMETER_NAME).type(Type.INT64).build(),
-        
ScalarParameterSpecification.builder().name(SIZE_PARAMETER_NAME).type(Type.INT64).build());
+        
ScalarParameterSpecification.builder().name(SIZE_PARAMETER_NAME).type(Type.INT64).build(),
+        ScalarParameterSpecification.builder()
+            .name(START_PARAMETER_NAME)
+            .type(Type.TIMESTAMP)
+            .defaultValue(Long.MIN_VALUE)
+            .build());
   }
 
   private int findTimeColumnIndex(TableArgument tableArgument, String 
expectedFieldName) {
@@ -115,6 +121,7 @@ public class HOPTableFunction implements TableFunction {
       @Override
       public TableFunctionDataProcessor getDataProcessor() {
         return new HOPDataProcessor(
+            (Long) ((ScalarArgument) 
arguments.get(START_PARAMETER_NAME)).getValue(),
             (Long) ((ScalarArgument) 
arguments.get(SLIDE_PARAMETER_NAME)).getValue() * 1000,
             (Long) ((ScalarArgument) 
arguments.get(SLIDE_PARAMETER_NAME)).getValue() * 1000,
             requiredIndex);
@@ -127,33 +134,39 @@ public class HOPTableFunction implements TableFunction {
     private final long slide;
     private final long size;
     private final int timeColumnIndex;
-    private long startTime = Long.MIN_VALUE;
+    private long curTime;
 
-    public HOPDataProcessor(long slide, long size, int timeColumnIndex) {
+    public HOPDataProcessor(long startTime, long slide, long size, int 
timeColumnIndex) {
       this.slide = slide;
       this.size = size;
+      this.curTime = startTime;
       this.timeColumnIndex = timeColumnIndex;
     }
 
     @Override
     public void process(Record input, List<ColumnBuilder> columnBuilders) {
-      long curTime = input.getLong(0);
-      if (startTime == Long.MIN_VALUE) {
-        startTime = curTime;
+      long timeValue = input.getLong(timeColumnIndex);
+      if (curTime == Long.MIN_VALUE) {
+        curTime = timeValue;
       }
-      while (curTime - startTime >= size) {
-        startTime += slide;
+      if (curTime + size <= timeValue) {
+        // jump to appropriate window
+        long move = (timeValue - curTime - size) / slide + 1;
+        curTime += move * slide;
       }
-
-      for (int i = 0; i < input.size(); i++) {
-        if (input.isNull(i)) {
-          columnBuilders.get(i + 2).appendNull();
-        } else {
-          columnBuilders.get(i + 2).writeObject(input.getObject(i));
+      long slideTime = curTime;
+      while (slideTime <= timeValue && slideTime + size > timeValue) {
+        for (int i = 0; i < input.size(); i++) {
+          if (input.isNull(i)) {
+            columnBuilders.get(i + 2).appendNull();
+          } else {
+            columnBuilders.get(i + 2).writeObject(input.getObject(i));
+          }
         }
+        columnBuilders.get(0).writeLong(slideTime);
+        columnBuilders.get(1).writeLong(slideTime + size);
+        slideTime += slide;
       }
-      columnBuilders.get(0).writeLong(startTime);
-      columnBuilders.get(1).writeLong(startTime + size);
     }
 
     @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/SplitFunction.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/SplitFunction.java
new file mode 100644
index 00000000000..263cfa79d7f
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/SplitFunction.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.function.table;
+
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.TableFunction;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import 
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
+import org.apache.iotdb.udf.api.relational.table.argument.Argument;
+import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
+import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
+import 
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionLeafProcessor;
+import 
org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification;
+import 
org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification;
+import org.apache.iotdb.udf.api.type.ColumnCategory;
+import org.apache.iotdb.udf.api.type.Type;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.utils.Binary;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class SplitFunction implements TableFunction {
+  private final String INPUT_PARAMETER_NAME = "INPUT";
+  private final String SPLIT_PARAMETER_NAME = "SPLIT";
+
+  @Override
+  public List<ParameterSpecification> getArgumentsSpecification() {
+    return Arrays.asList(
+        
ScalarParameterSpecification.builder().name(INPUT_PARAMETER_NAME).type(Type.STRING).build(),
+        ScalarParameterSpecification.builder()
+            .name(SPLIT_PARAMETER_NAME)
+            .type(Type.STRING)
+            .defaultValue(",")
+            .build());
+  }
+
+  @Override
+  public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws 
UDFException {
+    DescribedSchema schema =
+        DescribedSchema.builder().addField("output", Type.STRING, 
ColumnCategory.FIELD).build();
+    return TableFunctionAnalysis.builder().properColumnSchema(schema).build();
+  }
+
+  @Override
+  public TableFunctionProcessorProvider getProcessorProvider(Map<String, 
Argument> arguments) {
+    return new TableFunctionProcessorProvider() {
+      @Override
+      public TableFunctionLeafProcessor getSplitProcessor() {
+        return new SplitProcessor(
+            (String) ((ScalarArgument) 
arguments.get(INPUT_PARAMETER_NAME)).getValue(),
+            (String) ((ScalarArgument) 
arguments.get(SPLIT_PARAMETER_NAME)).getValue());
+      }
+    };
+  }
+
+  private static class SplitProcessor implements TableFunctionLeafProcessor {
+    private final String input;
+    private final String split;
+    private boolean finish = false;
+
+    SplitProcessor(String input, String split) {
+      this.input = input;
+      this.split = split;
+    }
+
+    @Override
+    public void process(List<ColumnBuilder> columnBuilders) {
+      for (String s : input.split(split)) {
+        columnBuilders.get(0).writeBinary(new Binary(s, 
TSFileConfig.STRING_CHARSET));
+      }
+      finish = true;
+    }
+
+    @Override
+    public boolean isFinish() {
+      return finish;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionLeafOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionLeafOperator.java
new file mode 100644
index 00000000000..cd9e219c6b6
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionLeafOperator.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process.function;
+
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
+import 
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
+import 
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionLeafProcessor;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE;
+
+// only one input source is supported now
+public class TableFunctionLeafOperator implements ProcessOperator {
+
+  private final OperatorContext operatorContext;
+  private final TsBlockBuilder blockBuilder;
+
+  private final TableFunctionLeafProcessor processor;
+
+  public TableFunctionLeafOperator(
+      OperatorContext operatorContext,
+      TableFunctionProcessorProvider processorProvider,
+      List<TSDataType> outputDataTypes) {
+    this.operatorContext = operatorContext;
+    this.processor = processorProvider.getSplitProcessor();
+    this.blockBuilder = new TsBlockBuilder(outputDataTypes);
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return this.operatorContext;
+  }
+
+  @Override
+  public TsBlock next() throws Exception {
+    List<ColumnBuilder> columnBuilders = getOutputColumnBuilders();
+    processor.process(columnBuilders);
+    return buildTsBlock(columnBuilders);
+  }
+
+  private List<ColumnBuilder> getOutputColumnBuilders() {
+    blockBuilder.reset();
+    return Arrays.asList(blockBuilder.getValueColumnBuilders());
+  }
+
+  private TsBlock buildTsBlock(List<ColumnBuilder> columnBuilders) {
+    int positionCount = columnBuilders.get(0).getPositionCount();
+    blockBuilder.declarePositions(positionCount);
+    return blockBuilder.build(new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, 
positionCount));
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    return !processor.isFinish();
+  }
+
+  @Override
+  public void close() throws Exception {}
+
+  @Override
+  public boolean isFinished() throws Exception {
+    return processor.isFinish();
+  }
+
+  @Override
+  public long calculateMaxPeekMemory() {
+    return 0;
+  }
+
+  @Override
+  public long calculateMaxReturnSize() {
+    return 0;
+  }
+
+  @Override
+  public long calculateRetainedSizeAfterCallingNext() {
+    return 0;
+  }
+
+  @Override
+  public long ramBytesUsed() {
+    return 0;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
index a4e36451824..0ad55973e38 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
@@ -69,7 +69,7 @@ public class TableFunctionOperator implements ProcessOperator 
{
     this.processorProvider = processorProvider;
     this.partitionRecognizer =
         new PartitionRecognizer(partitionChannels, requiredChannels, 
outputDataTypes);
-    this.partitionState = PartitionState.INIT_STATE;
+    this.partitionState = null;
     this.blockBuilder = new TsBlockBuilder(outputDataTypes);
   }
 
@@ -107,6 +107,8 @@ public class TableFunctionOperator implements 
ProcessOperator {
   @Override
   public TsBlock next() throws Exception {
     PartitionState.StateType stateType = partitionState.getStateType();
+    Iterator<Record> recordIterator = partitionState.getRecordIterator();
+    partitionState = null;
     if (stateType == PartitionState.StateType.INIT
         || stateType == PartitionState.StateType.NEED_MORE_DATA) {
       isBlocked = null;
@@ -126,7 +128,6 @@ public class TableFunctionOperator implements 
ProcessOperator {
         }
         processor = processorProvider.getDataProcessor();
       }
-      Iterator<Record> recordIterator = partitionState.getRecordIterator();
       while (recordIterator.hasNext()) {
         processor.process(recordIterator.next(), columnBuilders);
       }
@@ -147,8 +148,10 @@ public class TableFunctionOperator implements 
ProcessOperator {
 
   @Override
   public boolean hasNext() throws Exception {
-    isBlocked().get(); // wait for the next TsBlock
-    partitionState = partitionRecognizer.getState();
+    if (partitionState == null) {
+      isBlocked().get(); // wait for the next TsBlock
+      partitionState = partitionRecognizer.getState();
+    }
     return !finished;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index a700dfb246c..01377a469ff 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -61,6 +61,7 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.process.fill.constant.
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.fill.constant.FloatConstantFill;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.fill.constant.IntConstantFill;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.fill.constant.LongConstantFill;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.function.TableFunctionLeafOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.function.TableFunctionOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.gapfill.GapFillWGroupWMoOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.gapfill.GapFillWGroupWoMoOperator;
@@ -2252,8 +2253,19 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
     TableFunctionProcessorProvider processorProvider =
         tableFunction.getProcessorProvider(node.getArguments());
     if (node.getChildren().isEmpty()) {
-      // TODO(UDF): leaf node
-      throw new UnsupportedOperationException();
+      List<TSDataType> outputDataTypes =
+          node.getOutputSymbols().stream()
+              .map(context.getTypeProvider()::getTableModelType)
+              .map(InternalTypeManager::getTSDataType)
+              .collect(Collectors.toList());
+      OperatorContext operatorContext =
+          context
+              .getDriverContext()
+              .addOperatorContext(
+                  context.getNextOperatorId(),
+                  node.getPlanNodeId(),
+                  TableFunctionLeafOperator.class.getSimpleName());
+      return new TableFunctionLeafOperator(operatorContext, processorProvider, 
outputDataTypes);
     } else {
       Operator operator = node.getChild().accept(this, context);
       OperatorContext operatorContext =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
index ff00e007d6f..2a556889336 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import 
org.apache.iotdb.db.queryengine.execution.function.table.ExcludeColumnFunction;
 import 
org.apache.iotdb.db.queryengine.execution.function.table.HOPTableFunction;
+import org.apache.iotdb.db.queryengine.execution.function.table.SplitFunction;
 import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
 import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
 import org.apache.iotdb.db.queryengine.plan.relational.function.OperatorType;
@@ -793,6 +794,8 @@ public class TableMetadataImpl implements Metadata {
       return new HOPTableFunction();
     } else if ("EXCLUDE".equalsIgnoreCase(functionName)) {
       return new ExcludeColumnFunction();
+    } else if ("SPLIT".equalsIgnoreCase(functionName)) {
+      return new SplitFunction();
     } else {
       return null;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 46c40c534a7..c987c94268c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -815,6 +815,9 @@ public class TableDistributedPlanGenerator
   @Override
   public List<PlanNode> visitTableFunctionProcessor(
       TableFunctionProcessorNode node, PlanContext context) {
+    if (node.getChildren().isEmpty()) {
+      return Collections.singletonList(node);
+    }
     List<PlanNode> childrenNodes = node.getChild().accept(this, context);
     if (childrenNodes.size() == 1) {
       node.setChild(childrenNodes.get(0));

Reply via email to