This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/TableModelIngestion by this 
push:
     new 419aa960059 add schema execution adaption
419aa960059 is described below

commit 419aa96005943663065c0ba1a38713a05f41a907
Author: jt2594838 <[email protected]>
AuthorDate: Tue Jun 11 18:40:36 2024 +0800

    add schema execution adaption
---
 .../protocol/thrift/impl/ClientRPCServiceImpl.java |   2 +-
 .../iotdb/db/queryengine/plan/Coordinator.java     |  47 ++++++
 .../db/queryengine/plan/analyze/AnalyzeUtils.java  | 168 +++++++++++++++++++++
 .../queryengine/plan/analyze/AnalyzeVisitor.java   | 148 ++++--------------
 .../db/queryengine/plan/analyze/IAnalysis.java     |   8 +
 .../plan/analyze/schema/ISchemaAutoCreation.java   |   3 +-
 .../plan/analyze/schema/SchemaValidator.java       |  14 ++
 .../plan/parser/StatementGenerator.java            |  14 +-
 .../relational/analyzer/StatementAnalyzer.java     |  59 +++++++-
 .../plan/relational/metadata/Metadata.java         |  17 +++
 .../plan/relational/sql/ast/AstVisitor.java        |   4 +
 .../plan/relational/sql/ast/InsertTablet.java      |  94 +++++++++++-
 .../relational/sql/ast/WrappedInsertStatement.java |  89 +++++++++++
 .../plan/relational/sql/ast/WrappedStatement.java  |  18 ++-
 .../db/queryengine/plan/statement/Statement.java   |   4 +-
 .../plan/statement/crud/InsertTabletStatement.java |  51 +++++++
 .../thrift-datanode/src/main/thrift/client.thrift  |   1 +
 17 files changed, 606 insertions(+), 135 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 5a35238718d..08236745e6f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -2072,7 +2072,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       long queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result;
       if (statement.isWriteToTable()) {
-        result = 
COORDINATOR.executeForTableModel(statement.toRelationalStatement(),
+        result = COORDINATOR.executeForTableModel(statement,
             relationSqlParser, clientSession,
             queryId,
             SESSION_MANAGER.getSessionInfo(clientSession), "", metadata,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 0ace106a035..fe20273f183 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -256,6 +256,53 @@ public class Coordinator {
                 startTime)));
   }
 
