This is an automated email from the ASF dual-hosted git repository.
chenyz pushed a commit to branch udtf
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/udtf by this push:
new cb66448a1d7 add leaf operator
cb66448a1d7 is described below
commit cb66448a1d7f2985e3cb6a560fc32e65ef749dcc
Author: Chen YZ <[email protected]>
AuthorDate: Thu Jan 16 17:35:26 2025 +0800
add leaf operator
---
.../table/TableFunctionProcessorProvider.java | 8 +-
...cessor.java => TableFunctionLeafProcessor.java} | 17 +++-
.../processor/TableFunctionProcessorState.java | 2 +-
.../execution/function/table/HOPTableFunction.java | 45 ++++++---
.../execution/function/table/SplitFunction.java | 100 +++++++++++++++++++
.../function/TableFunctionLeafOperator.java | 110 +++++++++++++++++++++
.../process/function/TableFunctionOperator.java | 11 ++-
.../plan/planner/TableOperatorGenerator.java | 16 ++-
.../relational/metadata/TableMetadataImpl.java | 3 +
.../distribute/TableDistributedPlanGenerator.java | 3 +
10 files changed, 287 insertions(+), 28 deletions(-)
diff --git
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionProcessorProvider.java
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionProcessorProvider.java
index 180d8bb0e51..fcbc1a198c1 100644
---
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionProcessorProvider.java
+++
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionProcessorProvider.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.udf.api.relational.table;
import
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionDataProcessor;
-import
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionSplitProcessor;
+import
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionLeafProcessor;
public interface TableFunctionProcessorProvider {
/**
@@ -33,11 +33,11 @@ public interface TableFunctionProcessorProvider {
}
/**
- * This method returns a {@linkplain TableFunctionSplitProcessor}. All the
necessary information
+ * This method returns a {@linkplain TableFunctionLeafProcessor}. All the
necessary information
* collected during analysis is available in the implementation of
TableFunctionProcessorProvider.
* It is called once per each split processed by the table function.
*/
- default TableFunctionSplitProcessor getSplitProcessor() {
- throw new UnsupportedOperationException("this table function does not
process splits");
+ default TableFunctionLeafProcessor getSplitProcessor() {
+ throw new UnsupportedOperationException("this table function does not
process leaf data");
}
}
diff --git
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionSplitProcessor.java
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionLeafProcessor.java
similarity index 62%
rename from
iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionSplitProcessor.java
rename to
iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionLeafProcessor.java
index 49f747c3db1..a8a49af3fa8 100644
---
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionSplitProcessor.java
+++
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionLeafProcessor.java
@@ -19,4 +19,19 @@
package org.apache.iotdb.udf.api.relational.table.processor;
-public interface TableFunctionSplitProcessor {}
+import org.apache.tsfile.block.column.ColumnBuilder;
+
+import java.util.List;
+
+public interface TableFunctionLeafProcessor {
+ /**
+ * This method processes a portion of data. It is called multiple times
until the processor is
+ * fully processed.
+ *
+ * @param columnBuilders a list of {@link ColumnBuilder} for each column in
the output table.
+ */
+ void process(List<ColumnBuilder> columnBuilders);
+
+ /** This method is called to determine if the processor has finished
processing all data. */
+ boolean isFinish();
+}
diff --git
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionProcessorState.java
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionProcessorState.java
index 94502d930a6..e471319570c 100644
---
a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionProcessorState.java
+++
b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/processor/TableFunctionProcessorState.java
@@ -11,7 +11,7 @@ import static java.util.Objects.requireNonNull;
/**
* The result of processing input by {@link TableFunctionDataProcessor} or
{@link
- * TableFunctionSplitProcessor}. It can optionally include a portion of output
data in the form of
+ * TableFunctionLeafProcessor}. It can optionally include a portion of output
data in the form of
* {@link List<Column>}
*
* <ul>
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/HOPTableFunction.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/HOPTableFunction.java
index e7b11e26ed1..ce4021a0771 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/HOPTableFunction.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/HOPTableFunction.java
@@ -50,6 +50,7 @@ public class HOPTableFunction implements TableFunction {
private static final String TIMECOL_PARAMETER_NAME = "TIMECOL";
private static final String SLIDE_PARAMETER_NAME = "SLIDE";
private static final String SIZE_PARAMETER_NAME = "SIZE";
+ private static final String START_PARAMETER_NAME = "START";
@Override
public List<ParameterSpecification> getArgumentsSpecification() {
@@ -64,7 +65,12 @@ public class HOPTableFunction implements TableFunction {
.type(Type.STRING)
.build(),
ScalarParameterSpecification.builder().name(SLIDE_PARAMETER_NAME).type(Type.INT64).build(),
-
ScalarParameterSpecification.builder().name(SIZE_PARAMETER_NAME).type(Type.INT64).build());
+
ScalarParameterSpecification.builder().name(SIZE_PARAMETER_NAME).type(Type.INT64).build(),
+ ScalarParameterSpecification.builder()
+ .name(START_PARAMETER_NAME)
+ .type(Type.TIMESTAMP)
+ .defaultValue(Long.MIN_VALUE)
+ .build());
}
private int findTimeColumnIndex(TableArgument tableArgument, String
expectedFieldName) {
@@ -115,6 +121,7 @@ public class HOPTableFunction implements TableFunction {
@Override
public TableFunctionDataProcessor getDataProcessor() {
return new HOPDataProcessor(
+ (Long) ((ScalarArgument)
arguments.get(START_PARAMETER_NAME)).getValue(),
(Long) ((ScalarArgument)
arguments.get(SLIDE_PARAMETER_NAME)).getValue() * 1000,
(Long) ((ScalarArgument)
arguments.get(SLIDE_PARAMETER_NAME)).getValue() * 1000,
requiredIndex);
@@ -127,33 +134,39 @@ public class HOPTableFunction implements TableFunction {
private final long slide;
private final long size;
private final int timeColumnIndex;
- private long startTime = Long.MIN_VALUE;
+ private long curTime;
- public HOPDataProcessor(long slide, long size, int timeColumnIndex) {
+ public HOPDataProcessor(long startTime, long slide, long size, int
timeColumnIndex) {
this.slide = slide;
this.size = size;
+ this.curTime = startTime;
this.timeColumnIndex = timeColumnIndex;
}
@Override
public void process(Record input, List<ColumnBuilder> columnBuilders) {
- long curTime = input.getLong(0);
- if (startTime == Long.MIN_VALUE) {
- startTime = curTime;
+ long timeValue = input.getLong(timeColumnIndex);
+ if (curTime == Long.MIN_VALUE) {
+ curTime = timeValue;
}
- while (curTime - startTime >= size) {
- startTime += slide;
+ if (curTime + size <= timeValue) {
+ // jump to appropriate window
+ long move = (timeValue - curTime - size) / slide + 1;
+ curTime += move * slide;
}
-
- for (int i = 0; i < input.size(); i++) {
- if (input.isNull(i)) {
- columnBuilders.get(i + 2).appendNull();
- } else {
- columnBuilders.get(i + 2).writeObject(input.getObject(i));
+ long slideTime = curTime;
+ while (slideTime <= timeValue && slideTime + size > timeValue) {
+ for (int i = 0; i < input.size(); i++) {
+ if (input.isNull(i)) {
+ columnBuilders.get(i + 2).appendNull();
+ } else {
+ columnBuilders.get(i + 2).writeObject(input.getObject(i));
+ }
}
+ columnBuilders.get(0).writeLong(slideTime);
+ columnBuilders.get(1).writeLong(slideTime + size);
+ slideTime += slide;
}
- columnBuilders.get(0).writeLong(startTime);
- columnBuilders.get(1).writeLong(startTime + size);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/SplitFunction.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/SplitFunction.java
new file mode 100644
index 00000000000..263cfa79d7f
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/function/table/SplitFunction.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.function.table;
+
+import org.apache.iotdb.udf.api.exception.UDFException;
+import org.apache.iotdb.udf.api.relational.TableFunction;
+import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis;
+import
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
+import org.apache.iotdb.udf.api.relational.table.argument.Argument;
+import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema;
+import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument;
+import
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionLeafProcessor;
+import
org.apache.iotdb.udf.api.relational.table.specification.ParameterSpecification;
+import
org.apache.iotdb.udf.api.relational.table.specification.ScalarParameterSpecification;
+import org.apache.iotdb.udf.api.type.ColumnCategory;
+import org.apache.iotdb.udf.api.type.Type;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.utils.Binary;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class SplitFunction implements TableFunction {
+ private final String INPUT_PARAMETER_NAME = "INPUT";
+ private final String SPLIT_PARAMETER_NAME = "SPLIT";
+
+ @Override
+ public List<ParameterSpecification> getArgumentsSpecification() {
+ return Arrays.asList(
+
ScalarParameterSpecification.builder().name(INPUT_PARAMETER_NAME).type(Type.STRING).build(),
+ ScalarParameterSpecification.builder()
+ .name(SPLIT_PARAMETER_NAME)
+ .type(Type.STRING)
+ .defaultValue(",")
+ .build());
+ }
+
+ @Override
+ public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws
UDFException {
+ DescribedSchema schema =
+ DescribedSchema.builder().addField("output", Type.STRING,
ColumnCategory.FIELD).build();
+ return TableFunctionAnalysis.builder().properColumnSchema(schema).build();
+ }
+
+ @Override
+ public TableFunctionProcessorProvider getProcessorProvider(Map<String,
Argument> arguments) {
+ return new TableFunctionProcessorProvider() {
+ @Override
+ public TableFunctionLeafProcessor getSplitProcessor() {
+ return new SplitProcessor(
+ (String) ((ScalarArgument)
arguments.get(INPUT_PARAMETER_NAME)).getValue(),
+ (String) ((ScalarArgument)
arguments.get(SPLIT_PARAMETER_NAME)).getValue());
+ }
+ };
+ }
+
+ private static class SplitProcessor implements TableFunctionLeafProcessor {
+ private final String input;
+ private final String split;
+ private boolean finish = false;
+
+ SplitProcessor(String input, String split) {
+ this.input = input;
+ this.split = split;
+ }
+
+ @Override
+ public void process(List<ColumnBuilder> columnBuilders) {
+ for (String s : input.split(split)) {
+ columnBuilders.get(0).writeBinary(new Binary(s,
TSFileConfig.STRING_CHARSET));
+ }
+ finish = true;
+ }
+
+ @Override
+ public boolean isFinish() {
+ return finish;
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionLeafOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionLeafOperator.java
new file mode 100644
index 00000000000..cd9e219c6b6
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionLeafOperator.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process.function;
+
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
+import
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
+import
org.apache.iotdb.udf.api.relational.table.processor.TableFunctionLeafProcessor;
+
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.TIME_COLUMN_TEMPLATE;
+
+// only one input source is supported now
+public class TableFunctionLeafOperator implements ProcessOperator {
+
+ private final OperatorContext operatorContext;
+ private final TsBlockBuilder blockBuilder;
+
+ private final TableFunctionLeafProcessor processor;
+
+ public TableFunctionLeafOperator(
+ OperatorContext operatorContext,
+ TableFunctionProcessorProvider processorProvider,
+ List<TSDataType> outputDataTypes) {
+ this.operatorContext = operatorContext;
+ this.processor = processorProvider.getSplitProcessor();
+ this.blockBuilder = new TsBlockBuilder(outputDataTypes);
+ }
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return this.operatorContext;
+ }
+
+ @Override
+ public TsBlock next() throws Exception {
+ List<ColumnBuilder> columnBuilders = getOutputColumnBuilders();
+ processor.process(columnBuilders);
+ return buildTsBlock(columnBuilders);
+ }
+
+ private List<ColumnBuilder> getOutputColumnBuilders() {
+ blockBuilder.reset();
+ return Arrays.asList(blockBuilder.getValueColumnBuilders());
+ }
+
+ private TsBlock buildTsBlock(List<ColumnBuilder> columnBuilders) {
+ int positionCount = columnBuilders.get(0).getPositionCount();
+ blockBuilder.declarePositions(positionCount);
+ return blockBuilder.build(new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE,
positionCount));
+ }
+
+ @Override
+ public boolean hasNext() throws Exception {
+ return !processor.isFinish();
+ }
+
+ @Override
+ public void close() throws Exception {}
+
+ @Override
+ public boolean isFinished() throws Exception {
+ return processor.isFinish();
+ }
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 0;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 0;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
+
+ @Override
+ public long ramBytesUsed() {
+ return 0;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
index a4e36451824..0ad55973e38 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
@@ -69,7 +69,7 @@ public class TableFunctionOperator implements ProcessOperator
{
this.processorProvider = processorProvider;
this.partitionRecognizer =
new PartitionRecognizer(partitionChannels, requiredChannels,
outputDataTypes);
- this.partitionState = PartitionState.INIT_STATE;
+ this.partitionState = null;
this.blockBuilder = new TsBlockBuilder(outputDataTypes);
}
@@ -107,6 +107,8 @@ public class TableFunctionOperator implements
ProcessOperator {
@Override
public TsBlock next() throws Exception {
PartitionState.StateType stateType = partitionState.getStateType();
+ Iterator<Record> recordIterator = partitionState.getRecordIterator();
+ partitionState = null;
if (stateType == PartitionState.StateType.INIT
|| stateType == PartitionState.StateType.NEED_MORE_DATA) {
isBlocked = null;
@@ -126,7 +128,6 @@ public class TableFunctionOperator implements
ProcessOperator {
}
processor = processorProvider.getDataProcessor();
}
- Iterator<Record> recordIterator = partitionState.getRecordIterator();
while (recordIterator.hasNext()) {
processor.process(recordIterator.next(), columnBuilders);
}
@@ -147,8 +148,10 @@ public class TableFunctionOperator implements
ProcessOperator {
@Override
public boolean hasNext() throws Exception {
- isBlocked().get(); // wait for the next TsBlock
- partitionState = partitionRecognizer.getState();
+ if (partitionState == null) {
+ isBlocked().get(); // wait for the next TsBlock
+ partitionState = partitionRecognizer.getState();
+ }
return !finished;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index a700dfb246c..01377a469ff 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -61,6 +61,7 @@ import
org.apache.iotdb.db.queryengine.execution.operator.process.fill.constant.
import
org.apache.iotdb.db.queryengine.execution.operator.process.fill.constant.FloatConstantFill;
import
org.apache.iotdb.db.queryengine.execution.operator.process.fill.constant.IntConstantFill;
import
org.apache.iotdb.db.queryengine.execution.operator.process.fill.constant.LongConstantFill;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.function.TableFunctionLeafOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.function.TableFunctionOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.gapfill.GapFillWGroupWMoOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.gapfill.GapFillWGroupWoMoOperator;
@@ -2252,8 +2253,19 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
TableFunctionProcessorProvider processorProvider =
tableFunction.getProcessorProvider(node.getArguments());
if (node.getChildren().isEmpty()) {
- // TODO(UDF): leaf node
- throw new UnsupportedOperationException();
+ List<TSDataType> outputDataTypes =
+ node.getOutputSymbols().stream()
+ .map(context.getTypeProvider()::getTableModelType)
+ .map(InternalTypeManager::getTSDataType)
+ .collect(Collectors.toList());
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ TableFunctionLeafOperator.class.getSimpleName());
+ return new TableFunctionLeafOperator(operatorContext, processorProvider,
outputDataTypes);
} else {
Operator operator = node.getChild().accept(this, context);
OperatorContext operatorContext =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
index ff00e007d6f..2a556889336 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/TableMetadataImpl.java
@@ -32,6 +32,7 @@ import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import
org.apache.iotdb.db.queryengine.execution.function.table.ExcludeColumnFunction;
import
org.apache.iotdb.db.queryengine.execution.function.table.HOPTableFunction;
+import org.apache.iotdb.db.queryengine.execution.function.table.SplitFunction;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.relational.function.OperatorType;
@@ -793,6 +794,8 @@ public class TableMetadataImpl implements Metadata {
return new HOPTableFunction();
} else if ("EXCLUDE".equalsIgnoreCase(functionName)) {
return new ExcludeColumnFunction();
+ } else if ("SPLIT".equalsIgnoreCase(functionName)) {
+ return new SplitFunction();
} else {
return null;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 46c40c534a7..c987c94268c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -815,6 +815,9 @@ public class TableDistributedPlanGenerator
@Override
public List<PlanNode> visitTableFunctionProcessor(
TableFunctionProcessorNode node, PlanContext context) {
+ if (node.getChildren().isEmpty()) {
+ return Collections.singletonList(node);
+ }
List<PlanNode> childrenNodes = node.getChild().accept(this, context);
if (childrenNodes.size() == 1) {
node.setChild(childrenNodes.get(0));