This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/forecast in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c859cf70b25a9a158407050ded9a7b3f11a6a0a3 Author: JackieTien97 <[email protected]> AuthorDate: Mon Apr 28 10:50:08 2025 +0800 finish --- .../relational/function/ForecastTableFunction.java | 35 +++++++++++++--------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/ForecastTableFunction.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/ForecastTableFunction.java index 481b5154ad5..9b8475731fa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/ForecastTableFunction.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/ForecastTableFunction.java @@ -33,6 +33,7 @@ import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.access.Record; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -72,7 +73,7 @@ import static org.apache.iotdb.rpc.TSStatusCode.CAN_NOT_CONNECT_AINODE; public class ForecastTableFunction implements TableFunction { - private static class ForecastTableFunctionHandle { + private static class ForecastTableFunctionHandle implements TableFunctionHandle { TEndPoint targetAINode; String modelId; int maxInputLength; @@ -100,6 +101,7 @@ public class ForecastTableFunction implements TableFunction { this.types = types; } + @Override public byte[] serialize() { try (PublicBAOS publicBAOS = new PublicBAOS(); DataOutputStream outputStream = new DataOutputStream(publicBAOS)) { @@ -124,6 +126,7 @@ public class ForecastTableFunction implements TableFunction { } } + @Override public void deserialize(byte[] bytes) { ByteBuffer buffer = ByteBuffer.wrap(bytes); this.targetAINode = @@ -311,7 +314,6 @@ public class ForecastTableFunction implements TableFunction { String options = (String) ((ScalarArgument) arguments.get(OPTIONS_PARAMETER_NAME)).getValue(); - // TODO put functionHandle into TableFunctionAnalysis after after yanze's pr being merged ForecastTableFunctionHandle functionHandle = new ForecastTableFunctionHandle( keepInput, @@ -325,10 +327,27 @@ public class ForecastTableFunction implements TableFunction { // outputColumnSchema return TableFunctionAnalysis.builder() .properColumnSchema(properColumnSchemaBuilder.build()) + .handle(functionHandle) .requiredColumns(INPUT_PARAMETER_NAME, requiredIndexList) .build(); } + @Override + public TableFunctionHandle createTableFunctionHandle() { + return new ForecastTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { + return new TableFunctionProcessorProvider() { + @Override + public TableFunctionDataProcessor getDataProcessor() { + return new ForecastDataProcessor((ForecastTableFunctionHandle) tableFunctionHandle); + } + }; + } + private ModelInferenceDescriptor getModelInfo(String modelId) { return MODEL_FETCHER.fetchModel(modelId); } @@ -343,18 +362,6 @@ public class ForecastTableFunction implements TableFunction { } } - @Override - public TableFunctionProcessorProvider getProcessorProvider(Map<String, Argument> arguments) { - // TODO use functionHandle in parameter after yanze's pr being merged - ForecastTableFunctionHandle functionHandle = new ForecastTableFunctionHandle(); - return new TableFunctionProcessorProvider() { - @Override - public TableFunctionDataProcessor getDataProcessor() { - return new ForecastDataProcessor(functionHandle); - } - }; - } - private static Map<String, String> parseOptions(String options) { String[] optionArray = options.split(","); if (optionArray.length == 0) {