+  public ExecutionResult executeForTableModel(
+      Statement statement,
+      SqlParser sqlParser,
+      IClientSession clientSession,
+      long queryId,
+      SessionInfo session,
+      String sql,
+      Metadata metadata,
+      long timeOut) {
+    return execution(
+        queryId,
+        session,
+        sql,
+        ((queryContext, startTime) ->
+            createQueryExecutionForTableModel(
+                statement,
+                sqlParser,
+                clientSession,
+                queryContext,
+                metadata,
+                timeOut > 0 ? timeOut : CONFIG.getQueryTimeoutThreshold(),
+                startTime)));
+  }
+
+  private IQueryExecution createQueryExecutionForTableModel(
+      Statement statement,
+      SqlParser sqlParser,
+      IClientSession clientSession,
+      MPPQueryContext queryContext,
+      Metadata metadata,
+      long timeOut,
+      long startTime) {
+    queryContext.setTimeOut(timeOut);
+    queryContext.setStartTime(startTime);
+    RelationalModelPlanner relationalModelPlanner =
+        new RelationalModelPlanner(
+            statement.toRelationalStatement(queryContext),
+            sqlParser,
+            metadata,
+            executor,
+            writeOperationExecutor,
+            scheduledExecutor,
+            SYNC_INTERNAL_SERVICE_CLIENT_MANAGER,
+            ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER);
+    return new QueryExecution(relationalModelPlanner, queryContext, executor);
+  }
+
   private IQueryExecution createQueryExecutionForTableModel(
       org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement 
statement,
       SqlParser sqlParser,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java
new file mode 100644
index 00000000000..af861730016
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.analyze;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AnalyzeUtils {
+
+  private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS 
=
+      PerformanceOverviewMetrics.getInstance();
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(AnalyzeUtils.class);
+
+  private AnalyzeUtils() {
+    // util class
+  }
+
+  public static void validateSchema(
+      IAnalysis analysis, InsertBaseStatement insertStatement,
+      Runnable schemaValidation) {
+    final long startTime = System.nanoTime();
+    try {
+      schemaValidation.run();
+    } catch (SemanticException e) {
+      analysis.setFinishQueryAfterAnalyze(true);
+      if (e.getCause() instanceof IoTDBException) {
+        IoTDBException exception = (IoTDBException) e.getCause();
+        analysis.setFailStatus(
+            RpcUtils.getStatus(exception.getErrorCode(), 
exception.getMessage()));
+      } else {
+        analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, 
e.getMessage()));
+      }
+    } finally {
+      
PERFORMANCE_OVERVIEW_METRICS.recordScheduleSchemaValidateCost(System.nanoTime() 
- startTime);
+    }
+    boolean hasFailedMeasurement = insertStatement.hasFailedMeasurements();
+    String partialInsertMessage;
+    if (hasFailedMeasurement) {
+      partialInsertMessage =
+          String.format(
+              "Fail to insert measurements %s caused by %s",
+              insertStatement.getFailedMeasurements(), 
insertStatement.getFailedMessages());
+      LOGGER.warn(partialInsertMessage);
+      analysis.setFailStatus(
+          RpcUtils.getStatus(TSStatusCode.METADATA_ERROR.getStatusCode(), 
partialInsertMessage));
+    }
+  }
+
+  public static InsertBaseStatement removeLogicalView(
+      IAnalysis analysis, InsertBaseStatement insertBaseStatement) {
+    try {
+      return insertBaseStatement.removeLogicalView();
+    } catch (SemanticException e) {
+      analysis.setFinishQueryAfterAnalyze(true);
+      if (e.getCause() instanceof IoTDBException) {
+        IoTDBException exception = (IoTDBException) e.getCause();
+        analysis.setFailStatus(
+            RpcUtils.getStatus(exception.getErrorCode(), 
exception.getMessage()));
+      } else {
+        analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, 
e.getMessage()));
+      }
+      return insertBaseStatement;
+    }
+  }
+
+  /** get analysis according to statement and params */
+  public static Analysis getAnalysisForWriting(
+      Analysis analysis, List<DataPartitionQueryParam> 
dataPartitionQueryParams, String userName,
+      IPartitionFetcher partitionFetcher) {
+
+    DataPartition dataPartition =
+        partitionFetcher.getOrCreateDataPartition(dataPartitionQueryParams, 
userName);
+    if (dataPartition.isEmpty()) {
+      analysis.setFinishQueryAfterAnalyze(true);
+      analysis.setFailStatus(
+          RpcUtils.getStatus(
+              TSStatusCode.DATABASE_NOT_EXIST.getStatusCode(),
+              "Database not exists and failed to create automatically "
+                  + "because enable_auto_create_schema is FALSE."));
+    }
+    analysis.setDataPartitionInfo(dataPartition);
+    return analysis;
+  }
+
+  public static Analysis computeAnalysisForInsertRows(
+      Analysis analysis, InsertRowsStatement insertRowsStatement, String 
userName,
+      IPartitionFetcher partitionFetcher) {
+    Map<String, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new 
HashMap<>();
+    for (InsertRowStatement insertRowStatement : 
insertRowsStatement.getInsertRowStatementList()) {
+      Set<TTimePartitionSlot> timePartitionSlotSet =
+          dataPartitionQueryParamMap.computeIfAbsent(
+              insertRowStatement.getDevicePath().getFullPath(), k -> new 
HashSet<>());
+      timePartitionSlotSet.add(insertRowStatement.getTimePartitionSlot());
+    }
+
+    List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
+    for (Map.Entry<String, Set<TTimePartitionSlot>> entry : 
dataPartitionQueryParamMap.entrySet()) {
+      DataPartitionQueryParam dataPartitionQueryParam = new 
DataPartitionQueryParam();
+      dataPartitionQueryParam.setDevicePath(entry.getKey());
+      dataPartitionQueryParam.setTimePartitionSlotList(new 
ArrayList<>(entry.getValue()));
+      dataPartitionQueryParams.add(dataPartitionQueryParam);
+    }
+
+    return getAnalysisForWriting(analysis, dataPartitionQueryParams, userName, 
partitionFetcher);
+  }
+
+  public static Analysis computeAnalysisForMultiTablets(
+      Analysis analysis, InsertMultiTabletsStatement 
insertMultiTabletsStatement, String userName
+      , IPartitionFetcher partitionFetcher) {
+    Map<String, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new 
HashMap<>();
+    for (InsertTabletStatement insertTabletStatement :
+        insertMultiTabletsStatement.getInsertTabletStatementList()) {
+      Set<TTimePartitionSlot> timePartitionSlotSet =
+          dataPartitionQueryParamMap.computeIfAbsent(
+              insertTabletStatement.getDevicePath().getFullPath(), k -> new 
HashSet<>());
+      
timePartitionSlotSet.addAll(insertTabletStatement.getTimePartitionSlots());
+    }
+
+    List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
+    for (Map.Entry<String, Set<TTimePartitionSlot>> entry : 
dataPartitionQueryParamMap.entrySet()) {
+      DataPartitionQueryParam dataPartitionQueryParam = new 
DataPartitionQueryParam();
+      dataPartitionQueryParam.setDevicePath(entry.getKey());
+      dataPartitionQueryParam.setTimePartitionSlotList(new 
ArrayList<>(entry.getValue()));
+      dataPartitionQueryParams.add(dataPartitionQueryParam);
+    }
+
+    return getAnalysisForWriting(analysis, dataPartitionQueryParams, userName, 
partitionFetcher);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index cf49f83c7ce..af95e958a8c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
@@ -185,6 +184,11 @@ import static 
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant
 import static 
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME;
 import static 
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.PARTITION_FETCHER;
 import static 
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.SCHEMA_FETCHER;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.computeAnalysisForInsertRows;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.computeAnalysisForMultiTablets;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.getAnalysisForWriting;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.removeLogicalView;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.validateSchema;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.bindSchemaForExpression;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.concatDeviceAndBindSchemaForExpression;
 import static 
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.getMeasurementExpression;
@@ -2616,7 +2620,8 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     context.setQueryType(QueryType.WRITE);
     insertTabletStatement.semanticCheck();
     Analysis analysis = new Analysis();
-    validateSchema(analysis, insertTabletStatement, context);
+    validateSchema(analysis, insertTabletStatement,
+        () -> SchemaValidator.validate(schemaFetcher, insertTabletStatement, 
context));
     InsertBaseStatement realStatement = removeLogicalView(analysis, 
insertTabletStatement);
     if (analysis.isFinishQueryAfterAnalyze()) {
       return analysis;
@@ -2634,12 +2639,12 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
       return getAnalysisForWriting(
           analysis,
           Collections.singletonList(dataPartitionQueryParam),
-          context.getSession().getUserName());
+          context.getSession().getUserName(), partitionFetcher);
     } else {
       return computeAnalysisForMultiTablets(
           analysis,
           (InsertMultiTabletsStatement) realStatement,
-          context.getSession().getUserName());
+          context.getSession().getUserName(), partitionFetcher);
     }
   }
 
