JackieTien97 commented on code in PR #10507:
URL: https://github.com/apache/iotdb/pull/10507#discussion_r1308709659


##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java:
##########
@@ -63,15 +81,41 @@ public ModelInfo getModelInfo() {
   }
 
   public TSStatus createModel(TCreateModelReq req) {
-    ModelInformation modelInformation =
-        new ModelInformation(
-            req.getModelId(),
-            req.getModelTask(),
-            req.getModelType(),
-            req.isIsAuto(),
-            req.getQueryExpressions(),
-            req.getQueryFilter());
-    return configManager.getProcedureManager().createModel(modelInformation, 
req.getModelConfigs());
+    TaskType taskType = req.getTaskType();
+
+    Map<String, String> options = req.getOptions();
+    ModelInformation modelInformation;
+    if (taskType == TaskType.FORECAST) {
+      String inputTypeListStr = options.get(INPUT_TYPE_LIST);
+      List<TSDataType> inputTypeList =
+          Arrays.stream(inputTypeListStr.substring(1, 
inputTypeListStr.length() - 1).split(","))

Review Comment:
   Why do we need to remove the first and last characters? What are the first 
and last characters?
   



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ModelInfo.java:
##########
@@ -119,35 +124,58 @@ public ModelTableResp showModel(ShowModelPlan plan) {
     }
   }
 
-  public TrailTableResp showTrail(ShowTrailPlan plan) {
+  public TrialTableResp showTrial(ShowTrialPlan plan) {
     acquireModelTableLock();

Review Comment:
   Use a read-write lock instead of `ReentrantLock`.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java:
##########
@@ -698,6 +698,9 @@ public class IoTDBConfig {
   /** The number of threads in the thread pool that execute insert-tablet 
tasks. */
   private int intoOperationExecutionThreadCount = 2;
 
+  /** The number of threads in the thread pool that execute model inference 
tasks. */
+  private int modelInferenceExecutionThreadCount = 10;

Review Comment:
   Put this config in `iotdb-common.properties`.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/DropModelProcedure.java:
##########
@@ -145,6 +149,10 @@ protected Flow executeFromState(ConfigNodeProcedureEnv 
env, DropModelState state
         case CONFIG_NODE_DROPPED:
           
env.getConfigManager().getModelManager().getModelInfo().releaseModelTableLock();

Review Comment:
   We should never lock and unlock in different phases of one procedure, 
because the procedure may be restarted on a different ConfigNode (CN).



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java:
##########
@@ -243,6 +252,11 @@ public Analysis visitQuery(QueryStatement queryStatement, 
MPPQueryContext contex
     try {
       // check for semantic errors
       queryStatement.semanticCheck();
+      analysis.setStatement(queryStatement);

Review Comment:
   What's this for?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java:
##########
@@ -341,6 +355,46 @@ public Analysis visitQuery(QueryStatement queryStatement, 
MPPQueryContext contex
     return analysis;
   }
 
+  private void analyzeModelInference(Analysis analysis, QueryStatement 
queryStatement) {
+    FunctionExpression modelInferenceExpression =
+        (FunctionExpression)
+            
queryStatement.getSelectComponent().getResultColumns().get(0).getExpression();
+    String modelId = 
modelInferenceExpression.getFunctionAttributes().get(MODEL_ID);
+
+    ModelInformation modelInformation = 
partitionFetcher.getModelInformation(modelId);
+    if (modelInformation == null || !modelInformation.available()) {
+      throw new SemanticException("Model " + modelId + " is not available");
+    }
+
+    ModelInferenceFunction functionType =
+        
ModelInferenceFunction.valueOf(modelInferenceExpression.getFunctionName().toUpperCase());
+    if (functionType == FORECAST) {
+      ForecastModelInferenceDescriptor modelInferenceDescriptor =
+          new ForecastModelInferenceDescriptor(
+              functionType, (ForecastModeInformation) modelInformation);
+      Map<String, String> modelInferenceAttributes =
+          modelInferenceExpression.getFunctionAttributes();
+      if (modelInferenceAttributes.containsKey("predict_length")) {
+        modelInferenceDescriptor.setExpectedPredictLength(
+            Integer.parseInt(modelInferenceAttributes.get("predict_length")));
+      }
+      analysis.setModelInferenceDescriptor(modelInferenceDescriptor);
+
+      List<ResultColumn> newResultColumns = new ArrayList<>();
+      for (Expression inputExpression : 
modelInferenceExpression.getExpressions()) {
+        newResultColumns.add(new ResultColumn(inputExpression));
+      }
+      queryStatement.getSelectComponent().setResultColumns(newResultColumns);
+
+      OrderByComponent descTimeOrder = new OrderByComponent();
+      descTimeOrder.addSortItem(new SortItem("TIME", Ordering.DESC));
+      queryStatement.setOrderByComponent(descTimeOrder);

Review Comment:
   Why do we need to order by time desc?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ml/ForecastOperator.java:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process.ml;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.db.exception.ModelInferenceProcessException;
+import org.apache.iotdb.db.protocol.client.MLNodeClient;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.mlnode.rpc.thrift.TForecastResp;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
+public class ForecastOperator implements ProcessOperator {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ForecastOperator.class);
+
+  private final OperatorContext operatorContext;
+  private final Operator child;
+
+  private final String modelPath;
+  private final List<TSDataType> inputTypeList;
+  private final List<String> inputColumnNameList;
+  private final int expectedPredictLength;
+
+  private final TsBlockBuilder inputTsBlockBuilder;
+
+  private MLNodeClient client;
+  private final ExecutorService modelInferenceExecutor;
+  private ListenableFuture<TForecastResp> forecastExecutionFuture;
+
+  private boolean finished = false;
+
+  private final long maxRetainedSize;
+  private final long maxReturnSize;
+
+  public ForecastOperator(
+      OperatorContext operatorContext,
+      Operator child,
+      String modelPath,
+      List<TSDataType> inputTypeList,
+      List<String> inputColumnNameList,
+      int expectedPredictLength,
+      ExecutorService modelInferenceExecutor,
+      long maxRetainedSize,
+      long maxReturnSize) {
+    this.operatorContext = operatorContext;
+    this.child = child;
+    this.modelPath = modelPath;
+    this.inputTypeList = inputTypeList;
+    this.inputColumnNameList = inputColumnNameList;
+    this.expectedPredictLength = expectedPredictLength;
+    this.inputTsBlockBuilder = new TsBlockBuilder(inputTypeList);
+    this.modelInferenceExecutor = modelInferenceExecutor;
+    this.maxRetainedSize = maxRetainedSize;
+    this.maxReturnSize = maxReturnSize;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    ListenableFuture<?> childBlocked = child.isBlocked();
+    boolean executionDone = forecastExecutionDone();
+    if (executionDone && childBlocked.isDone()) {
+      return NOT_BLOCKED;
+    } else if (childBlocked.isDone()) {
+      return forecastExecutionFuture;
+    } else if (executionDone) {
+      return childBlocked;
+    } else {
+      return successfulAsList(Arrays.asList(forecastExecutionFuture, 
childBlocked));
+    }
+  }
+
+  private boolean forecastExecutionDone() {
+    if (forecastExecutionFuture == null) {
+      return true;
+    }
+    return forecastExecutionFuture.isDone();
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    return !finished;
+  }
+
+  @Override
+  public TsBlock next() throws Exception {
+    if (forecastExecutionFuture == null) {
+      if (child.hasNextWithTimer()) {
+        TsBlock inputTsBlock = child.nextWithTimer();
+        if (inputTsBlock != null) {
+          appendTsBlockToBuilder(inputTsBlock);
+        }
+      } else {
+        submitForecastTask();
+      }
+      return null;
+    } else {
+      try {
+        if (!forecastExecutionFuture.isDone()) {
+          throw new IllegalStateException(
+              "The operator cannot continue until the forecast execution is 
done.");
+        }
+
+        TForecastResp forecastResp = forecastExecutionFuture.get();
+        if (forecastResp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          String message =
+              String.format(
+                  "Error occurred while executing forecast: %s",
+                  forecastResp.getStatus().getMessage());
+          throw new ModelInferenceProcessException(message);
+        }
+
+        finished = true;
+        TsBlock resultTsBlock =
+            new 
TsBlockSerde().deserialize(forecastResp.bufferForForecastResult());
+        resultTsBlock = modifyTimeColumn(resultTsBlock);
+        return resultTsBlock;
+      } catch (InterruptedException e) {
+        LOGGER.warn(
+            "{}: interrupted when processing write operation future with 
exception {}", this, e);

Review Comment:
   ```suggestion
           LOGGER.warn(
               "{}: interrupted when processing write operation future", this, 
e);
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ml/ForecastOperator.java:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process.ml;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.db.exception.ModelInferenceProcessException;
+import org.apache.iotdb.db.protocol.client.MLNodeClient;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.mlnode.rpc.thrift.TForecastResp;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
+public class ForecastOperator implements ProcessOperator {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ForecastOperator.class);
+
+  private final OperatorContext operatorContext;
+  private final Operator child;
+
+  private final String modelPath;
+  private final List<TSDataType> inputTypeList;
+  private final List<String> inputColumnNameList;
+  private final int expectedPredictLength;
+
+  private final TsBlockBuilder inputTsBlockBuilder;
+
+  private MLNodeClient client;
+  private final ExecutorService modelInferenceExecutor;
+  private ListenableFuture<TForecastResp> forecastExecutionFuture;
+
+  private boolean finished = false;
+
+  private final long maxRetainedSize;
+  private final long maxReturnSize;
+
+  public ForecastOperator(
+      OperatorContext operatorContext,
+      Operator child,
+      String modelPath,
+      List<TSDataType> inputTypeList,
+      List<String> inputColumnNameList,
+      int expectedPredictLength,
+      ExecutorService modelInferenceExecutor,
+      long maxRetainedSize,
+      long maxReturnSize) {
+    this.operatorContext = operatorContext;
+    this.child = child;
+    this.modelPath = modelPath;
+    this.inputTypeList = inputTypeList;
+    this.inputColumnNameList = inputColumnNameList;
+    this.expectedPredictLength = expectedPredictLength;
+    this.inputTsBlockBuilder = new TsBlockBuilder(inputTypeList);
+    this.modelInferenceExecutor = modelInferenceExecutor;
+    this.maxRetainedSize = maxRetainedSize;
+    this.maxReturnSize = maxReturnSize;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    ListenableFuture<?> childBlocked = child.isBlocked();
+    boolean executionDone = forecastExecutionDone();
+    if (executionDone && childBlocked.isDone()) {
+      return NOT_BLOCKED;
+    } else if (childBlocked.isDone()) {
+      return forecastExecutionFuture;
+    } else if (executionDone) {
+      return childBlocked;
+    } else {
+      return successfulAsList(Arrays.asList(forecastExecutionFuture, 
childBlocked));
+    }
+  }
+
+  private boolean forecastExecutionDone() {
+    if (forecastExecutionFuture == null) {
+      return true;
+    }
+    return forecastExecutionFuture.isDone();
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    return !finished;
+  }
+
+  @Override
+  public TsBlock next() throws Exception {
+    if (forecastExecutionFuture == null) {
+      if (child.hasNextWithTimer()) {
+        TsBlock inputTsBlock = child.nextWithTimer();
+        if (inputTsBlock != null) {
+          appendTsBlockToBuilder(inputTsBlock);
+        }
+      } else {
+        submitForecastTask();
+      }
+      return null;
+    } else {
+      try {
+        if (!forecastExecutionFuture.isDone()) {
+          throw new IllegalStateException(
+              "The operator cannot continue until the forecast execution is 
done.");
+        }
+
+        TForecastResp forecastResp = forecastExecutionFuture.get();
+        if (forecastResp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          String message =
+              String.format(
+                  "Error occurred while executing forecast: %s",
+                  forecastResp.getStatus().getMessage());
+          throw new ModelInferenceProcessException(message);
+        }
+
+        finished = true;
+        TsBlock resultTsBlock =
+            new 
TsBlockSerde().deserialize(forecastResp.bufferForForecastResult());
+        resultTsBlock = modifyTimeColumn(resultTsBlock);
+        return resultTsBlock;
+      } catch (InterruptedException e) {
+        LOGGER.warn(
+            "{}: interrupted when processing write operation future with 
exception {}", this, e);
+        Thread.currentThread().interrupt();
+        throw new ModelInferenceProcessException(e.getMessage());

Review Comment:
   Either log this exception and handle it, or rethrow it with some contextual 
information.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/component/ResultColumn.java:
##########
@@ -68,16 +69,14 @@ public class ResultColumn extends StatementNode {
 
   private final ColumnType columnType;
 
-  public ResultColumn(Expression expression, String alias, ColumnType 
columnType) {
+  public ResultColumn(Expression expression, String alias) {
     this.expression = expression;
     this.alias = alias;
-    this.columnType = columnType;
+    this.columnType = ExpressionAnalyzer.identifyOutputColumnType(expression, 
true);

Review Comment:
   Why was this change made?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ml/ForecastOperator.java:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process.ml;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.db.exception.ModelInferenceProcessException;
+import org.apache.iotdb.db.protocol.client.MLNodeClient;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.mlnode.rpc.thrift.TForecastResp;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
+public class ForecastOperator implements ProcessOperator {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ForecastOperator.class);
+
+  private final OperatorContext operatorContext;
+  private final Operator child;
+
+  private final String modelPath;
+  private final List<TSDataType> inputTypeList;
+  private final List<String> inputColumnNameList;
+  private final int expectedPredictLength;
+
+  private final TsBlockBuilder inputTsBlockBuilder;
+
+  private MLNodeClient client;
+  private final ExecutorService modelInferenceExecutor;
+  private ListenableFuture<TForecastResp> forecastExecutionFuture;
+
+  private boolean finished = false;
+
+  private final long maxRetainedSize;
+  private final long maxReturnSize;
+
+  public ForecastOperator(
+      OperatorContext operatorContext,
+      Operator child,
+      String modelPath,
+      List<TSDataType> inputTypeList,
+      List<String> inputColumnNameList,
+      int expectedPredictLength,
+      ExecutorService modelInferenceExecutor,
+      long maxRetainedSize,
+      long maxReturnSize) {
+    this.operatorContext = operatorContext;
+    this.child = child;
+    this.modelPath = modelPath;
+    this.inputTypeList = inputTypeList;
+    this.inputColumnNameList = inputColumnNameList;
+    this.expectedPredictLength = expectedPredictLength;
+    this.inputTsBlockBuilder = new TsBlockBuilder(inputTypeList);
+    this.modelInferenceExecutor = modelInferenceExecutor;
+    this.maxRetainedSize = maxRetainedSize;
+    this.maxReturnSize = maxReturnSize;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    ListenableFuture<?> childBlocked = child.isBlocked();
+    boolean executionDone = forecastExecutionDone();
+    if (executionDone && childBlocked.isDone()) {
+      return NOT_BLOCKED;
+    } else if (childBlocked.isDone()) {
+      return forecastExecutionFuture;
+    } else if (executionDone) {
+      return childBlocked;
+    } else {
+      return successfulAsList(Arrays.asList(forecastExecutionFuture, 
childBlocked));
+    }
+  }
+
+  private boolean forecastExecutionDone() {
+    if (forecastExecutionFuture == null) {
+      return true;
+    }
+    return forecastExecutionFuture.isDone();
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    return !finished;
+  }
+
+  @Override
+  public TsBlock next() throws Exception {
+    if (forecastExecutionFuture == null) {
+      if (child.hasNextWithTimer()) {
+        TsBlock inputTsBlock = child.nextWithTimer();
+        if (inputTsBlock != null) {
+          appendTsBlockToBuilder(inputTsBlock);
+        }
+      } else {
+        submitForecastTask();
+      }
+      return null;
+    } else {
+      try {
+        if (!forecastExecutionFuture.isDone()) {
+          throw new IllegalStateException(
+              "The operator cannot continue until the forecast execution is 
done.");
+        }
+
+        TForecastResp forecastResp = forecastExecutionFuture.get();
+        if (forecastResp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          String message =
+              String.format(
+                  "Error occurred while executing forecast: %s",
+                  forecastResp.getStatus().getMessage());
+          throw new ModelInferenceProcessException(message);
+        }
+
+        finished = true;
+        TsBlock resultTsBlock =
+            new 
TsBlockSerde().deserialize(forecastResp.bufferForForecastResult());
+        resultTsBlock = modifyTimeColumn(resultTsBlock);
+        return resultTsBlock;
+      } catch (InterruptedException e) {
+        LOGGER.warn(
+            "{}: interrupted when processing write operation future with 
exception {}", this, e);
+        Thread.currentThread().interrupt();
+        throw new ModelInferenceProcessException(e.getMessage());
+      } catch (ExecutionException e) {
+        throw new ModelInferenceProcessException(e.getMessage());
+      }
+    }
+  }
+
+  private TsBlock modifyTimeColumn(TsBlock resultTsBlock) {
+    long delta =
+        
CommonDescriptor.getInstance().getConfig().getTimestampPrecision().equals("ms")
+            ? 1_000_000L
+            : 1_000L;
+
+    TsBlockBuilder newTsBlockBuilder = 
TsBlockBuilder.createWithOnlyTimeColumn();
+    TimeColumnBuilder timeColumnBuilder = 
newTsBlockBuilder.getTimeColumnBuilder();
+    for (int i = 0; i < resultTsBlock.getPositionCount(); i++) {
+      timeColumnBuilder.writeLong(resultTsBlock.getTimeByIndex(i) / delta);
+      newTsBlockBuilder.declarePosition();
+    }
+    return 
newTsBlockBuilder.build().appendValueColumns(resultTsBlock.getValueColumns());
+  }
+
+  private void appendTsBlockToBuilder(TsBlock inputTsBlock) {
+    TimeColumnBuilder timeColumnBuilder = 
inputTsBlockBuilder.getTimeColumnBuilder();
+    ColumnBuilder[] columnBuilders = 
inputTsBlockBuilder.getValueColumnBuilders();
+
+    for (int i = 0; i < inputTsBlock.getPositionCount(); i++) {
+      timeColumnBuilder.writeLong(inputTsBlock.getTimeByIndex(i));
+      for (int columnIndex = 0; columnIndex < 
inputTsBlock.getValueColumnCount(); columnIndex++) {
+        columnBuilders[columnIndex].write(inputTsBlock.getColumn(columnIndex), 
i);
+      }
+      inputTsBlockBuilder.declarePosition();
+    }
+  }
+
+  private void submitForecastTask() {
+    try {
+      if (client == null) {
+        client = new MLNodeClient();

Review Comment:
   The client should be obtained from a client pool instead of creating a new 
one each time.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/model/CreateModelProcedure.java:
##########
@@ -124,6 +125,10 @@ protected Flow executeFromState(ConfigNodeProcedureEnv 
env, CreateModelState sta
         case ML_NODE_ACTIVE:
           
env.getConfigManager().getModelManager().getModelInfo().releaseModelTableLock();

Review Comment:
   We should never lock and unlock in different phases of one procedure, 
because the procedure may be restarted on a different ConfigNode (CN).



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java:
##########
@@ -78,6 +78,7 @@ public class FragmentInstanceManager {
   private final CounterStat failedInstances = new CounterStat();
 
   private final ExecutorService intoOperationExecutor;
+  private final ExecutorService modelInferenceExecutor;

Review Comment:
   Why do we need this thread pool? `IntoOperator` uses a thread pool to avoid 
deadlock, but it seems that you won't have a deadlock in `ForecastOperator`.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java:
##########
@@ -63,15 +81,41 @@ public ModelInfo getModelInfo() {
   }
 
   public TSStatus createModel(TCreateModelReq req) {
-    ModelInformation modelInformation =
-        new ModelInformation(
-            req.getModelId(),
-            req.getModelTask(),
-            req.getModelType(),
-            req.isIsAuto(),
-            req.getQueryExpressions(),
-            req.getQueryFilter());
-    return configManager.getProcedureManager().createModel(modelInformation, 
req.getModelConfigs());
+    TaskType taskType = req.getTaskType();
+
+    Map<String, String> options = req.getOptions();
+    ModelInformation modelInformation;
+    if (taskType == TaskType.FORECAST) {
+      String inputTypeListStr = options.get(INPUT_TYPE_LIST);
+      List<TSDataType> inputTypeList =
+          Arrays.stream(inputTypeListStr.substring(1, 
inputTypeListStr.length() - 1).split(","))
+              .sequential()
+              .map(s -> TSDataType.valueOf(s.toUpperCase().trim()))
+              .collect(Collectors.toList());
+
+      String predictIndexListStr = options.get(PREDICT_INDEX_LIST);
+      List<Integer> predictIndexList =
+          Arrays.stream(
+                  predictIndexListStr.substring(1, 
predictIndexListStr.length() - 1).split(","))
+              .sequential()

Review Comment:
   Same as for the `inputTypeList` stream: there is no need to call 
`.sequential()` here.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ModelManager.java:
##########
@@ -63,15 +81,41 @@ public ModelInfo getModelInfo() {
   }
 
   public TSStatus createModel(TCreateModelReq req) {
-    ModelInformation modelInformation =
-        new ModelInformation(
-            req.getModelId(),
-            req.getModelTask(),
-            req.getModelType(),
-            req.isIsAuto(),
-            req.getQueryExpressions(),
-            req.getQueryFilter());
-    return configManager.getProcedureManager().createModel(modelInformation, 
req.getModelConfigs());
+    TaskType taskType = req.getTaskType();
+
+    Map<String, String> options = req.getOptions();
+    ModelInformation modelInformation;
+    if (taskType == TaskType.FORECAST) {
+      String inputTypeListStr = options.get(INPUT_TYPE_LIST);
+      List<TSDataType> inputTypeList =
+          Arrays.stream(inputTypeListStr.substring(1, 
inputTypeListStr.length() - 1).split(","))
+              .sequential()

Review Comment:
   ```suggestion
   ```
   There is no need to call `.sequential()`: `Arrays.stream` already returns a 
sequential stream.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ConcatPathRewriter.java:
##########
@@ -119,9 +119,7 @@ private List<ResultColumn> concatSelectWithFrom(
           ExpressionAnalyzer.concatExpressionWithSuffixPaths(
               resultColumn.getExpression(), prefixPaths, patternTree);
       for (Expression resultExpression : resultExpressions) {
-        resultColumns.add(
-            new ResultColumn(
-                resultExpression, resultColumn.getAlias(), 
resultColumn.getColumnType()));
+        resultColumns.add(new ResultColumn(resultExpression, 
resultColumn.getAlias()));

Review Comment:
   Why can't we directly use the `ColumnType` in `resultColumn`?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java:
##########
@@ -341,6 +355,46 @@ public Analysis visitQuery(QueryStatement queryStatement, 
MPPQueryContext contex
     return analysis;
   }
 
+  private void analyzeModelInference(Analysis analysis, QueryStatement 
queryStatement) {
+    FunctionExpression modelInferenceExpression =
+        (FunctionExpression)
+            
queryStatement.getSelectComponent().getResultColumns().get(0).getExpression();
+    String modelId = 
modelInferenceExpression.getFunctionAttributes().get(MODEL_ID);
+
+    ModelInformation modelInformation = 
partitionFetcher.getModelInformation(modelId);

Review Comment:
   `getModelInformation` should not be put into `partitionFetcher`. 
`IPartitionFetcher` is only responsible for fetching data or schema partitions. 
You should add a separate `IModelFetcher` interface.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IPartitionFetcher.java:
##########
@@ -88,4 +89,6 @@ SchemaNodeManagementPartition 
getSchemaNodeManagementPartitionWithLevel(
 
   /** Invalid all partition cache */
   void invalidAllCache();
+
+  ModelInformation getModelInformation(String modelId);

Review Comment:
   Move it into a separate `IModelFetcher`.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/model/ShowModelsTask.java:
##########
@@ -79,16 +78,15 @@ public static void buildTsBlock(
       builder.getColumnBuilder(2).writeBinary(Binary.valueOf(modelType));
       builder.getColumnBuilder(3).writeBinary(Binary.valueOf(queryBody));
       builder.getColumnBuilder(4).writeBinary(Binary.valueOf(trainingState));
-      builder.getColumnBuilder(5).writeBinary(Binary.valueOf(modelPath));
-      
builder.getColumnBuilder(6).writeBinary(Binary.valueOf(modelHyperparameter.get(0)));
+      
builder.getColumnBuilder(5).writeBinary(Binary.valueOf(modelHyperparameter.get(0)));
       builder.declarePosition();
 
       for (int i = 1; i < listSize; i++) {
         builder.getTimeColumnBuilder().writeLong(0L);
-        for (int columnIndex = 0; columnIndex <= 5; columnIndex++) {
+        for (int columnIndex = 0; columnIndex <= 4; columnIndex++) {

Review Comment:
   Avoid using this magic number `4`.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java:
##########
@@ -293,6 +296,28 @@ public void invalidAllCache() {
     partitionCache.invalidAllCache();
   }
 
+  @Override
+  public ModelInformation getModelInformation(String modelId) {
+    try (ConfigNodeClient client =
+        configNodeClientManager.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) 
{
+      TGetModelInfoResp getModelInfoResp = client.getModelInfo(new 
TGetModelInfoReq(modelId));
+      if (getModelInfoResp.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        if (getModelInfoResp.modelInfo != null) {
+          return ModelInformation.deserialize(getModelInfoResp.modelInfo);
+        } else {
+          return null;
+        }
+      } else {
+        throw new StatementAnalyzeException(
+            "An error occurred when executing getModelInformation():"
+                + getModelInfoResp.getStatus().getMessage());
+      }
+    } catch (ClientManagerException | TException e) {
+      throw new StatementAnalyzeException(
+          "An error occurred when executing getModelInformation():" + 
e.getMessage());
+    }
+  }
+

Review Comment:
   Put it into a separate `ModelFetcher`.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java:
##########
@@ -214,6 +218,22 @@ public PlanNode visitQuery(QueryStatement queryStatement, 
MPPQueryContext contex
             .planOffset(queryStatement.getRowOffset())
             .planLimit(queryStatement.getRowLimit());
 
+    if (queryStatement.isModelInferenceQuery()) {
+      ModelInferenceDescriptor modelInferenceDescriptor = 
analysis.getModelInferenceDescriptor();
+      if (Objects.requireNonNull(modelInferenceDescriptor.getFunctionType())
+          == ModelInferenceFunction.FORECAST) {
+        ForecastModelInferenceDescriptor forecastModelInferenceDescriptor =
+            (ForecastModelInferenceDescriptor) modelInferenceDescriptor;
+        planBuilder
+            .planLimit(forecastModelInferenceDescriptor.getModelInputLength())

Review Comment:
   Why plan this limit node? Does forecast only output a one-line result?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionTypeAnalyzer.java:
##########
@@ -269,6 +269,8 @@ public TSDataType 
visitFunctionExpression(FunctionExpression functionExpression,
             functionExpression,
             TypeInferenceUtils.getBuiltInScalarFunctionDataType(
                 functionExpression, 
expressionTypes.get(NodeRef.of(inputExpressions.get(0)))));
+      } else if (functionExpression.isModelInferenceFunction()) {
+        return setExpressionType(functionExpression, TSDataType.DOUBLE);

Review Comment:
   Is only `DOUBLE` supported? If the input type is float, int32, or int64, 
are the forecast results still double?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/ml/ForecastOperator.java:
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process.ml;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.db.exception.ModelInferenceProcessException;
+import org.apache.iotdb.db.protocol.client.MLNodeClient;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
+import org.apache.iotdb.mlnode.rpc.thrift.TForecastResp;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
+public class ForecastOperator implements ProcessOperator {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ForecastOperator.class);
+
+  private final OperatorContext operatorContext;
+  private final Operator child;
+
+  private final String modelPath;
+  private final List<TSDataType> inputTypeList;
+  private final List<String> inputColumnNameList;
+  private final int expectedPredictLength;
+
+  private final TsBlockBuilder inputTsBlockBuilder;
+
+  private MLNodeClient client;
+  private final ExecutorService modelInferenceExecutor;
+  private ListenableFuture<TForecastResp> forecastExecutionFuture;
+
+  private boolean finished = false;
+
+  private final long maxRetainedSize;
+  private final long maxReturnSize;
+
+  public ForecastOperator(
+      OperatorContext operatorContext,
+      Operator child,
+      String modelPath,
+      List<TSDataType> inputTypeList,
+      List<String> inputColumnNameList,
+      int expectedPredictLength,
+      ExecutorService modelInferenceExecutor,
+      long maxRetainedSize,
+      long maxReturnSize) {
+    this.operatorContext = operatorContext;
+    this.child = child;
+    this.modelPath = modelPath;
+    this.inputTypeList = inputTypeList;
+    this.inputColumnNameList = inputColumnNameList;
+    this.expectedPredictLength = expectedPredictLength;
+    this.inputTsBlockBuilder = new TsBlockBuilder(inputTypeList);
+    this.modelInferenceExecutor = modelInferenceExecutor;
+    this.maxRetainedSize = maxRetainedSize;
+    this.maxReturnSize = maxReturnSize;
+  }
+
+  @Override
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  @Override
+  public ListenableFuture<?> isBlocked() {
+    ListenableFuture<?> childBlocked = child.isBlocked();
+    boolean executionDone = forecastExecutionDone();
+    if (executionDone && childBlocked.isDone()) {
+      return NOT_BLOCKED;
+    } else if (childBlocked.isDone()) {
+      return forecastExecutionFuture;
+    } else if (executionDone) {
+      return childBlocked;
+    } else {
+      return successfulAsList(Arrays.asList(forecastExecutionFuture, 
childBlocked));
+    }
+  }
+
+  private boolean forecastExecutionDone() {
+    if (forecastExecutionFuture == null) {
+      return true;
+    }
+    return forecastExecutionFuture.isDone();
+  }
+
+  @Override
+  public boolean hasNext() throws Exception {
+    return !finished;
+  }
+
+  @Override
+  public TsBlock next() throws Exception {
+    if (forecastExecutionFuture == null) {
+      if (child.hasNextWithTimer()) {
+        TsBlock inputTsBlock = child.nextWithTimer();
+        if (inputTsBlock != null) {
+          appendTsBlockToBuilder(inputTsBlock);
+        }
+      } else {
+        submitForecastTask();
+      }
+      return null;
+    } else {
+      try {
+        if (!forecastExecutionFuture.isDone()) {
+          throw new IllegalStateException(
+              "The operator cannot continue until the forecast execution is 
done.");
+        }
+
+        TForecastResp forecastResp = forecastExecutionFuture.get();
+        if (forecastResp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          String message =
+              String.format(
+                  "Error occurred while executing forecast: %s",
+                  forecastResp.getStatus().getMessage());
+          throw new ModelInferenceProcessException(message);
+        }
+
+        finished = true;
+        TsBlock resultTsBlock =
+            new 
TsBlockSerde().deserialize(forecastResp.bufferForForecastResult());
+        resultTsBlock = modifyTimeColumn(resultTsBlock);
+        return resultTsBlock;
+      } catch (InterruptedException e) {
+        LOGGER.warn(
+            "{}: interrupted when processing write operation future with 
exception {}", this, e);
+        Thread.currentThread().interrupt();
+        throw new ModelInferenceProcessException(e.getMessage());
+      } catch (ExecutionException e) {
+        throw new ModelInferenceProcessException(e.getMessage());
+      }
+    }
+  }
+
+  private TsBlock modifyTimeColumn(TsBlock resultTsBlock) {
+    long delta =
+        
CommonDescriptor.getInstance().getConfig().getTimestampPrecision().equals("ms")
+            ? 1_000_000L
+            : 1_000L;

Review Comment:
   We've got not only `ms` and `us`, but also `ns`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to