This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TableModelIngestion by this
push:
new 419aa960059 add schema execution adaption
419aa960059 is described below
commit 419aa96005943663065c0ba1a38713a05f41a907
Author: jt2594838 <[email protected]>
AuthorDate: Tue Jun 11 18:40:36 2024 +0800
add schema execution adaption
---
.../protocol/thrift/impl/ClientRPCServiceImpl.java | 2 +-
.../iotdb/db/queryengine/plan/Coordinator.java | 47 ++++++
.../db/queryengine/plan/analyze/AnalyzeUtils.java | 168 +++++++++++++++++++++
.../queryengine/plan/analyze/AnalyzeVisitor.java | 148 ++++--------------
.../db/queryengine/plan/analyze/IAnalysis.java | 8 +
.../plan/analyze/schema/ISchemaAutoCreation.java | 3 +-
.../plan/analyze/schema/SchemaValidator.java | 14 ++
.../plan/parser/StatementGenerator.java | 14 +-
.../relational/analyzer/StatementAnalyzer.java | 59 +++++++-
.../plan/relational/metadata/Metadata.java | 17 +++
.../plan/relational/sql/ast/AstVisitor.java | 4 +
.../plan/relational/sql/ast/InsertTablet.java | 94 +++++++++++-
.../relational/sql/ast/WrappedInsertStatement.java | 89 +++++++++++
.../plan/relational/sql/ast/WrappedStatement.java | 18 ++-
.../db/queryengine/plan/statement/Statement.java | 4 +-
.../plan/statement/crud/InsertTabletStatement.java | 51 +++++++
.../thrift-datanode/src/main/thrift/client.thrift | 1 +
17 files changed, 606 insertions(+), 135 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 5a35238718d..08236745e6f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -2072,7 +2072,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result;
if (statement.isWriteToTable()) {
- result =
COORDINATOR.executeForTableModel(statement.toRelationalStatement(),
+ result = COORDINATOR.executeForTableModel(statement,
relationSqlParser, clientSession,
queryId,
SESSION_MANAGER.getSessionInfo(clientSession), "", metadata,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 0ace106a035..fe20273f183 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -256,6 +256,53 @@ public class Coordinator {
startTime)));
}
+ public ExecutionResult executeForTableModel(
+ Statement statement,
+ SqlParser sqlParser,
+ IClientSession clientSession,
+ long queryId,
+ SessionInfo session,
+ String sql,
+ Metadata metadata,
+ long timeOut) {
+ return execution(
+ queryId,
+ session,
+ sql,
+ ((queryContext, startTime) ->
+ createQueryExecutionForTableModel(
+ statement,
+ sqlParser,
+ clientSession,
+ queryContext,
+ metadata,
+ timeOut > 0 ? timeOut : CONFIG.getQueryTimeoutThreshold(),
+ startTime)));
+ }
+
+ private IQueryExecution createQueryExecutionForTableModel(
+ Statement statement,
+ SqlParser sqlParser,
+ IClientSession clientSession,
+ MPPQueryContext queryContext,
+ Metadata metadata,
+ long timeOut,
+ long startTime) {
+ queryContext.setTimeOut(timeOut);
+ queryContext.setStartTime(startTime);
+ RelationalModelPlanner relationalModelPlanner =
+ new RelationalModelPlanner(
+ statement.toRelationalStatement(queryContext),
+ sqlParser,
+ metadata,
+ executor,
+ writeOperationExecutor,
+ scheduledExecutor,
+ SYNC_INTERNAL_SERVICE_CLIENT_MANAGER,
+ ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER);
+ return new QueryExecution(relationalModelPlanner, queryContext, executor);
+ }
+
private IQueryExecution createQueryExecutionForTableModel(
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement
statement,
SqlParser sqlParser,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java
new file mode 100644
index 00000000000..af861730016
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.analyze;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
+import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
+import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AnalyzeUtils {
+
+ private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS
=
+ PerformanceOverviewMetrics.getInstance();
+ private static final Logger LOGGER =
LoggerFactory.getLogger(AnalyzeUtils.class);
+
+ private AnalyzeUtils() {
+ // util class
+ }
+
+ public static void validateSchema(
+ IAnalysis analysis, InsertBaseStatement insertStatement,
+ Runnable schemaValidation) {
+ final long startTime = System.nanoTime();
+ try {
+ schemaValidation.run();
+ } catch (SemanticException e) {
+ analysis.setFinishQueryAfterAnalyze(true);
+ if (e.getCause() instanceof IoTDBException) {
+ IoTDBException exception = (IoTDBException) e.getCause();
+ analysis.setFailStatus(
+ RpcUtils.getStatus(exception.getErrorCode(),
exception.getMessage()));
+ } else {
+ analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR,
e.getMessage()));
+ }
+ } finally {
+
PERFORMANCE_OVERVIEW_METRICS.recordScheduleSchemaValidateCost(System.nanoTime()
- startTime);
+ }
+ boolean hasFailedMeasurement = insertStatement.hasFailedMeasurements();
+ String partialInsertMessage;
+ if (hasFailedMeasurement) {
+ partialInsertMessage =
+ String.format(
+ "Fail to insert measurements %s caused by %s",
+ insertStatement.getFailedMeasurements(),
insertStatement.getFailedMessages());
+ LOGGER.warn(partialInsertMessage);
+ analysis.setFailStatus(
+ RpcUtils.getStatus(TSStatusCode.METADATA_ERROR.getStatusCode(),
partialInsertMessage));
+ }
+ }
+
+ public static InsertBaseStatement removeLogicalView(
+ IAnalysis analysis, InsertBaseStatement insertBaseStatement) {
+ try {
+ return insertBaseStatement.removeLogicalView();
+ } catch (SemanticException e) {
+ analysis.setFinishQueryAfterAnalyze(true);
+ if (e.getCause() instanceof IoTDBException) {
+ IoTDBException exception = (IoTDBException) e.getCause();
+ analysis.setFailStatus(
+ RpcUtils.getStatus(exception.getErrorCode(),
exception.getMessage()));
+ } else {
+ analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR,
e.getMessage()));
+ }
+ return insertBaseStatement;
+ }
+ }
+
+ /** get analysis according to statement and params */
+ public static Analysis getAnalysisForWriting(
+ Analysis analysis, List<DataPartitionQueryParam>
dataPartitionQueryParams, String userName,
+ IPartitionFetcher partitionFetcher) {
+
+ DataPartition dataPartition =
+ partitionFetcher.getOrCreateDataPartition(dataPartitionQueryParams,
userName);
+ if (dataPartition.isEmpty()) {
+ analysis.setFinishQueryAfterAnalyze(true);
+ analysis.setFailStatus(
+ RpcUtils.getStatus(
+ TSStatusCode.DATABASE_NOT_EXIST.getStatusCode(),
+ "Database not exists and failed to create automatically "
+ + "because enable_auto_create_schema is FALSE."));
+ }
+ analysis.setDataPartitionInfo(dataPartition);
+ return analysis;
+ }
+
+ public static Analysis computeAnalysisForInsertRows(
+ Analysis analysis, InsertRowsStatement insertRowsStatement, String
userName,
+ IPartitionFetcher partitionFetcher) {
+ Map<String, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new
HashMap<>();
+ for (InsertRowStatement insertRowStatement :
insertRowsStatement.getInsertRowStatementList()) {
+ Set<TTimePartitionSlot> timePartitionSlotSet =
+ dataPartitionQueryParamMap.computeIfAbsent(
+ insertRowStatement.getDevicePath().getFullPath(), k -> new
HashSet<>());
+ timePartitionSlotSet.add(insertRowStatement.getTimePartitionSlot());
+ }
+
+ List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
+ for (Map.Entry<String, Set<TTimePartitionSlot>> entry :
dataPartitionQueryParamMap.entrySet()) {
+ DataPartitionQueryParam dataPartitionQueryParam = new
DataPartitionQueryParam();
+ dataPartitionQueryParam.setDevicePath(entry.getKey());
+ dataPartitionQueryParam.setTimePartitionSlotList(new
ArrayList<>(entry.getValue()));
+ dataPartitionQueryParams.add(dataPartitionQueryParam);
+ }
+
+ return getAnalysisForWriting(analysis, dataPartitionQueryParams, userName,
partitionFetcher);
+ }
+
+ public static Analysis computeAnalysisForMultiTablets(
+ Analysis analysis, InsertMultiTabletsStatement
insertMultiTabletsStatement, String userName
+ , IPartitionFetcher partitionFetcher) {
+ Map<String, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new
HashMap<>();
+ for (InsertTabletStatement insertTabletStatement :
+ insertMultiTabletsStatement.getInsertTabletStatementList()) {
+ Set<TTimePartitionSlot> timePartitionSlotSet =
+ dataPartitionQueryParamMap.computeIfAbsent(
+ insertTabletStatement.getDevicePath().getFullPath(), k -> new
HashSet<>());
+
timePartitionSlotSet.addAll(insertTabletStatement.getTimePartitionSlots());
+ }
+
+ List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
+ for (Map.Entry<String, Set<TTimePartitionSlot>> entry :
dataPartitionQueryParamMap.entrySet()) {
+ DataPartitionQueryParam dataPartitionQueryParam = new
DataPartitionQueryParam();
+ dataPartitionQueryParam.setDevicePath(entry.getKey());
+ dataPartitionQueryParam.setTimePartitionSlotList(new
ArrayList<>(entry.getValue()));
+ dataPartitionQueryParams.add(dataPartitionQueryParam);
+ }
+
+ return getAnalysisForWriting(analysis, dataPartitionQueryParams, userName,
partitionFetcher);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index cf49f83c7ce..af95e958a8c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
-import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
@@ -185,6 +184,11 @@ import static
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant
import static
org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME;
import static
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.PARTITION_FETCHER;
import static
org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.SCHEMA_FETCHER;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.computeAnalysisForInsertRows;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.computeAnalysisForMultiTablets;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.getAnalysisForWriting;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.removeLogicalView;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.validateSchema;
import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.bindSchemaForExpression;
import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.concatDeviceAndBindSchemaForExpression;
import static
org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.getMeasurementExpression;
@@ -2616,7 +2620,8 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
context.setQueryType(QueryType.WRITE);
insertTabletStatement.semanticCheck();
Analysis analysis = new Analysis();
- validateSchema(analysis, insertTabletStatement, context);
+ validateSchema(analysis, insertTabletStatement,
+ () -> SchemaValidator.validate(schemaFetcher, insertTabletStatement,
context));
InsertBaseStatement realStatement = removeLogicalView(analysis,
insertTabletStatement);
if (analysis.isFinishQueryAfterAnalyze()) {
return analysis;
@@ -2634,12 +2639,12 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
return getAnalysisForWriting(
analysis,
Collections.singletonList(dataPartitionQueryParam),
- context.getSession().getUserName());
+ context.getSession().getUserName(), partitionFetcher);
} else {
return computeAnalysisForMultiTablets(
analysis,
(InsertMultiTabletsStatement) realStatement,
- context.getSession().getUserName());
+ context.getSession().getUserName(), partitionFetcher);
}
}
@@ -2648,7 +2653,8 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
context.setQueryType(QueryType.WRITE);
insertRowStatement.semanticCheck();
Analysis analysis = new Analysis();
- validateSchema(analysis, insertRowStatement, context);
+ validateSchema(analysis, insertRowStatement,
+ () -> SchemaValidator.validate(schemaFetcher, insertRowStatement,
context));
InsertBaseStatement realInsertStatement = removeLogicalView(analysis,
insertRowStatement);
if (analysis.isFinishQueryAfterAnalyze()) {
return analysis;
@@ -2665,41 +2671,22 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
return getAnalysisForWriting(
analysis,
Collections.singletonList(dataPartitionQueryParam),
- context.getSession().getUserName());
+ context.getSession().getUserName(), partitionFetcher);
} else {
return computeAnalysisForInsertRows(
- analysis, (InsertRowsStatement) realInsertStatement,
context.getSession().getUserName());
+ analysis, (InsertRowsStatement) realInsertStatement,
context.getSession().getUserName()
+ , partitionFetcher);
}
}
- private Analysis computeAnalysisForInsertRows(
- Analysis analysis, InsertRowsStatement insertRowsStatement, String
userName) {
- Map<String, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new
HashMap<>();
- for (InsertRowStatement insertRowStatement :
insertRowsStatement.getInsertRowStatementList()) {
- Set<TTimePartitionSlot> timePartitionSlotSet =
- dataPartitionQueryParamMap.computeIfAbsent(
- insertRowStatement.getDevicePath().getFullPath(), k -> new
HashSet<>());
- timePartitionSlotSet.add(insertRowStatement.getTimePartitionSlot());
- }
-
- List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
- for (Map.Entry<String, Set<TTimePartitionSlot>> entry :
dataPartitionQueryParamMap.entrySet()) {
- DataPartitionQueryParam dataPartitionQueryParam = new
DataPartitionQueryParam();
- dataPartitionQueryParam.setDevicePath(entry.getKey());
- dataPartitionQueryParam.setTimePartitionSlotList(new
ArrayList<>(entry.getValue()));
- dataPartitionQueryParams.add(dataPartitionQueryParam);
- }
-
- return getAnalysisForWriting(analysis, dataPartitionQueryParams, userName);
- }
-
@Override
public Analysis visitInsertRows(
InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
insertRowsStatement.semanticCheck();
Analysis analysis = new Analysis();
- validateSchema(analysis, insertRowsStatement, context);
+ validateSchema(analysis, insertRowsStatement,
+ () -> SchemaValidator.validate(schemaFetcher, insertRowsStatement,
context));
InsertRowsStatement realInsertRowsStatement =
(InsertRowsStatement) removeLogicalView(analysis, insertRowsStatement);
if (analysis.isFinishQueryAfterAnalyze()) {
@@ -2708,29 +2695,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
analysis.setStatement(realInsertRowsStatement);
return computeAnalysisForInsertRows(
- analysis, realInsertRowsStatement, context.getSession().getUserName());
- }
-
- private Analysis computeAnalysisForMultiTablets(
- Analysis analysis, InsertMultiTabletsStatement
insertMultiTabletsStatement, String userName) {
- Map<String, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new
HashMap<>();
- for (InsertTabletStatement insertTabletStatement :
- insertMultiTabletsStatement.getInsertTabletStatementList()) {
- Set<TTimePartitionSlot> timePartitionSlotSet =
- dataPartitionQueryParamMap.computeIfAbsent(
- insertTabletStatement.getDevicePath().getFullPath(), k -> new
HashSet<>());
-
timePartitionSlotSet.addAll(insertTabletStatement.getTimePartitionSlots());
- }
-
- List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>();
- for (Map.Entry<String, Set<TTimePartitionSlot>> entry :
dataPartitionQueryParamMap.entrySet()) {
- DataPartitionQueryParam dataPartitionQueryParam = new
DataPartitionQueryParam();
- dataPartitionQueryParam.setDevicePath(entry.getKey());
- dataPartitionQueryParam.setTimePartitionSlotList(new
ArrayList<>(entry.getValue()));
- dataPartitionQueryParams.add(dataPartitionQueryParam);
- }
-
- return getAnalysisForWriting(analysis, dataPartitionQueryParams, userName);
+ analysis, realInsertRowsStatement, context.getSession().getUserName(),
partitionFetcher);
}
@Override
@@ -2739,7 +2704,8 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
context.setQueryType(QueryType.WRITE);
insertMultiTabletsStatement.semanticCheck();
Analysis analysis = new Analysis();
- validateSchema(analysis, insertMultiTabletsStatement, context);
+ validateSchema(analysis, insertMultiTabletsStatement,
+ () -> SchemaValidator.validate(schemaFetcher,
insertMultiTabletsStatement, context));
InsertMultiTabletsStatement realStatement =
(InsertMultiTabletsStatement) removeLogicalView(analysis,
insertMultiTabletsStatement);
if (analysis.isFinishQueryAfterAnalyze()) {
@@ -2748,7 +2714,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
analysis.setStatement(realStatement);
return computeAnalysisForMultiTablets(
- analysis, realStatement, context.getSession().getUserName());
+ analysis, realStatement, context.getSession().getUserName(),
partitionFetcher);
}
@Override
@@ -2757,7 +2723,8 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
context.setQueryType(QueryType.WRITE);
insertRowsOfOneDeviceStatement.semanticCheck();
Analysis analysis = new Analysis();
- validateSchema(analysis, insertRowsOfOneDeviceStatement, context);
+ validateSchema(analysis, insertRowsOfOneDeviceStatement,
+ () -> SchemaValidator.validate(schemaFetcher,
insertRowsOfOneDeviceStatement, context));
InsertBaseStatement realInsertStatement =
removeLogicalView(analysis, insertRowsOfOneDeviceStatement);
if (analysis.isFinishQueryAfterAnalyze()) {
@@ -2775,10 +2742,12 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
return getAnalysisForWriting(
analysis,
Collections.singletonList(dataPartitionQueryParam),
- context.getSession().getUserName());
+ context.getSession().getUserName(),
+ partitionFetcher);
} else {
return computeAnalysisForInsertRows(
- analysis, (InsertRowsStatement) realInsertStatement,
context.getSession().getUserName());
+ analysis, (InsertRowsStatement) realInsertStatement,
context.getSession().getUserName()
+ , partitionFetcher);
}
}
@@ -2793,53 +2762,6 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
return analysis;
}
- private void validateSchema(
- Analysis analysis, InsertBaseStatement insertStatement, MPPQueryContext
context) {
- final long startTime = System.nanoTime();
- try {
- SchemaValidator.validate(schemaFetcher, insertStatement, context);
- } catch (SemanticException e) {
- analysis.setFinishQueryAfterAnalyze(true);
- if (e.getCause() instanceof IoTDBException) {
- IoTDBException exception = (IoTDBException) e.getCause();
- analysis.setFailStatus(
- RpcUtils.getStatus(exception.getErrorCode(),
exception.getMessage()));
- } else {
- analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR,
e.getMessage()));
- }
- } finally {
-
PERFORMANCE_OVERVIEW_METRICS.recordScheduleSchemaValidateCost(System.nanoTime()
- startTime);
- }
- boolean hasFailedMeasurement = insertStatement.hasFailedMeasurements();
- String partialInsertMessage;
- if (hasFailedMeasurement) {
- partialInsertMessage =
- String.format(
- "Fail to insert measurements %s caused by %s",
- insertStatement.getFailedMeasurements(),
insertStatement.getFailedMessages());
- logger.warn(partialInsertMessage);
- analysis.setFailStatus(
- RpcUtils.getStatus(TSStatusCode.METADATA_ERROR.getStatusCode(),
partialInsertMessage));
- }
- }
-
- private InsertBaseStatement removeLogicalView(
- Analysis analysis, InsertBaseStatement insertBaseStatement) {
- try {
- return insertBaseStatement.removeLogicalView();
- } catch (SemanticException e) {
- analysis.setFinishQueryAfterAnalyze(true);
- if (e.getCause() instanceof IoTDBException) {
- IoTDBException exception = (IoTDBException) e.getCause();
- analysis.setFailStatus(
- RpcUtils.getStatus(exception.getErrorCode(),
exception.getMessage()));
- } else {
- analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR,
e.getMessage()));
- }
- return insertBaseStatement;
- }
- }
-
@Override
public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement,
MPPQueryContext context) {
LoadTsfileAnalyzer loadTsfileAnalyzer =
@@ -2851,24 +2773,6 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
}
- /** get analysis according to statement and params */
- private Analysis getAnalysisForWriting(
- Analysis analysis, List<DataPartitionQueryParam>
dataPartitionQueryParams, String userName) {
-
- DataPartition dataPartition =
- partitionFetcher.getOrCreateDataPartition(dataPartitionQueryParams,
userName);
- if (dataPartition.isEmpty()) {
- analysis.setFinishQueryAfterAnalyze(true);
- analysis.setFailStatus(
- RpcUtils.getStatus(
- TSStatusCode.DATABASE_NOT_EXIST.getStatusCode(),
- "Database not exists and failed to create automatically "
- + "because enable_auto_create_schema is FALSE."));
- }
- analysis.setDataPartitionInfo(dataPartition);
- return analysis;
- }
-
private boolean analyzeTimeseriesRegionScan(
WhereCondition timeCondition,
PathPatternTree patternTree,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
index 207dea7b9f3..bb3821aef6e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java
@@ -42,4 +42,12 @@ public interface IAnalysis {
DatasetHeader getRespDatasetHeader();
String getStatementType();
+
+ default void setFinishQueryAfterAnalyze(boolean b) {
+
+ }
+
+ default void setFailStatus(TSStatus status) {
+
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ISchemaAutoCreation.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ISchemaAutoCreation.java
index e54e8a37d9e..55f8dd5210c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ISchemaAutoCreation.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ISchemaAutoCreation.java
@@ -39,7 +39,8 @@ public interface ISchemaAutoCreation {
TSDataType getDataType(int index);
+ //TODO: Check necessity because all implementations return null
TSEncoding getEncoding(int index);
-
+ //TODO: Check necessity because all implementations return null
CompressionType getCompressionType(int index);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
index 7bbdd5cefc6..1406698cc42 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
@@ -24,6 +24,8 @@ import
org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedInsertStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement;
@@ -55,6 +57,18 @@ public class SchemaValidator {
}
}
+ public static void validate(
+ Metadata metadata, WrappedInsertStatement insertStatement,
MPPQueryContext context) {
+ try {
+
metadata.fetchAndComputeSchemaWithAutoCreate(insertStatement.getSchemaValidationList(),
+ context);
+ insertStatement.updateAfterSchemaValidation(context);
+ } catch (QueryProcessException e) {
+ throw new SemanticException(e.getMessage());
+ }
+ }
+
+
public static ISchemaTree validate(
ISchemaFetcher schemaFetcher,
List<PartialPath> devicePaths,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
index 43a117002c6..288f54fd77c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java
@@ -35,6 +35,7 @@ import
org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand;
import
org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
+import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ColumnDefinition.ColumnCategory;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.component.FromComponent;
import
org.apache.iotdb.db.queryengine.plan.statement.component.GroupByTimeComponent;
@@ -335,8 +336,19 @@ public class StatementGenerator {
}
insertStatement.setDataTypes(dataTypes);
insertStatement.setAligned(insertTabletReq.isAligned);
- PERFORMANCE_OVERVIEW_METRICS.recordParseCost(System.nanoTime() -
startTime);
insertStatement.setWriteToTable(insertTabletReq.isWriteToTable());
+ if (insertTabletReq.isWriteToTable()) {
+ if (!insertTabletReq.isSetColumnCategories() ||
insertTabletReq.getColumnCategoriesSize() !=
insertTabletReq.getMeasurementsSize()) {
+ throw new IllegalArgumentException("Missing or invalid column
categories for table "
+ + "insertion");
+ }
+ ColumnCategory[] columnCategories = new
ColumnCategory[insertTabletReq.columnCategories.size()];
+ for (int i = 0; i < columnCategories.length; i++) {
+ columnCategories[i] =
ColumnCategory.values()[insertTabletReq.getColumnCategories().get(i)];
+ }
+ insertStatement.setColumnCategories(columnCategories);
+ }
+ PERFORMANCE_OVERVIEW_METRICS.recordParseCost(System.nanoTime() -
startTime);
return insertStatement;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
index a15ed482daa..75e7153513a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java
@@ -19,11 +19,17 @@
package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
+import java.util.Collections;
+import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.execution.warnings.IoTDBWarning;
import org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector;
+import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
+import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
+import org.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
@@ -57,6 +63,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GroupingElement;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GroupingSets;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Identifier;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Insert;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertTablet;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Intersect;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.JoinCriteria;
@@ -106,6 +113,9 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Streams;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.tsfile.read.common.type.RowType;
import org.apache.tsfile.read.common.type.Type;
@@ -134,6 +144,10 @@ import static java.util.Collections.emptyList;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static
org.apache.iotdb.db.queryengine.execution.warnings.StandardWarningCode.REDUNDANT_ORDER_BY;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.computeAnalysisForMultiTablets;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.getAnalysisForWriting;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.removeLogicalView;
+import static
org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.validateSchema;
import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.AggregationAnalyzer.verifyOrderByAggregations;
import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.AggregationAnalyzer.verifySourceAggregations;
import static
org.apache.iotdb.db.queryengine.plan.relational.analyzer.CanonicalizationAware.canonicalizationAwareKey;
@@ -153,7 +167,7 @@ public class StatementAnalyzer {
private final StatementAnalyzerFactory statementAnalyzerFactory;
- private final Analysis analysis;
+ private Analysis analysis;
private final AccessControl accessControl;
@@ -212,6 +226,7 @@ public class StatementAnalyzer {
* scopes hierarchy should always have outer query scope (if provided) as
ancestor.
*/
private final class Visitor extends AstVisitor<Scope, Optional<Scope>> {
+
private final boolean isTopLevel;
private final Optional<Scope> outerQueryScope;
private final WarningCollector warningCollector;
@@ -352,6 +367,46 @@ public class StatementAnalyzer {
throw new SemanticException("Insert statement is not supported yet.");
}
+ protected Scope visitInsertTablet(InsertTablet insert, Optional<Scope>
scope) {
+ final Scope ret = Scope.create();
+
+ final MPPQueryContext context = insert.getContext();
+ final InsertTabletStatement insertTabletStatement =
insert.getInnerTreeStatement();
+ context.setQueryType(QueryType.WRITE);
+ insertTabletStatement.semanticCheck();
+ IAnalysis analysis = new
org.apache.iotdb.db.queryengine.plan.analyze.Analysis();
+
+ validateSchema(analysis, insertTabletStatement,
+ () -> SchemaValidator.validate(metadata, insert, context));
+ InsertBaseStatement realStatement = removeLogicalView(analysis,
insertTabletStatement);
+ if (analysis.isFinishQueryAfterAnalyze()) {
+ return ret;
+ }
+ analysis.setStatement(realStatement);
+
+ if (realStatement instanceof InsertTabletStatement) {
+ InsertTabletStatement realInsertTabletStatement =
(InsertTabletStatement) realStatement;
+ DataPartitionQueryParam dataPartitionQueryParam = new
DataPartitionQueryParam();
+ dataPartitionQueryParam.setDevicePath(
+ realInsertTabletStatement.getDevicePath().getFullPath());
+ dataPartitionQueryParam.setTimePartitionSlotList(
+ realInsertTabletStatement.getTimePartitionSlots());
+
+ analysis = getAnalysisForWriting(
+ analysis,
+ Collections.singletonList(dataPartitionQueryParam),
+ context.getSession().getUserName(), partitionFetcher);
+ } else {
+ analysis = computeAnalysisForMultiTablets(
+ analysis,
+ (InsertMultiTabletsStatement) realStatement,
+ context.getSession().getUserName(), partitionFetcher);
+ }
+ // TODO-TableIngestion: use IAnalysis
+ // StatementAnalyzer.this.analysis = analysis;
+ return ret;
+ }
+
@Override
protected Scope visitDelete(Delete node, Optional<Scope> scope) {
throw new SemanticException("Delete statement is not supported yet.");
@@ -1965,7 +2020,7 @@ public class StatementAnalyzer {
/**
* @return true if the Query / QuerySpecification containing the analyzed
Limit or FetchFirst,
- * must contain orderBy (i.e., for FetchFirst with ties).
+ * must contain orderBy (i.e., for FetchFirst with ties).
*/
private boolean analyzeLimit(Node node, Scope scope) {
// checkState(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/Metadata.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/Metadata.java
index f37aabbce8b..df047d7d652 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/Metadata.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/Metadata.java
@@ -19,7 +19,9 @@
package org.apache.iotdb.db.queryengine.plan.relational.metadata;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import
org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaComputationWithAutoCreation;
import org.apache.iotdb.db.queryengine.plan.relational.function.OperatorType;
import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
@@ -67,4 +69,19 @@ public interface Metadata {
QualifiedObjectName tableName,
List<Expression> expressionList,
List<String> attributeColumns);
+
+ /**
+ * Fetch and compute the schema of target timeseries, with device and
measurement defined in given
+ * schemaComputationWithAutoCreation. The computation defined in given
+ * schemaComputationWithAutoCreation will be executed during scanning the
fetched schema. If some
+ * target timeseries doesn't exist, they will be auto created.
+ *
+ * @param schemaComputationWithAutoCreationList define the target devices,
measurements and
+ * computation
+ */
+ default void fetchAndComputeSchemaWithAutoCreate(
+ List<? extends ISchemaComputationWithAutoCreation>
schemaComputationWithAutoCreationList,
+ MPPQueryContext context) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
index d2c1e487458..f2476e59a95 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java
@@ -371,6 +371,10 @@ public abstract class AstVisitor<R, C> {
return visitStatement(node, context);
}
+ protected R visitInsertTablet(InsertTablet node, C context) {
+ return visitStatement(node, context);
+ }
+
protected R visitDelete(Delete node, C context) {
return visitStatement(node, context);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java
index 2b858bdb8a7..38a423fae21 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java
@@ -17,13 +17,99 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.plan.relational.sql.tree;
+package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
+import
org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaComputationWithAutoCreation;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.Pair;
-public class InsertTablet extends WrappedStatement {
+public class InsertTablet extends WrappedInsertStatement {
- public InsertTablet(InsertTabletStatement insertTabletStatement) {
- super(insertTabletStatement);
+ private InsertTabletStatement insertTabletStatement;
+
+ public InsertTablet(InsertTabletStatement insertTabletStatement,
MPPQueryContext context) {
+ super(insertTabletStatement, context);
+ this.insertTabletStatement = insertTabletStatement;
+ }
+
+ @Override
+ public <R, C> R accept(AstVisitor<R, C> visitor, C context) {
+ return visitor.visitInsertTablet(this, context);
+ }
+
+ @Override
+ public InsertTabletStatement getInnerTreeStatement() {
+ return insertTabletStatement;
+ }
+
+ @Override
+ public List<ISchemaComputationWithAutoCreation> getSchemaValidationList() {
+ Map<IDeviceID, ISchemaComputationWithAutoCreation> map = new HashMap<>();
+ for (int i = 0; i < insertTabletStatement.getRowCount(); i++) {
+ map.computeIfAbsent(insertTabletStatement.getTableDeviceID(i),
this::getSchemaComputation);
+ }
+ return new ArrayList<>(map.values());
+ }
+
+ @Override
+ public void updateAfterSchemaValidation(MPPQueryContext context) throws
QueryProcessException {
+ insertTabletStatement.updateAfterSchemaValidation(context);
+ }
+
+ @Override
+ public ISchemaComputationWithAutoCreation getSchemaComputation(IDeviceID
deviceID) {
+ return new SchemaExecutions(deviceID);
+ }
+
+ public class SchemaExecutions extends BasicSchemaExecutions {
+
+ public SchemaExecutions(IDeviceID deviceID) {
+ super(deviceID);
+ }
+
+ @Override
+ public void computeMeasurement(int index, IMeasurementSchemaInfo
measurementSchemaInfo) {
+ insertTabletStatement.computeMeasurement(index, measurementSchemaInfo);
+ }
+
+ @Override
+ public boolean hasLogicalViewNeedProcess() {
+ return insertTabletStatement.hasLogicalViewNeedProcess();
+ }
+
+ @Override
+ public List<LogicalViewSchema> getLogicalViewSchemaList() {
+ return insertTabletStatement.getLogicalViewSchemaList();
+ }
+
+ @Override
+ public List<Integer> getIndexListOfLogicalViewPaths() {
+ return insertTabletStatement.getIndexListOfLogicalViewPaths();
+ }
+
+ @Override
+ public void recordRangeOfLogicalViewSchemaListNow() {
+ insertTabletStatement.recordRangeOfLogicalViewSchemaListNow();
+ }
+
+ @Override
+ public Pair<Integer, Integer> getRangeOfLogicalViewSchemaListRecorded() {
+ return insertTabletStatement.getRangeOfLogicalViewSchemaListRecorded();
+ }
+
+ @Override
+ public void computeMeasurementOfView(int index, IMeasurementSchemaInfo
measurementSchemaInfo,
+ boolean isAligned) {
+ insertTabletStatement.computeMeasurementOfView(index,
measurementSchemaInfo, isAligned);
+ }
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
new file mode 100644
index 00000000000..e57e7b55c40
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java
@@ -0,0 +1,89 @@
+package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
+
+import java.util.List;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
+import org.apache.iotdb.db.queryengine.execution.schedule.queue.ID;
+import
org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaComputationWithAutoCreation;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.Pair;
+
+public abstract class WrappedInsertStatement extends WrappedStatement {
+
+ protected InsertBaseStatement insertStatement;
+
+ public WrappedInsertStatement(
+ InsertBaseStatement innerTreeStatement,
+ MPPQueryContext context) {
+ super(innerTreeStatement, context);
+ this.insertStatement = innerTreeStatement;
+ }
+
+ @Override
+ public InsertBaseStatement getInnerTreeStatement() {
+ return insertStatement;
+ }
+
+ public abstract List<ISchemaComputationWithAutoCreation>
getSchemaValidationList();
+
+ public abstract void updateAfterSchemaValidation(MPPQueryContext context)
throws QueryProcessException;
+
+ public abstract ISchemaComputationWithAutoCreation
getSchemaComputation(IDeviceID deviceID);
+
+ public abstract class BasicSchemaExecutions implements
ISchemaComputationWithAutoCreation {
+ protected IDeviceID deviceID;
+
+ public BasicSchemaExecutions(IDeviceID deviceID) {
+ this.deviceID = deviceID;
+ }
+
+ @Override
+ public boolean isAligned() {
+ return true;
+ }
+
+ @Override
+ public TSDataType getDataType(int index) {
+ return insertStatement.getDataTypes()[index];
+ }
+
+ @Override
+ public TSEncoding getEncoding(int index) {
+ return null;
+ }
+
+ @Override
+ public CompressionType getCompressionType(int index) {
+ return null;
+ }
+
+ @Override
+ public PartialPath getDevicePath() {
+ //TODO-TableInsertion: use deviceId
+ try {
+ return new PartialPath(deviceID);
+ } catch (IllegalPathException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public String[] getMeasurements() {
+ return insertStatement.getMeasurements();
+ }
+
+ @Override
+ public void computeDevice(boolean isAligned) {
+ // ignored, table device must be aligned
+ }
+
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedStatement.java
index 62c7a7dee77..380388ee141 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedStatement.java
@@ -17,18 +17,22 @@
* under the License.
*/
-package org.apache.iotdb.db.queryengine.plan.relational.sql.tree;
+package org.apache.iotdb.db.queryengine.plan.relational.sql.ast;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
public abstract class WrappedStatement extends Statement {
- private final org.apache.iotdb.db.queryengine.plan.statement.Statement
innerTreeStatement;
+ protected final org.apache.iotdb.db.queryengine.plan.statement.Statement
innerTreeStatement;
+ protected final MPPQueryContext context;
- public
WrappedStatement(org.apache.iotdb.db.queryengine.plan.statement.Statement
innerTreeStatement) {
+ public
WrappedStatement(org.apache.iotdb.db.queryengine.plan.statement.Statement
innerTreeStatement,
+ MPPQueryContext context) {
super(null);
this.innerTreeStatement = innerTreeStatement;
+ this.context = context;
}
@Override
@@ -57,4 +61,12 @@ public abstract class WrappedStatement extends Statement {
public String toString() {
return innerTreeStatement.toString();
}
+
+ public org.apache.iotdb.db.queryengine.plan.statement.Statement
getInnerTreeStatement() {
+ return innerTreeStatement;
+ }
+
+ public MPPQueryContext getContext() {
+ return context;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java
index a19c61439c0..d7aac386a88 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.plan.statement;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor;
import java.util.List;
@@ -67,7 +68,8 @@ public abstract class Statement extends StatementNode {
"Only the admin user can perform this operation");
}
- public org.apache.iotdb.db.queryengine.plan.relational.sql.tree.Statement
toRelationalStatement() {
+ public org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement
toRelationalStatement(
+ MPPQueryContext context) {
throw new UnsupportedOperationException("Method not implemented yet");
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index 35a0a6f18f6..53b60b67494 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -26,13 +26,19 @@ import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import
org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
+import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ColumnDefinition.ColumnCategory;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.IDeviceID.Factory;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.utils.Binary;
@@ -60,6 +66,10 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
private BitMap[] bitMaps;
private Object[] columns;
+ private ColumnCategory[] columnCategories;
+ private List<Integer> idColumnIndices;
+ private IDeviceID[] deviceIDs;
+
private int rowCount = 0;
/**
@@ -409,4 +419,45 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
public void setWriteToTable(boolean writeToTable) {
this.writeToTable = writeToTable;
}
+
+ @Override
+ public Statement toRelationalStatement(MPPQueryContext context) {
+ return super.toRelationalStatement(context);
+ }
+
+ public ColumnCategory[] getColumnCategories() {
+ return columnCategories;
+ }
+
+ public void setColumnCategories(
+ ColumnCategory[] columnCategories) {
+ this.columnCategories = columnCategories;
+ idColumnIndices = new ArrayList<>();
+ for (int i = 0; i < columnCategories.length; i++) {
+ if (columnCategories[i].equals(ColumnCategory.ID)) {
+ idColumnIndices.add(i);
+ }
+ }
+ }
+
+ public List<Integer> getIdColumnIndices() {
+ return idColumnIndices;
+ }
+
+ public IDeviceID getTableDeviceID(int rowIdx) {
+ if (deviceIDs == null) {
+ deviceIDs = new IDeviceID[rowCount];
+ }
+ if (deviceIDs[rowIdx] == null) {
+ String[] deviceIdSegments = new String[idColumnIndices.size() + 1];
+ deviceIdSegments[0] = this.devicePath.getFullPath();
+ for (int i = 0; i < idColumnIndices.size(); i++) {
+ final Integer columnIndex = idColumnIndices.get(i);
+ deviceIdSegments[i + 1] = ((Binary[])
columns[columnIndex])[rowIdx].toString();
+ }
+ deviceIDs[rowIdx] = Factory.DEFAULT_FACTORY.create(deviceIdSegments);
+ }
+
+ return deviceIDs[rowIdx];
+ }
}
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
index 2838cd82128..4b760e27cef 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift
@@ -234,6 +234,7 @@ struct TSInsertTabletReq {
7: required i32 size
8: optional bool isAligned
9: optional bool writeToTable
+ 10: optional list<byte> columnCategories
}
struct TSInsertTabletsReq {