@@ -2648,7 +2653,8 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     context.setQueryType(QueryType.WRITE);
     insertRowStatement.semanticCheck();
     Analysis analysis = new Analysis();
-    validateSchema(analysis, insertRowStatement, context);
+    validateSchema(analysis, insertRowStatement,
+        () -> SchemaValidator.validate(schemaFetcher, insertRowStatement, 
context));
     InsertBaseStatement realInsertStatement = removeLogicalView(analysis, 
insertRowStatement);
     if (analysis.isFinishQueryAfterAnalyze()) {
       return analysis;
@@ -2665,41 +2671,22 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
       return getAnalysisForWriting(
           analysis,
           Collections.singletonList(dataPartitionQueryParam),
-          context.getSession().getUserName());
+          context.getSession().getUserName(), partitionFetcher);
     } else {
       return computeAnalysisForInsertRows(
-          analysis, (InsertRowsStatement) realInsertStatement, 
context.getSession().getUserName());
+          analysis, (InsertRowsStatement) realInsertStatement, 
context.getSession().getUserName()
+          , partitionFetcher);
     }
   }
 
-  private Analysis computeAnalysisForInsertRows(
-      Analysis analysis, InsertRowsStatement insertRowsStatement, String 
userName) {
-    Map<String, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new 
HashMap<>();
-    for (InsertRowStatement insertRowStatement : 
insertRowsStatement.getInsertRowStatementList()) {
-      Set<TTimePartitionSlot> timePartitionSlotSet =
-          dataPartitionQueryParamMap.computeIfAbsent(
-              insertRowStatement.getDevicePath().getFullPath(), k -> new 
HashSet<>());
-      timePartitionSlotSet.add(insertRowStatement.getTimePartitionSlot());
-    }
-
-    List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
-    for (Map.Entry<String, Set<TTimePartitionSlot>> entry : 
dataPartitionQueryParamMap.entrySet()) {
-      DataPartitionQueryParam dataPartitionQueryParam = new 
DataPartitionQueryParam();
-      dataPartitionQueryParam.setDevicePath(entry.getKey());
-      dataPartitionQueryParam.setTimePartitionSlotList(new 
ArrayList<>(entry.getValue()));
-      dataPartitionQueryParams.add(dataPartitionQueryParam);
-    }
-
-    return getAnalysisForWriting(analysis, dataPartitionQueryParams, userName);
-  }
-
   @Override
   public Analysis visitInsertRows(
       InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
     context.setQueryType(QueryType.WRITE);
     insertRowsStatement.semanticCheck();
     Analysis analysis = new Analysis();
-    validateSchema(analysis, insertRowsStatement, context);
+    validateSchema(analysis, insertRowsStatement,
+        () -> SchemaValidator.validate(schemaFetcher, insertRowsStatement, 
context));
     InsertRowsStatement realInsertRowsStatement =
         (InsertRowsStatement) removeLogicalView(analysis, insertRowsStatement);
     if (analysis.isFinishQueryAfterAnalyze()) {
@@ -2708,29 +2695,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     analysis.setStatement(realInsertRowsStatement);
 
     return computeAnalysisForInsertRows(
-        analysis, realInsertRowsStatement, context.getSession().getUserName());
-  }
-
-  private Analysis computeAnalysisForMultiTablets(
-      Analysis analysis, InsertMultiTabletsStatement 
insertMultiTabletsStatement, String userName) {
-    Map<String, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new 
HashMap<>();
-    for (InsertTabletStatement insertTabletStatement :
-        insertMultiTabletsStatement.getInsertTabletStatementList()) {
-      Set<TTimePartitionSlot> timePartitionSlotSet =
-          dataPartitionQueryParamMap.computeIfAbsent(
-              insertTabletStatement.getDevicePath().getFullPath(), k -> new 
HashSet<>());
-      
timePartitionSlotSet.addAll(insertTabletStatement.getTimePartitionSlots());
-    }
-
-    List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
-    for (Map.Entry<String, Set<TTimePartitionSlot>> entry : 
dataPartitionQueryParamMap.entrySet()) {
-      DataPartitionQueryParam dataPartitionQueryParam = new 
DataPartitionQueryParam();
-      dataPartitionQueryParam.setDevicePath(entry.getKey());
-      dataPartitionQueryParam.setTimePartitionSlotList(new 
ArrayList<>(entry.getValue()));
-      dataPartitionQueryParams.add(dataPartitionQueryParam);
-    }
-
-    return getAnalysisForWriting(analysis, dataPartitionQueryParams, userName);
+        analysis, realInsertRowsStatement, context.getSession().getUserName(), 
partitionFetcher);
   }
 
   @Override
