This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch TableModelIngestion in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4707dfd7f4d231ef5caf8defc9368b6cad3c3e1c Author: jt2594838 <[email protected]> AuthorDate: Tue Jun 11 18:40:36 2024 +0800 add schema execution adaption --- .../protocol/thrift/impl/ClientRPCServiceImpl.java | 2 +- .../iotdb/db/queryengine/plan/Coordinator.java | 47 ++++++ .../db/queryengine/plan/analyze/AnalyzeUtils.java | 168 +++++++++++++++++++++ .../queryengine/plan/analyze/AnalyzeVisitor.java | 148 ++++-------------- .../db/queryengine/plan/analyze/IAnalysis.java | 8 + .../plan/analyze/schema/ISchemaAutoCreation.java | 3 +- .../plan/analyze/schema/SchemaValidator.java | 14 ++ .../plan/parser/StatementGenerator.java | 14 +- .../relational/analyzer/StatementAnalyzer.java | 59 +++++++- .../plan/relational/metadata/Metadata.java | 17 +++ .../plan/relational/sql/ast/AstVisitor.java | 4 + .../plan/relational/sql/ast/InsertTablet.java | 94 +++++++++++- .../relational/sql/ast/WrappedInsertStatement.java | 89 +++++++++++ .../plan/relational/sql/ast/WrappedStatement.java | 18 ++- .../db/queryengine/plan/statement/Statement.java | 4 +- .../plan/statement/crud/InsertTabletStatement.java | 51 +++++++ .../thrift-datanode/src/main/thrift/client.thrift | 1 + 17 files changed, 606 insertions(+), 135 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 5a35238718d..08236745e6f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -2072,7 +2072,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result; if (statement.isWriteToTable()) { - result = COORDINATOR.executeForTableModel(statement.toRelationalStatement(), + result = COORDINATOR.executeForTableModel(statement, relationSqlParser, clientSession, queryId, SESSION_MANAGER.getSessionInfo(clientSession), "", metadata, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index 0ace106a035..fe20273f183 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -256,6 +256,53 @@ public class Coordinator { startTime))); } + public ExecutionResult executeForTableModel( + Statement statement, + SqlParser sqlParser, + IClientSession clientSession, + long queryId, + SessionInfo session, + String sql, + Metadata metadata, + long timeOut) { + return execution( + queryId, + session, + sql, + ((queryContext, startTime) -> + createQueryExecutionForTableModel( + statement, + sqlParser, + clientSession, + queryContext, + metadata, + timeOut > 0 ? timeOut : CONFIG.getQueryTimeoutThreshold(), + startTime))); + } + + private IQueryExecution createQueryExecutionForTableModel( + Statement statement, + SqlParser sqlParser, + IClientSession clientSession, + MPPQueryContext queryContext, + Metadata metadata, + long timeOut, + long startTime) { + queryContext.setTimeOut(timeOut); + queryContext.setStartTime(startTime); + RelationalModelPlanner relationalModelPlanner = + new RelationalModelPlanner( + statement.toRelationalStatement(queryContext), + sqlParser, + metadata, + executor, + writeOperationExecutor, + scheduledExecutor, + SYNC_INTERNAL_SERVICE_CLIENT_MANAGER, + ASYNC_INTERNAL_SERVICE_CLIENT_MANAGER); + return new QueryExecution(relationalModelPlanner, queryContext, executor); + } + private IQueryExecution createQueryExecutionForTableModel( org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement statement, SqlParser sqlParser, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java new file mode 100644 index 00000000000..af861730016 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeUtils.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.analyze; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.exception.IoTDBException; +import org.apache.iotdb.commons.partition.DataPartition; +import org.apache.iotdb.commons.partition.DataPartitionQueryParam; +import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics; +import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; +import org.apache.iotdb.rpc.RpcUtils; +import org.apache.iotdb.rpc.TSStatusCode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AnalyzeUtils { + + private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS = + PerformanceOverviewMetrics.getInstance(); + private static final Logger LOGGER = LoggerFactory.getLogger(AnalyzeUtils.class); + + private AnalyzeUtils() { + // util class + } + + public static void validateSchema( + IAnalysis analysis, InsertBaseStatement insertStatement, + Runnable schemaValidation) { + final long startTime = System.nanoTime(); + try { + schemaValidation.run(); + } catch (SemanticException e) { + analysis.setFinishQueryAfterAnalyze(true); + if (e.getCause() instanceof IoTDBException) { + IoTDBException exception = (IoTDBException) e.getCause(); + analysis.setFailStatus( + RpcUtils.getStatus(exception.getErrorCode(), exception.getMessage())); + } else { + analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage())); + } + } finally { + PERFORMANCE_OVERVIEW_METRICS.recordScheduleSchemaValidateCost(System.nanoTime() - startTime); + } + boolean hasFailedMeasurement = insertStatement.hasFailedMeasurements(); + String partialInsertMessage; + if (hasFailedMeasurement) { + partialInsertMessage = + String.format( + "Fail to insert measurements %s caused by %s", + insertStatement.getFailedMeasurements(), insertStatement.getFailedMessages()); + LOGGER.warn(partialInsertMessage); + analysis.setFailStatus( + RpcUtils.getStatus(TSStatusCode.METADATA_ERROR.getStatusCode(), partialInsertMessage)); + } + } + + public static InsertBaseStatement removeLogicalView( + IAnalysis analysis, InsertBaseStatement insertBaseStatement) { + try { + return insertBaseStatement.removeLogicalView(); + } catch (SemanticException e) { + analysis.setFinishQueryAfterAnalyze(true); + if (e.getCause() instanceof IoTDBException) { + IoTDBException exception = (IoTDBException) e.getCause(); + analysis.setFailStatus( + RpcUtils.getStatus(exception.getErrorCode(), exception.getMessage())); + } else { + analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage())); + } + return insertBaseStatement; + } + } + + /** get analysis according to statement and params */ + public static Analysis getAnalysisForWriting( + Analysis analysis, List<DataPartitionQueryParam> dataPartitionQueryParams, String userName, + IPartitionFetcher partitionFetcher) { + + DataPartition dataPartition = + partitionFetcher.getOrCreateDataPartition(dataPartitionQueryParams, userName); + if (dataPartition.isEmpty()) { + analysis.setFinishQueryAfterAnalyze(true); + analysis.setFailStatus( + RpcUtils.getStatus( + TSStatusCode.DATABASE_NOT_EXIST.getStatusCode(), + "Database not exists and failed to create automatically " + + "because enable_auto_create_schema is FALSE.")); + } + analysis.setDataPartitionInfo(dataPartition); + return analysis; + } + + public static Analysis computeAnalysisForInsertRows( + Analysis analysis, InsertRowsStatement insertRowsStatement, String userName, + IPartitionFetcher partitionFetcher) { + Map<String, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new HashMap<>(); + for (InsertRowStatement insertRowStatement : insertRowsStatement.getInsertRowStatementList()) { + Set<TTimePartitionSlot> timePartitionSlotSet = + dataPartitionQueryParamMap.computeIfAbsent( + insertRowStatement.getDevicePath().getFullPath(), k -> new HashSet<>()); + timePartitionSlotSet.add(insertRowStatement.getTimePartitionSlot()); + } + + List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>(); + for (Map.Entry<String, Set<TTimePartitionSlot>> entry : dataPartitionQueryParamMap.entrySet()) { + DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam(); + dataPartitionQueryParam.setDevicePath(entry.getKey()); + dataPartitionQueryParam.setTimePartitionSlotList(new ArrayList<>(entry.getValue())); + dataPartitionQueryParams.add(dataPartitionQueryParam); + } + + return getAnalysisForWriting(analysis, dataPartitionQueryParams, userName, partitionFetcher); + } + + public static Analysis computeAnalysisForMultiTablets( + Analysis analysis, InsertMultiTabletsStatement insertMultiTabletsStatement, String userName + , IPartitionFetcher partitionFetcher) { + Map<String, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new HashMap<>(); + for (InsertTabletStatement insertTabletStatement : + insertMultiTabletsStatement.getInsertTabletStatementList()) { + Set<TTimePartitionSlot> timePartitionSlotSet = + dataPartitionQueryParamMap.computeIfAbsent( + insertTabletStatement.getDevicePath().getFullPath(), k -> new HashSet<>()); + timePartitionSlotSet.addAll(insertTabletStatement.getTimePartitionSlots()); + } + + List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>(); + for (Map.Entry<String, Set<TTimePartitionSlot>> entry : dataPartitionQueryParamMap.entrySet()) { + DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam(); + dataPartitionQueryParam.setDevicePath(entry.getKey()); + dataPartitionQueryParam.setTimePartitionSlotList(new ArrayList<>(entry.getValue())); + dataPartitionQueryParams.add(dataPartitionQueryParam); + } + + return getAnalysisForWriting(analysis, dataPartitionQueryParams, userName, partitionFetcher); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index cf49f83c7ce..af95e958a8c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.IllegalPathException; -import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.partition.DataPartition; import org.apache.iotdb.commons.partition.DataPartitionQueryParam; @@ -185,6 +184,11 @@ import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME; import static org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.PARTITION_FETCHER; import static org.apache.iotdb.db.queryengine.metric.QueryPlanCostMetricSet.SCHEMA_FETCHER; +import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.computeAnalysisForInsertRows; +import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.computeAnalysisForMultiTablets; +import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.getAnalysisForWriting; +import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.removeLogicalView; +import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.validateSchema; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.bindSchemaForExpression; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.concatDeviceAndBindSchemaForExpression; import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.getMeasurementExpression; @@ -2616,7 +2620,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> context.setQueryType(QueryType.WRITE); insertTabletStatement.semanticCheck(); Analysis analysis = new Analysis(); - validateSchema(analysis, insertTabletStatement, context); + validateSchema(analysis, insertTabletStatement, + () -> SchemaValidator.validate(schemaFetcher, insertTabletStatement, context)); InsertBaseStatement realStatement = removeLogicalView(analysis, insertTabletStatement); if (analysis.isFinishQueryAfterAnalyze()) { return analysis; @@ -2634,12 +2639,12 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> return getAnalysisForWriting( analysis, Collections.singletonList(dataPartitionQueryParam), - context.getSession().getUserName()); + context.getSession().getUserName(), partitionFetcher); } else { return computeAnalysisForMultiTablets( analysis, (InsertMultiTabletsStatement) realStatement, - context.getSession().getUserName()); + context.getSession().getUserName(), partitionFetcher); } } @@ -2648,7 +2653,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> context.setQueryType(QueryType.WRITE); insertRowStatement.semanticCheck(); Analysis analysis = new Analysis(); - validateSchema(analysis, insertRowStatement, context); + validateSchema(analysis, insertRowStatement, + () -> SchemaValidator.validate(schemaFetcher, insertRowStatement, context)); InsertBaseStatement realInsertStatement = removeLogicalView(analysis, insertRowStatement); if (analysis.isFinishQueryAfterAnalyze()) { return analysis; @@ -2665,41 +2671,22 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> return getAnalysisForWriting( analysis, Collections.singletonList(dataPartitionQueryParam), - context.getSession().getUserName()); + context.getSession().getUserName(), partitionFetcher); } else { return computeAnalysisForInsertRows( - analysis, (InsertRowsStatement) realInsertStatement, context.getSession().getUserName()); + analysis, (InsertRowsStatement) realInsertStatement, context.getSession().getUserName() + , partitionFetcher); } } - private Analysis computeAnalysisForInsertRows( - Analysis analysis, InsertRowsStatement insertRowsStatement, String userName) { - Map<String, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new HashMap<>(); - for (InsertRowStatement insertRowStatement : insertRowsStatement.getInsertRowStatementList()) { - Set<TTimePartitionSlot> timePartitionSlotSet = - dataPartitionQueryParamMap.computeIfAbsent( - insertRowStatement.getDevicePath().getFullPath(), k -> new HashSet<>()); - timePartitionSlotSet.add(insertRowStatement.getTimePartitionSlot()); - } - - List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>(); - for (Map.Entry<String, Set<TTimePartitionSlot>> entry : dataPartitionQueryParamMap.entrySet()) { - DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam(); - dataPartitionQueryParam.setDevicePath(entry.getKey()); - dataPartitionQueryParam.setTimePartitionSlotList(new ArrayList<>(entry.getValue())); - dataPartitionQueryParams.add(dataPartitionQueryParam); - } - - return getAnalysisForWriting(analysis, dataPartitionQueryParams, userName); - } - @Override public Analysis visitInsertRows( InsertRowsStatement insertRowsStatement, MPPQueryContext context) { context.setQueryType(QueryType.WRITE); insertRowsStatement.semanticCheck(); Analysis analysis = new Analysis(); - validateSchema(analysis, insertRowsStatement, context); + validateSchema(analysis, insertRowsStatement, + () -> SchemaValidator.validate(schemaFetcher, insertRowsStatement, context)); InsertRowsStatement realInsertRowsStatement = (InsertRowsStatement) removeLogicalView(analysis, insertRowsStatement); if (analysis.isFinishQueryAfterAnalyze()) { @@ -2708,29 +2695,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> analysis.setStatement(realInsertRowsStatement); return computeAnalysisForInsertRows( - analysis, realInsertRowsStatement, context.getSession().getUserName()); - } - - private Analysis computeAnalysisForMultiTablets( - Analysis analysis, InsertMultiTabletsStatement insertMultiTabletsStatement, String userName) { - Map<String, Set<TTimePartitionSlot>> dataPartitionQueryParamMap = new HashMap<>(); - for (InsertTabletStatement insertTabletStatement : - insertMultiTabletsStatement.getInsertTabletStatementList()) { - Set<TTimePartitionSlot> timePartitionSlotSet = - dataPartitionQueryParamMap.computeIfAbsent( - insertTabletStatement.getDevicePath().getFullPath(), k -> new HashSet<>()); - timePartitionSlotSet.addAll(insertTabletStatement.getTimePartitionSlots()); - } - - List<DataPartitionQueryParam> dataPartitionQueryParams = new ArrayList<>(); - for (Map.Entry<String, Set<TTimePartitionSlot>> entry : dataPartitionQueryParamMap.entrySet()) { - DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam(); - dataPartitionQueryParam.setDevicePath(entry.getKey()); - dataPartitionQueryParam.setTimePartitionSlotList(new ArrayList<>(entry.getValue())); - dataPartitionQueryParams.add(dataPartitionQueryParam); - } - - return getAnalysisForWriting(analysis, dataPartitionQueryParams, userName); + analysis, realInsertRowsStatement, context.getSession().getUserName(), partitionFetcher); } @Override @@ -2739,7 +2704,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> context.setQueryType(QueryType.WRITE); insertMultiTabletsStatement.semanticCheck(); Analysis analysis = new Analysis(); - validateSchema(analysis, insertMultiTabletsStatement, context); + validateSchema(analysis, insertMultiTabletsStatement, + () -> SchemaValidator.validate(schemaFetcher, insertMultiTabletsStatement, context)); InsertMultiTabletsStatement realStatement = (InsertMultiTabletsStatement) removeLogicalView(analysis, insertMultiTabletsStatement); if (analysis.isFinishQueryAfterAnalyze()) { @@ -2748,7 +2714,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> analysis.setStatement(realStatement); return computeAnalysisForMultiTablets( - analysis, realStatement, context.getSession().getUserName()); + analysis, realStatement, context.getSession().getUserName(), partitionFetcher); } @Override @@ -2757,7 +2723,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> context.setQueryType(QueryType.WRITE); insertRowsOfOneDeviceStatement.semanticCheck(); Analysis analysis = new Analysis(); - validateSchema(analysis, insertRowsOfOneDeviceStatement, context); + validateSchema(analysis, insertRowsOfOneDeviceStatement, + () -> SchemaValidator.validate(schemaFetcher, insertRowsOfOneDeviceStatement, context)); InsertBaseStatement realInsertStatement = removeLogicalView(analysis, insertRowsOfOneDeviceStatement); if (analysis.isFinishQueryAfterAnalyze()) { @@ -2775,10 +2742,12 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> return getAnalysisForWriting( analysis, Collections.singletonList(dataPartitionQueryParam), - context.getSession().getUserName()); + context.getSession().getUserName(), + partitionFetcher); } else { return computeAnalysisForInsertRows( - analysis, (InsertRowsStatement) realInsertStatement, context.getSession().getUserName()); + analysis, (InsertRowsStatement) realInsertStatement, context.getSession().getUserName() + , partitionFetcher); } } @@ -2793,53 +2762,6 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> return analysis; } - private void validateSchema( - Analysis analysis, InsertBaseStatement insertStatement, MPPQueryContext context) { - final long startTime = System.nanoTime(); - try { - SchemaValidator.validate(schemaFetcher, insertStatement, context); - } catch (SemanticException e) { - analysis.setFinishQueryAfterAnalyze(true); - if (e.getCause() instanceof IoTDBException) { - IoTDBException exception = (IoTDBException) e.getCause(); - analysis.setFailStatus( - RpcUtils.getStatus(exception.getErrorCode(), exception.getMessage())); - } else { - analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage())); - } - } finally { - PERFORMANCE_OVERVIEW_METRICS.recordScheduleSchemaValidateCost(System.nanoTime() - startTime); - } - boolean hasFailedMeasurement = insertStatement.hasFailedMeasurements(); - String partialInsertMessage; - if (hasFailedMeasurement) { - partialInsertMessage = - String.format( - "Fail to insert measurements %s caused by %s", - insertStatement.getFailedMeasurements(), insertStatement.getFailedMessages()); - logger.warn(partialInsertMessage); - analysis.setFailStatus( - RpcUtils.getStatus(TSStatusCode.METADATA_ERROR.getStatusCode(), partialInsertMessage)); - } - } - - private InsertBaseStatement removeLogicalView( - Analysis analysis, InsertBaseStatement insertBaseStatement) { - try { - return insertBaseStatement.removeLogicalView(); - } catch (SemanticException e) { - analysis.setFinishQueryAfterAnalyze(true); - if (e.getCause() instanceof IoTDBException) { - IoTDBException exception = (IoTDBException) e.getCause(); - analysis.setFailStatus( - RpcUtils.getStatus(exception.getErrorCode(), exception.getMessage())); - } else { - analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage())); - } - return insertBaseStatement; - } - } - @Override public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) { LoadTsfileAnalyzer loadTsfileAnalyzer = @@ -2851,24 +2773,6 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> } } - /** get analysis according to statement and params */ - private Analysis getAnalysisForWriting( - Analysis analysis, List<DataPartitionQueryParam> dataPartitionQueryParams, String userName) { - - DataPartition dataPartition = - partitionFetcher.getOrCreateDataPartition(dataPartitionQueryParams, userName); - if (dataPartition.isEmpty()) { - analysis.setFinishQueryAfterAnalyze(true); - analysis.setFailStatus( - RpcUtils.getStatus( - TSStatusCode.DATABASE_NOT_EXIST.getStatusCode(), - "Database not exists and failed to create automatically " - + "because enable_auto_create_schema is FALSE.")); - } - analysis.setDataPartitionInfo(dataPartition); - return analysis; - } - private boolean analyzeTimeseriesRegionScan( WhereCondition timeCondition, PathPatternTree patternTree, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java index 207dea7b9f3..bb3821aef6e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IAnalysis.java @@ -42,4 +42,12 @@ public interface IAnalysis { DatasetHeader getRespDatasetHeader(); String getStatementType(); + + default void setFinishQueryAfterAnalyze(boolean b) { + + } + + default void setFailStatus(TSStatus status) { + + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ISchemaAutoCreation.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ISchemaAutoCreation.java index e54e8a37d9e..55f8dd5210c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ISchemaAutoCreation.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ISchemaAutoCreation.java @@ -39,7 +39,8 @@ public interface ISchemaAutoCreation { TSDataType getDataType(int index); + //TODO: Check necessity because all implementations return null TSEncoding getEncoding(int index); - + //TODO: Check necessity because all implementations return null CompressionType getCompressionType(int index); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java index 7bbdd5cefc6..1406698cc42 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java @@ -24,6 +24,8 @@ import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.exception.sql.SemanticException; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedInsertStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement; @@ -55,6 +57,18 @@ public class SchemaValidator { } } + public static void validate( + Metadata metadata, WrappedInsertStatement insertStatement, MPPQueryContext context) { + try { + metadata.fetchAndComputeSchemaWithAutoCreate(insertStatement.getSchemaValidationList(), + context); + insertStatement.updateAfterSchemaValidation(context); + } catch (QueryProcessException e) { + throw new SemanticException(e.getMessage()); + } + } + + public static ISchemaTree validate( ISchemaFetcher schemaFetcher, List<PartialPath> devicePaths, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java index 43a117002c6..288f54fd77c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/StatementGenerator.java @@ -35,6 +35,7 @@ import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand; import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand; import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ColumnDefinition.ColumnCategory; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.queryengine.plan.statement.component.FromComponent; import org.apache.iotdb.db.queryengine.plan.statement.component.GroupByTimeComponent; @@ -335,8 +336,19 @@ public class StatementGenerator { } insertStatement.setDataTypes(dataTypes); insertStatement.setAligned(insertTabletReq.isAligned); - PERFORMANCE_OVERVIEW_METRICS.recordParseCost(System.nanoTime() - startTime); insertStatement.setWriteToTable(insertTabletReq.isWriteToTable()); + if (insertTabletReq.isWriteToTable()) { + if (!insertTabletReq.isSetColumnCategories() || insertTabletReq.getColumnCategoriesSize() != insertTabletReq.getMeasurementsSize()) { + throw new IllegalArgumentException("Missing or invalid column categories for table " + + "insertion"); + } + ColumnCategory[] columnCategories = new ColumnCategory[insertTabletReq.columnCategories.size()]; + for (int i = 0; i < columnCategories.length; i++) { + columnCategories[i] = ColumnCategory.values()[insertTabletReq.getColumnCategories().get(i)]; + } + insertStatement.setColumnCategories(columnCategories); + } + PERFORMANCE_OVERVIEW_METRICS.recordParseCost(System.nanoTime() - startTime); return insertStatement; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java index a15ed482daa..75e7153513a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java @@ -19,11 +19,17 @@ package org.apache.iotdb.db.queryengine.plan.relational.analyzer; +import java.util.Collections; +import org.apache.iotdb.commons.partition.DataPartitionQueryParam; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.execution.warnings.IoTDBWarning; import org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector; +import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis; +import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; +import org.apache.iotdb.db.queryengine.plan.analyze.schema.SchemaValidator; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; @@ -57,6 +63,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GroupingElement; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.GroupingSets; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Identifier; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Insert; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertTablet; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Intersect; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.JoinCriteria; @@ -106,6 +113,9 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.ListMultimap; import com.google.common.collect.Streams; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.tsfile.read.common.type.RowType; import org.apache.tsfile.read.common.type.Type; @@ -134,6 +144,10 @@ import static java.util.Collections.emptyList; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; import static org.apache.iotdb.db.queryengine.execution.warnings.StandardWarningCode.REDUNDANT_ORDER_BY; +import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.computeAnalysisForMultiTablets; +import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.getAnalysisForWriting; +import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.removeLogicalView; +import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeUtils.validateSchema; import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.AggregationAnalyzer.verifyOrderByAggregations; import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.AggregationAnalyzer.verifySourceAggregations; import static org.apache.iotdb.db.queryengine.plan.relational.analyzer.CanonicalizationAware.canonicalizationAwareKey; @@ -153,7 +167,7 @@ public class StatementAnalyzer { private final StatementAnalyzerFactory statementAnalyzerFactory; - private final Analysis analysis; + private Analysis analysis; private final AccessControl accessControl; @@ -212,6 +226,7 @@ public class StatementAnalyzer { * scopes hierarchy should always have outer query scope (if provided) as ancestor. */ private final class Visitor extends AstVisitor<Scope, Optional<Scope>> { + private final boolean isTopLevel; private final Optional<Scope> outerQueryScope; private final WarningCollector warningCollector; @@ -352,6 +367,46 @@ public class StatementAnalyzer { throw new SemanticException("Insert statement is not supported yet."); } + protected Scope visitInsertTablet(InsertTablet insert, Optional<Scope> scope) { + final Scope ret = Scope.create(); + + final MPPQueryContext context = insert.getContext(); + final InsertTabletStatement insertTabletStatement = insert.getInnerTreeStatement(); + context.setQueryType(QueryType.WRITE); + insertTabletStatement.semanticCheck(); + IAnalysis analysis = new org.apache.iotdb.db.queryengine.plan.analyze.Analysis(); + + validateSchema(analysis, insertTabletStatement, + () -> SchemaValidator.validate(metadata, insert, context)); + InsertBaseStatement realStatement = removeLogicalView(analysis, insertTabletStatement); + if (analysis.isFinishQueryAfterAnalyze()) { + return ret; + } + analysis.setStatement(realStatement); + + if (realStatement instanceof InsertTabletStatement) { + InsertTabletStatement realInsertTabletStatement = (InsertTabletStatement) realStatement; + DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam(); + dataPartitionQueryParam.setDevicePath( + realInsertTabletStatement.getDevicePath().getFullPath()); + dataPartitionQueryParam.setTimePartitionSlotList( + realInsertTabletStatement.getTimePartitionSlots()); + + analysis = getAnalysisForWriting( + analysis, + Collections.singletonList(dataPartitionQueryParam), + context.getSession().getUserName(), partitionFetcher); + } else { + analysis = computeAnalysisForMultiTablets( + analysis, + (InsertMultiTabletsStatement) realStatement, + context.getSession().getUserName(), partitionFetcher); + } + // TODO-TableIngestion: use IAnalysis + // StatementAnalyzer.this.analysis = analysis; + return ret; + } + @Override protected Scope visitDelete(Delete node, Optional<Scope> scope) { throw new SemanticException("Delete statement is not supported yet."); @@ -1965,7 +2020,7 @@ public class StatementAnalyzer { /** * @return true if the Query / QuerySpecification containing the analyzed Limit or FetchFirst, - * must contain orderBy (i.e., for FetchFirst with ties). + * must contain orderBy (i.e., for FetchFirst with ties). */ private boolean analyzeLimit(Node node, Scope scope) { // checkState( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/Metadata.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/Metadata.java index f37aabbce8b..df047d7d652 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/Metadata.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/Metadata.java @@ -19,7 +19,9 @@ package org.apache.iotdb.db.queryengine.plan.relational.metadata; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.SessionInfo; +import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaComputationWithAutoCreation; import org.apache.iotdb.db.queryengine.plan.relational.function.OperatorType; import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; @@ -67,4 +69,19 @@ public interface Metadata { QualifiedObjectName tableName, List<Expression> expressionList, List<String> attributeColumns); + + /** + * Fetch and compute the schema of target timeseries, with device and measurement defined in given + * schemaComputationWithAutoCreation. The computation defined in given + * schemaComputationWithAutoCreation will be executed during scanning the fetched schema. If some + * target timeseries doesn't exist, they will be auto created. + * + * @param schemaComputationWithAutoCreationList define the target devices, measurements and + * computation + */ + default void fetchAndComputeSchemaWithAutoCreate( + List<? extends ISchemaComputationWithAutoCreation> schemaComputationWithAutoCreationList, + MPPQueryContext context) { + throw new UnsupportedOperationException(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java index d2c1e487458..f2476e59a95 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/AstVisitor.java @@ -371,6 +371,10 @@ public abstract class AstVisitor<R, C> { return visitStatement(node, context); } + protected R visitInsertTablet(InsertTablet node, C context) { + return visitStatement(node, context); + } + protected R visitDelete(Delete node, C context) { return visitStatement(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java index 2b858bdb8a7..38a423fae21 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertTablet.java @@ -17,13 +17,99 @@ * under the License. */ -package org.apache.iotdb.db.queryengine.plan.relational.sql.tree; +package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.iotdb.commons.schema.view.LogicalViewSchema; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo; +import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaComputationWithAutoCreation; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.utils.Pair; -public class InsertTablet extends WrappedStatement { +public class InsertTablet extends WrappedInsertStatement { - public InsertTablet(InsertTabletStatement insertTabletStatement) { - super(insertTabletStatement); + private InsertTabletStatement insertTabletStatement; + + public InsertTablet(InsertTabletStatement insertTabletStatement, MPPQueryContext context) { + super(insertTabletStatement, context); + this.insertTabletStatement = insertTabletStatement; + } + + @Override + public <R, C> R accept(AstVisitor<R, C> visitor, C context) { + return visitor.visitInsertTablet(this, context); + } + + @Override + public InsertTabletStatement getInnerTreeStatement() { + return insertTabletStatement; + } + + @Override + public List<ISchemaComputationWithAutoCreation> getSchemaValidationList() { + Map<IDeviceID, ISchemaComputationWithAutoCreation> map = new HashMap<>(); + for (int i = 0; i < insertTabletStatement.getRowCount(); i++) { + map.computeIfAbsent(insertTabletStatement.getTableDeviceID(i), this::getSchemaComputation); + } + return new ArrayList<>(map.values()); + } + + @Override + public void updateAfterSchemaValidation(MPPQueryContext context) throws QueryProcessException { + insertTabletStatement.updateAfterSchemaValidation(context); + } + + @Override + public ISchemaComputationWithAutoCreation getSchemaComputation(IDeviceID deviceID) { + return new SchemaExecutions(deviceID); + } + + public class SchemaExecutions extends BasicSchemaExecutions { + + public SchemaExecutions(IDeviceID deviceID) { + super(deviceID); + } + + @Override + public void computeMeasurement(int index, IMeasurementSchemaInfo measurementSchemaInfo) { + insertTabletStatement.computeMeasurement(index, measurementSchemaInfo); + } + + @Override + public boolean hasLogicalViewNeedProcess() { + return insertTabletStatement.hasLogicalViewNeedProcess(); + } + + @Override + public List<LogicalViewSchema> getLogicalViewSchemaList() { + return insertTabletStatement.getLogicalViewSchemaList(); + } + + @Override + public List<Integer> getIndexListOfLogicalViewPaths() { + return insertTabletStatement.getIndexListOfLogicalViewPaths(); + } + + @Override + public void recordRangeOfLogicalViewSchemaListNow() { + insertTabletStatement.recordRangeOfLogicalViewSchemaListNow(); + } + + @Override + public Pair<Integer, Integer> getRangeOfLogicalViewSchemaListRecorded() { + return insertTabletStatement.getRangeOfLogicalViewSchemaListRecorded(); + } + + @Override + public void computeMeasurementOfView(int index, IMeasurementSchemaInfo measurementSchemaInfo, + boolean isAligned) { + insertTabletStatement.computeMeasurementOfView(index, measurementSchemaInfo, isAligned); + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java new file mode 100644 index 00000000000..e57e7b55c40 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedInsertStatement.java @@ -0,0 +1,89 @@ +package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; + +import java.util.List; +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.schema.view.LogicalViewSchema; +import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo; +import org.apache.iotdb.db.queryengine.execution.schedule.queue.ID; +import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaComputationWithAutoCreation; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.enums.CompressionType; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.utils.Pair; + +public abstract class WrappedInsertStatement extends WrappedStatement { + + protected InsertBaseStatement insertStatement; + + public WrappedInsertStatement( + InsertBaseStatement innerTreeStatement, + MPPQueryContext context) { + super(innerTreeStatement, context); + this.insertStatement = innerTreeStatement; + } + + @Override + public InsertBaseStatement getInnerTreeStatement() { + return insertStatement; + } + + public abstract List<ISchemaComputationWithAutoCreation> getSchemaValidationList(); + + public abstract void updateAfterSchemaValidation(MPPQueryContext context) throws QueryProcessException; + + public abstract ISchemaComputationWithAutoCreation getSchemaComputation(IDeviceID deviceID); + + public abstract class BasicSchemaExecutions implements ISchemaComputationWithAutoCreation { + protected IDeviceID deviceID; + + public BasicSchemaExecutions(IDeviceID deviceID) { + this.deviceID = deviceID; + } + + @Override + public boolean isAligned() { + return true; + } + + @Override + public TSDataType getDataType(int index) { + return insertStatement.getDataTypes()[index]; + } + + @Override + public TSEncoding getEncoding(int index) { + return null; + } + + @Override + public CompressionType getCompressionType(int index) { + return null; + } + + @Override + public PartialPath getDevicePath() { + //TODO-TableInsertion: use deviceId + try { + return new PartialPath(deviceID); + } catch (IllegalPathException e) { + throw new RuntimeException(e); + } + } + + @Override + public String[] getMeasurements() { + return insertStatement.getMeasurements(); + } + + @Override + public void computeDevice(boolean isAligned) { + // ignored, table device must be aligned + } + + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedStatement.java index 62c7a7dee77..380388ee141 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/WrappedStatement.java @@ -17,18 +17,22 @@ * under the License. */ -package org.apache.iotdb.db.queryengine.plan.relational.sql.tree; +package org.apache.iotdb.db.queryengine.plan.relational.sql.ast; import java.util.Collections; import java.util.List; import java.util.Objects; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; public abstract class WrappedStatement extends Statement { - private final org.apache.iotdb.db.queryengine.plan.statement.Statement innerTreeStatement; + protected final org.apache.iotdb.db.queryengine.plan.statement.Statement innerTreeStatement; + protected final MPPQueryContext context; - public WrappedStatement(org.apache.iotdb.db.queryengine.plan.statement.Statement innerTreeStatement) { + public WrappedStatement(org.apache.iotdb.db.queryengine.plan.statement.Statement innerTreeStatement, + MPPQueryContext context) { super(null); this.innerTreeStatement = innerTreeStatement; + this.context = context; } @Override @@ -57,4 +61,12 @@ public abstract class WrappedStatement extends Statement { public String toString() { return innerTreeStatement.toString(); } + + public org.apache.iotdb.db.queryengine.plan.statement.Statement getInnerTreeStatement() { + return innerTreeStatement; + } + + public MPPQueryContext getContext() { + return context; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java index a19c61439c0..d7aac386a88 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/Statement.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.queryengine.plan.statement; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.auth.AuthorityChecker; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.parser.ASTVisitor; import java.util.List; @@ -67,7 +68,8 @@ public abstract class Statement extends StatementNode { "Only the admin user can perform this operation"); } - public org.apache.iotdb.db.queryengine.plan.relational.sql.tree.Statement toRelationalStatement() { + public org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement toRelationalStatement( + MPPQueryContext context) { throw new UnsupportedOperationException("Method not implemented yet"); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java index 35a0a6f18f6..53b60b67494 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java @@ -26,13 +26,19 @@ import org.apache.iotdb.commons.utils.TimePartitionUtils; import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException; import org.apache.iotdb.db.exception.metadata.PathNotExistException; import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.schematree.IMeasurementSchemaInfo; import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ColumnDefinition.ColumnCategory; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement; import org.apache.iotdb.db.queryengine.plan.statement.StatementType; import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; import org.apache.iotdb.db.utils.CommonUtils; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.IDeviceID.Factory; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.utils.Binary; @@ -60,6 +66,10 @@ public class InsertTabletStatement extends InsertBaseStatement implements ISchem private BitMap[] bitMaps; private Object[] columns; + private ColumnCategory[] columnCategories; + private List<Integer> idColumnIndices; + private IDeviceID[] deviceIDs; + private int rowCount = 0; /** @@ -409,4 +419,45 @@ public class InsertTabletStatement extends InsertBaseStatement implements ISchem public void setWriteToTable(boolean writeToTable) { this.writeToTable = writeToTable; } + + @Override + public Statement toRelationalStatement(MPPQueryContext context) { + return super.toRelationalStatement(context); + } + + public ColumnCategory[] getColumnCategories() { + return columnCategories; + } + + public void setColumnCategories( + ColumnCategory[] columnCategories) { + this.columnCategories = columnCategories; + idColumnIndices = new ArrayList<>(); + for (int i = 0; i < columnCategories.length; i++) { + if (columnCategories[i].equals(ColumnCategory.ID)) { + idColumnIndices.add(i); + } + } + } + + public List<Integer> getIdColumnIndices() { + return idColumnIndices; + } + + public IDeviceID getTableDeviceID(int rowIdx) { + if (deviceIDs == null) { + deviceIDs = new IDeviceID[rowCount]; + } + if (deviceIDs[rowIdx] == null) { + String[] deviceIdSegments = new String[idColumnIndices.size() + 1]; + deviceIdSegments[0] = this.devicePath.getFullPath(); + for (int i = 0; i < idColumnIndices.size(); i++) { + final Integer columnIndex = idColumnIndices.get(i); + deviceIdSegments[i + 1] = ((Binary[]) columns[columnIndex])[rowIdx].toString(); + } + deviceIDs[rowIdx] = Factory.DEFAULT_FACTORY.create(deviceIdSegments); + } + + return deviceIDs[rowIdx]; + } } diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift index 2838cd82128..4b760e27cef 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift @@ -234,6 +234,7 @@ struct TSInsertTabletReq { 7: required i32 size 8: optional bool isAligned 9: optional bool writeToTable + 10: optional list<byte> columnCategories } struct TSInsertTabletsReq {