@@ -2739,7 +2704,8 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     context.setQueryType(QueryType.WRITE);
     insertMultiTabletsStatement.semanticCheck();
     Analysis analysis = new Analysis();
-    validateSchema(analysis, insertMultiTabletsStatement, context);
+    validateSchema(analysis, insertMultiTabletsStatement,
+        () -> SchemaValidator.validate(schemaFetcher, 
insertMultiTabletsStatement, context));
     InsertMultiTabletsStatement realStatement =
         (InsertMultiTabletsStatement) removeLogicalView(analysis, 
insertMultiTabletsStatement);
     if (analysis.isFinishQueryAfterAnalyze()) {
@@ -2748,7 +2714,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     analysis.setStatement(realStatement);
 
     return computeAnalysisForMultiTablets(
-        analysis, realStatement, context.getSession().getUserName());
+        analysis, realStatement, context.getSession().getUserName(), 
partitionFetcher);
   }
 
   @Override
@@ -2757,7 +2723,8 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     context.setQueryType(QueryType.WRITE);
     insertRowsOfOneDeviceStatement.semanticCheck();
     Analysis analysis = new Analysis();
-    validateSchema(analysis, insertRowsOfOneDeviceStatement, context);
+    validateSchema(analysis, insertRowsOfOneDeviceStatement,
+        () -> SchemaValidator.validate(schemaFetcher, 
insertRowsOfOneDeviceStatement, context));
     InsertBaseStatement realInsertStatement =
         removeLogicalView(analysis, insertRowsOfOneDeviceStatement);
     if (analysis.isFinishQueryAfterAnalyze()) {
@@ -2775,10 +2742,12 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
       return getAnalysisForWriting(
           analysis,
           Collections.singletonList(dataPartitionQueryParam),
-          context.getSession().getUserName());
+          context.getSession().getUserName(),
+          partitionFetcher);
     } else {
       return computeAnalysisForInsertRows(
-          analysis, (InsertRowsStatement) realInsertStatement, 
context.getSession().getUserName());
+          analysis, (InsertRowsStatement) realInsertStatement, 
context.getSession().getUserName()
+          , partitionFetcher);
     }
   }
 
@@ -2793,53 +2762,6 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     return analysis;
   }
 
-  private void validateSchema(
-      Analysis analysis, InsertBaseStatement insertStatement, MPPQueryContext 
context) {
-    final long startTime = System.nanoTime();
-    try {
-      SchemaValidator.validate(schemaFetcher, insertStatement, context);
-    } catch (SemanticException e) {
-      analysis.setFinishQueryAfterAnalyze(true);
-      if (e.getCause() instanceof IoTDBException) {
-        IoTDBException exception = (IoTDBException) e.getCause();
-        analysis.setFailStatus(
-            RpcUtils.getStatus(exception.getErrorCode(), 
exception.getMessage()));
-      } else {
-        analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, 
e.getMessage()));
-      }
-    } finally {
-      
PERFORMANCE_OVERVIEW_METRICS.recordScheduleSchemaValidateCost(System.nanoTime() 
- startTime);
-    }
-    boolean hasFailedMeasurement = insertStatement.hasFailedMeasurements();
-    String partialInsertMessage;
-    if (hasFailedMeasurement) {
-      partialInsertMessage =
-          String.format(
-              "Fail to insert measurements %s caused by %s",
-              insertStatement.getFailedMeasurements(), 
insertStatement.getFailedMessages());
-      logger.warn(partialInsertMessage);
-      analysis.setFailStatus(
-          RpcUtils.getStatus(TSStatusCode.METADATA_ERROR.getStatusCode(), 
partialInsertMessage));
-    }
-  }
-
-  private InsertBaseStatement removeLogicalView(
-      Analysis analysis, InsertBaseStatement insertBaseStatement) {
-    try {
-      return insertBaseStatement.removeLogicalView();
-    } catch (SemanticException e) {
-      analysis.setFinishQueryAfterAnalyze(true);
-      if (e.getCause() instanceof IoTDBException) {
-        IoTDBException exception = (IoTDBException) e.getCause();
-        analysis.setFailStatus(
-            RpcUtils.getStatus(exception.getErrorCode(), 
exception.getMessage()));
-      } else {
-        analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, 
e.getMessage()));
-      }
-      return insertBaseStatement;
-    }
-  }
-
   @Override
   public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, 
MPPQueryContext context) {
     LoadTsfileAnalyzer loadTsfileAnalyzer =
@@ -2851,24 +2773,6 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     }
   }
 
-  /** get analysis according to statement and params */
-  private Analysis getAnalysisForWriting(
-      Analysis analysis, List<DataPartitionQueryParam> 
dataPartitionQueryParams, String userName) {
-
-    DataPartition dataPartition =
-        partitionFetcher.getOrCreateDataPartition(dataPartitionQueryParams, 
userName);
-    if (dataPartition.isEmpty()) {
-      analysis.setFinishQueryAfterAnalyze(true);
-      analysis.setFailStatus(
-          RpcUtils.getStatus(
-              TSStatusCode.DATABASE_NOT_EXIST.getStatusCode(),
-              "Database not exists and failed to create automatically "
-                  + "because enable_auto_create_schema is FALSE."));
-    }
-    analysis.setDataPartitionInfo(dataPartition);
-    return analysis;
-  }
-
   private boolean analyzeTimeseriesRegionScan(
       WhereCondition timeCondition,
       PathPatternTree patternTree,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
index 207dea7b9f3..bb3821aef6e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
@@ -42,4 +42,12 @@ public interface IAnalysis {
   DatasetHeader getRespDatasetHeader();
 
   String getStatementType();
+
+  default void setFinishQueryAfterAnalyze(boolean b) {
+
+  }
+
+  default void setFailStatus(TSStatus status) {
+
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ISchemaAutoCreation.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ISchemaAutoCreation.java
index e54e8a37d9e..55f8dd5210c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ISchemaAutoCreation.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ISchemaAutoCreation.java
@@ -39,7 +39,8 @@ public interface ISchemaAutoCreation {
 
   TSDataType getDataType(int index);
 
+  //TODO: Check necessity because all implementations return null
   TSEncoding getEncoding(int index);
-
+  //TODO: Check necessity because all implementations return null
   CompressionType getCompressionType(int index);
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
index 7bbdd5cefc6..1406698cc42 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
@@ -24,6 +24,8 @@ import 
org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedInsertStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -55,6 +57,18 @@ public class SchemaValidator {
     }
   }
 
+  public static void validate(
+      Metadata metadata, WrappedInsertStatement insertStatement, 
MPPQueryContext context) {
+    try {
+      
metadata.fetchAndComputeSchemaWithAutoCreate(insertStatement.getSchemaValidationList(),
+          context);
+      insertStatement.updateAfterSchemaValidation(context);
+    } catch (QueryProcessException e) {
+      throw new SemanticException(e.getMessage());
+    }
+  }
+
+
   public static ISchemaTree validate(
       ISchemaFetcher schemaFetcher,
       List<PartialPath> devicePaths,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
index 43a117002c6..288f54fd77c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
@@ -35,6 +35,7 @@ import 
org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand;
 import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
 import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand;
 import 
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ColumnDefinition.ColumnCategory;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.db.queryengine.plan.statement.component.FromComponent;
 import 
org.apache.iotdb.db.queryengine.plan.statement.component.GroupByTimeComponent;
@@ -335,8 +336,19 @@ public class StatementGenerator {
     }
     insertStatement.setDataTypes(dataTypes);
     insertStatement.setAligned(insertTabletReq.isAligned);
-    PERFORMANCE_OVERVIEW_METRICS.recordParseCost(System.nanoTime() - 
startTime);
     insertStatement.setWriteToTable(insertTabletReq.isWriteToTable());
+    if (insertTabletReq.isWriteToTable()) {
+      if (!insertTabletReq.isSetColumnCategories() || 
insertTabletReq.getColumnCategoriesSize() != 
insertTabletReq.getMeasurementsSize()) {
+        throw new IllegalArgumentException("Missing or invalid column 
categories for table "
+            + "insertion");
+      }
+      ColumnCategory[] columnCategories = new 
ColumnCategory[insertTabletReq.columnCategories.size()];
+      for (int i = 0; i < columnCategories.length; i++) {
+        columnCategories[i] = 
ColumnCategory.values()[insertTabletReq.getColumnCategories().get(i)];
+      }
+      insertStatement.setColumnCategories(columnCategories);
+    }
+    PERFORMANCE_OVERVIEW_METRICS.recordParseCost(System.nanoTime() - 
startTime);
     return insertStatement;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
index a15ed482daa..75e7153513a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
@@ -19,11 +19,17 @@
 
 package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
 
+import java.util.Collections;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.execution.warnings.IoTDBWarning;
 import org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector;
+import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
+import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
+import org.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
@@ -57,6 +63,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GroupingElement;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GroupingSets;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Identifier;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Insert;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertTablet;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Intersect;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.JoinCriteria;
@@ -106,6 +113,9 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Streams;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.tsfile.read.common.type.RowType;
 import org.apache.tsfile.read.common.type.Type;
 
@@ -134,6 +144,10 @@ import static java.util.Collections.emptyList;
 import static java.util.Locale.ENGLISH;
 import static java.util.Objects.requireNonNull;
 import static 
org.apache.iotdb.db.queryengine.execution.warnings.StandardWarningCode.REDUNDANT_ORDER_BY;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.computeAnalysisForMultiTablets;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.getAnalysisForWriting;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.removeLogicalView;
+import static 
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.validateSchema;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.AggregationAnalyzer.verifyOrderByAggregations;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.AggregationAnalyzer.verifySourceAggregations;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.CanonicalizationAware.canonicalizationAwareKey;
@@ -153,7 +167,7 @@ public class StatementAnalyzer {
 
   private final StatementAnalyzerFactory statementAnalyzerFactory;
 
-  private final Analysis analysis;
+  private Analysis analysis;
 
   private final AccessControl accessControl;
 
@@ -212,6 +226,7 @@ public class StatementAnalyzer {
    * scopes hierarchy should always have outer query scope (if provided) as 
ancestor.
    */
   private final class Visitor extends AstVisitor<Scope, Optional<Scope>> {
+
     private final boolean isTopLevel;
     private final Optional<Scope> outerQueryScope;
     private final WarningCollector warningCollector;
@@ -352,6 +367,46 @@ public class StatementAnalyzer {
       throw new SemanticException("Insert statement is not supported yet.");
     }
 
+    protected Scope visitInsertTablet(InsertTablet insert, Optional<Scope> 
scope) {
+      final Scope ret = Scope.create();
+
+      final MPPQueryContext context = insert.getContext();
+      final InsertTabletStatement insertTabletStatement = 
insert.getInnerTreeStatement();
+      context.setQueryType(QueryType.WRITE);
+      insertTabletStatement.semanticCheck();
+      IAnalysis analysis = new 
org.apache.iotdb.db.queryengine.plan.analyze.Analysis();
+
+      validateSchema(analysis, insertTabletStatement,
+          () -> SchemaValidator.validate(metadata, insert, context));
+      InsertBaseStatement realStatement = removeLogicalView(analysis, 
insertTabletStatement);
+      if (analysis.isFinishQueryAfterAnalyze()) {
+        return ret;
+      }
+      analysis.setStatement(realStatement);
+
+      if (realStatement instanceof InsertTabletStatement) {
+        InsertTabletStatement realInsertTabletStatement = 
(InsertTabletStatement) realStatement;
+        DataPartitionQueryParam dataPartitionQueryParam = new 
DataPartitionQueryParam();
+        dataPartitionQueryParam.setDevicePath(
+            realInsertTabletStatement.getDevicePath().getFullPath());
+        dataPartitionQueryParam.setTimePartitionSlotList(
+            realInsertTabletStatement.getTimePartitionSlots());
+
+        analysis = getAnalysisForWriting(
+            analysis,
+            Collections.singletonList(dataPartitionQueryParam),
+            context.getSession().getUserName(), partitionFetcher);
+      } else {
+        analysis =  computeAnalysisForMultiTablets(
+            analysis,
+            (InsertMultiTabletsStatement) realStatement,
+            context.getSession().getUserName(), partitionFetcher);
+      }
+      // TODO-TableIngestion: use IAnalysis
+      // StatementAnalyzer.this.analysis = analysis;
+      return ret;
+    }
+
     @Override
     protected Scope visitDelete(Delete node, Optional<Scope> scope) {
       throw new SemanticException("Delete statement is not supported yet.");
@@ -1965,7 +2020,7 @@ public class StatementAnalyzer {
 
     /**
      * @return true if the Query / QuerySpecification containing the analyzed 
Limit or FetchFirst,
-     *     must contain orderBy (i.e., for FetchFirst with ties).
+     * must contain orderBy (i.e., for FetchFirst with ties).
      */
     private boolean analyzeLimit(Node node, Scope scope) {
       //      checkState(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/Metadata.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/Metadata.java
index f37aabbce8b..df047d7d652 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/Metadata.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/Metadata.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.db.queryengine.plan.relational.metadata;
 
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaComputationWithAutoCreation;
 import org.apache.iotdb.db.queryengine.plan.relational.function.OperatorType;
 import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
@@ -67,4 +69,19 @@ public interface Metadata {
       QualifiedObjectName tableName,
       List<Expression> expressionList,
       List<String> attributeColumns);
+
+  /**
+   * Fetch and compute the schema of target timeseries, with device and 
measurement defined in given
+   * schemaComputationWithAutoCreation. The computation defined in given
+   * schemaComputationWithAutoCreation will be executed during scanning the 
fetched schema. If some
+   * target timeseries doesn't exist, they will be auto created.
+   *
+   * @param schemaComputationWithAutoCreationList define the target devices, 
measurements and
+   *     computation
+   */
+  default void fetchAndComputeSchemaWithAutoCreate(
+      List<? extends ISchemaComputationWithAutoCreation> 
schemaComputationWithAutoCreationList,
+      MPPQueryContext context) {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
index d2c1e487458..f2476e59a95 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
@@ -371,6 +371,10 @@ public abstract class AstVisitor<R, C> {
     return visitStatement(node, context);
   }
 
+  protected R visitInsertTablet(InsertTablet node, C context) {
+    return visitStatement(node, context);
+  }
+
   protected R visitDelete(Delete node, C context) {
     return visitStatement(node, context);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java
index 2b858bdb8a7..38a423fae21 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java
@@ -17,13 +17,99 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.plan.relational.sql.tree;
+package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import 
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaComputationWithAutoCreation;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.Pair;
 
-public class InsertTablet extends WrappedStatement {
+public class InsertTablet extends WrappedInsertStatement {
 
-  public InsertTablet(InsertTabletStatement insertTabletStatement) {
-    super(insertTabletStatement);
+  private InsertTabletStatement insertTabletStatement;
+
+  public InsertTablet(InsertTabletStatement insertTabletStatement, 
MPPQueryContext context) {
+    super(insertTabletStatement, context);
+    this.insertTabletStatement = insertTabletStatement;
+  }
+
+  @Override
+  public <R, C> R accept(AstVisitor<R, C> visitor, C context) {
+    return visitor.visitInsertTablet(this, context);
+  }
+
+  @Override
+  public InsertTabletStatement getInnerTreeStatement() {
+    return insertTabletStatement;
+  }
+
+  @Override
+  public List<ISchemaComputationWithAutoCreation> getSchemaValidationList() {
+    Map<IDeviceID, ISchemaComputationWithAutoCreation> map = new HashMap<>();
+    for (int i = 0; i < insertTabletStatement.getRowCount(); i++) {
+      map.computeIfAbsent(insertTabletStatement.getTableDeviceID(i), 
this::getSchemaComputation);
+    }
+    return new ArrayList<>(map.values());
+  }
+
+  @Override
+  public void updateAfterSchemaValidation(MPPQueryContext context) throws 
QueryProcessException {
+    insertTabletStatement.updateAfterSchemaValidation(context);
+  }
+
+  @Override
+  public ISchemaComputationWithAutoCreation getSchemaComputation(IDeviceID 
deviceID) {
+    return new SchemaExecutions(deviceID);
+  }
+
+  public class SchemaExecutions extends BasicSchemaExecutions {
+
+    public SchemaExecutions(IDeviceID deviceID) {
+      super(deviceID);
+    }
+
+    @Override
+    public void computeMeasurement(int index, IMeasurementSchemaInfo 
measurementSchemaInfo) {
+      insertTabletStatement.computeMeasurement(index, measurementSchemaInfo);
+    }
+
+    @Override
+    public boolean hasLogicalViewNeedProcess() {
+      return insertTabletStatement.hasLogicalViewNeedProcess();
+    }
+
+    @Override
+    public List<LogicalViewSchema> getLogicalViewSchemaList() {
+      return insertTabletStatement.getLogicalViewSchemaList();
+    }
+
+    @Override
+    public List<Integer> getIndexListOfLogicalViewPaths() {
+      return insertTabletStatement.getIndexListOfLogicalViewPaths();
+    }
+
+    @Override
+    public void recordRangeOfLogicalViewSchemaListNow() {
+      insertTabletStatement.recordRangeOfLogicalViewSchemaListNow();
+    }
+
+    @Override
+    public Pair<Integer, Integer> getRangeOfLogicalViewSchemaListRecorded() {
+      return insertTabletStatement.getRangeOfLogicalViewSchemaListRecorded();
+    }
+
+    @Override
+    public void computeMeasurementOfView(int index, IMeasurementSchemaInfo 
measurementSchemaInfo,
+        boolean isAligned) {
+      insertTabletStatement.computeMeasurementOfView(index, 
measurementSchemaInfo, isAligned);
+    }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
new file mode 100644
index 00000000000..e57e7b55c40
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
@@ -0,0 +1,89 @@
+package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
+
+import java.util.List;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import 
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
+import org.apache.iotdb.db.queryengine.execution.schedule.queue.ID;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaComputationWithAutoCreation;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.Pair;
+
+public abstract class WrappedInsertStatement extends WrappedStatement {
+
+  protected InsertBaseStatement insertStatement;
+
+  public WrappedInsertStatement(
+      InsertBaseStatement innerTreeStatement,
+      MPPQueryContext context) {
+    super(innerTreeStatement, context);
+    this.insertStatement = innerTreeStatement;
+  }
+
+  @Override
+  public InsertBaseStatement getInnerTreeStatement() {
+    return insertStatement;
+  }
+
+  public abstract List<ISchemaComputationWithAutoCreation> 
getSchemaValidationList();
+
+  public abstract void updateAfterSchemaValidation(MPPQueryContext context) 
throws QueryProcessException;
+
+  public abstract ISchemaComputationWithAutoCreation 
getSchemaComputation(IDeviceID deviceID);
+
+  public abstract class BasicSchemaExecutions implements 
ISchemaComputationWithAutoCreation {
+    protected IDeviceID deviceID;
+
+    public BasicSchemaExecutions(IDeviceID deviceID) {
+      this.deviceID = deviceID;
+    }
+
+    @Override
+    public boolean isAligned() {
+      return true;
+    }
+
+    @Override
+    public TSDataType getDataType(int index) {
+      return insertStatement.getDataTypes()[index];
+    }
+
+    @Override
+    public TSEncoding getEncoding(int index) {
+      return null;
+    }
+
+    @Override
+    public CompressionType getCompressionType(int index) {
+      return null;
+    }
+
+    @Override
+    public PartialPath getDevicePath() {
+      //TODO-TableInsertion: use deviceId
+      try {
+        return new PartialPath(deviceID);
+      } catch (IllegalPathException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public String[] getMeasurements() {
+      return insertStatement.getMeasurements();
+    }
+
+    @Override
+    public void computeDevice(boolean isAligned) {
+      // ignored, table device must be aligned
+    }
+
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedStatement.java
index 62c7a7dee77..380388ee141 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedStatement.java
@@ -17,18 +17,22 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.queryengine.plan.relational.sql.tree;
+package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 
 public abstract class WrappedStatement extends Statement {
-  private final org.apache.iotdb.db.queryengine.plan.statement.Statement 
innerTreeStatement;
+  protected final org.apache.iotdb.db.queryengine.plan.statement.Statement 
innerTreeStatement;
+  protected final MPPQueryContext context;
 
-  public 
WrappedStatement(org.apache.iotdb.db.queryengine.plan.statement.Statement 
innerTreeStatement) {
+  public 
WrappedStatement(org.apache.iotdb.db.queryengine.plan.statement.Statement 
innerTreeStatement,
+      MPPQueryContext context) {
     super(null);
     this.innerTreeStatement = innerTreeStatement;
+    this.context = context;
   }
 
   @Override
@@ -57,4 +61,12 @@ public abstract class WrappedStatement extends Statement {
   public String toString() {
     return innerTreeStatement.toString();
   }
+
+  public org.apache.iotdb.db.queryengine.plan.statement.Statement 
getInnerTreeStatement() {
+    return innerTreeStatement;
+  }
+
+  public MPPQueryContext getContext() {
+    return context;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java
index a19c61439c0..d7aac386a88 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.plan.statement;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor;
 
 import java.util.List;
@@ -67,7 +68,8 @@ public abstract class Statement extends StatementNode {
         "Only the admin user can perform this operation");
   }
 
-  public org.apache.iotdb.db.queryengine.plan.relational.sql.tree.Statement 
toRelationalStatement() {
+  public org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement 
toRelationalStatement(
+      MPPQueryContext context) {
     throw new UnsupportedOperationException("Method not implemented yet");
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index 35a0a6f18f6..53b60b67494 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -26,13 +26,19 @@ import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
 import org.apache.iotdb.db.exception.metadata.PathNotExistException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import 
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
 import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ColumnDefinition.ColumnCategory;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.utils.CommonUtils;
 
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.IDeviceID.Factory;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.utils.Binary;
@@ -60,6 +66,10 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
   private BitMap[] bitMaps;
   private Object[] columns;
 
+  private ColumnCategory[] columnCategories;
+  private List<Integer> idColumnIndices;
+  private IDeviceID[] deviceIDs;
+
   private int rowCount = 0;
 
   /**
@@ -409,4 +419,45 @@ public class InsertTabletStatement extends 
InsertBaseStatement implements ISchem
   public void setWriteToTable(boolean writeToTable) {
     this.writeToTable = writeToTable;
   }
+
+  @Override
+  public Statement toRelationalStatement(MPPQueryContext context) {
+    return super.toRelationalStatement(context);
+  }
+
+  public ColumnCategory[] getColumnCategories() {
+    return columnCategories;
+  }
+
+  public void setColumnCategories(
+      ColumnCategory[] columnCategories) {
+    this.columnCategories = columnCategories;
+    idColumnIndices = new ArrayList<>();
+    for (int i = 0; i < columnCategories.length; i++) {
+      if (columnCategories[i].equals(ColumnCategory.ID)) {
+        idColumnIndices.add(i);
+      }
+    }
+  }
+
+  public List<Integer> getIdColumnIndices() {
+    return idColumnIndices;
+  }
+
+  public IDeviceID getTableDeviceID(int rowIdx) {
+    if (deviceIDs == null) {
+      deviceIDs = new IDeviceID[rowCount];
+    }
+    if (deviceIDs[rowIdx] == null) {
+      String[] deviceIdSegments = new String[idColumnIndices.size() + 1];
+      deviceIdSegments[0] = this.devicePath.getFullPath();
+      for (int i = 0; i < idColumnIndices.size(); i++) {
+        final Integer columnIndex = idColumnIndices.get(i);
+        deviceIdSegments[i + 1] = ((Binary[]) 
columns[columnIndex])[rowIdx].toString();
+      }
+      deviceIDs[rowIdx] = Factory.DEFAULT_FACTORY.create(deviceIdSegments);
+    }
+
+    return deviceIDs[rowIdx];
+  }
 }
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
index 2838cd82128..4b760e27cef 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
@@ -234,6 +234,7 @@ struct TSInsertTabletReq {
   7: required i32 size
   8: optional bool isAligned
   9: optional bool writeToTable
+  10: optional list<byte> columnCategories
 }
 
 struct TSInsertTabletsReq {

Reply via email to