This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch iotdb-6094 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 13693f7e1eaae02eeef15bd53885d36959dfffd4 Author: Itami Sho <[email protected]> AuthorDate: Sat Aug 5 15:47:28 2023 +0800 [IOTDB-6094] Load: Fix error in if/else judgment of tsfileResource exists or not & Reduce warn log when timeseries exist (#10762) (cherry picked from commit ebc95a772b6f3ac1de05718240ce63a76381e7b1) --- .../queryengine/plan/analyze/AnalyzeVisitor.java | 91 ++++++++-------------- .../config/executor/ClusterConfigTaskExecutor.java | 27 ++++--- .../metadata/DatabaseSchemaStatement.java | 9 +++ 3 files changed, 58 insertions(+), 69 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index 448f5b08ff0..2e345832fee 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -2492,8 +2492,6 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement, MPPQueryContext context) { context.setQueryType(QueryType.WRITE); - Map<String, Map<MeasurementSchema, File>> device2Schemas = new HashMap<>(); - Map<String, Pair<Boolean, File>> device2IsAligned = new HashMap<>(); int tsfileNum = loadTsFileStatement.getTsFiles().size(); // analyze tsfile metadata @@ -2509,7 +2507,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> tsFile.getPath())); } try { - analyzeTsFile(loadTsFileStatement, tsFile, device2Schemas, device2IsAligned, context); + analyzeTsFile(loadTsFileStatement, tsFile, context); } catch (IllegalArgumentException e) { logger.warn( String.format( @@ -2598,21 +2596,16 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> return analysis; } - private void analyzeTsFile( - LoadTsFileStatement statement, - File tsFile, - Map<String, Map<MeasurementSchema, File>> device2Schemas, - Map<String, Pair<Boolean, File>> device2IsAligned, - MPPQueryContext context) + private void analyzeTsFile(LoadTsFileStatement statement, File tsFile, MPPQueryContext context) throws IOException, VerifyMetadataException { try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) { - TsFileResource tsFileResource = new TsFileResource(tsFile); - final boolean isAlreadyExistBeforeLoad = tsFileResource.resourceFileExists(); - boolean isDeserializeDone = false; + Map<String, List<TimeseriesMetadata>> device2Metadata = null; - Map<String, List<TimeseriesMetadata>> device2Metadata = reader.getAllTimeseriesMetadata(true); if (IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled() || statement.isVerifySchema()) { + device2Metadata = reader.getAllTimeseriesMetadata(true); + Map<String, Map<MeasurementSchema, File>> device2Schemas = new HashMap<>(); + Map<String, Pair<Boolean, File>> device2IsAligned = new HashMap<>(); // construct schema int deviceSize = device2Metadata.size(); int deviceCount = 0; @@ -2656,30 +2649,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> && timeseriesIndex == timeseriesMetadataListSize - 1)) { // check if the device has the same aligned definition in all tsfiles - if (isDeviceAligned(device2IsAligned, device, tsFile, isAligned)) { - // case 1: if the tsfile has tsfile resource before loading, we should deserialize - // it only once. - if (isAlreadyExistBeforeLoad) { - if (!isDeserializeDone) { - tsFileResource.deserialize(); - statement.addTsFileResource(tsFileResource); - isDeserializeDone = true; - } - - } else if (!tsFileResource.resourceFileExists()) { - // case 2: if the tsfile has no tsfile resource before loading, we should - // construct it. - tsFileResource = constructTsFileResource(tsFile, device2Metadata, reader); - statement.addTsFileResource(tsFileResource); - - } else { - // case 3: the tsfile resource is created when loading, so we just need to update - // the resource. - FileLoaderUtils.updateTsFileResource(device2Metadata, tsFileResource); - } - - tsFileResource.setStatus(TsFileResourceStatus.NORMAL); - + if (hasDeviceSameDefinition(device2IsAligned, device, tsFile, isAligned)) { autoCreateAndVerifySchema(statement, device2Schemas, device2IsAligned, context); timeseriesCount = 0; } else { @@ -2699,7 +2669,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> // when the number of devices does not exceed the threshold and it's not the last // timeseries of the last device, we also need to check if the device has the same aligned // definition in all tsfiles before going to the next device loop. - if (!isDeviceAligned(device2IsAligned, device, tsFile, isAligned)) { + if (!hasDeviceSameDefinition(device2IsAligned, device, tsFile, isAligned)) { throw new VerifyMetadataException( String.format( "Device %s has different aligned definition in tsFile %s and other TsFile.", @@ -2707,10 +2677,28 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> } } } + + // construct TsFileResource + TsFileResource tsFileResource = new TsFileResource(tsFile); + + if (!tsFileResource.resourceFileExists()) { + if (device2Metadata == null) { + device2Metadata = reader.getAllTimeseriesMetadata(true); + } + FileLoaderUtils.updateTsFileResource( + device2Metadata, tsFileResource); // serialize it in LoadSingleTsFileNode + tsFileResource.updatePlanIndexes(reader.getMinPlanIndex()); + tsFileResource.updatePlanIndexes(reader.getMaxPlanIndex()); + } else { + tsFileResource.deserialize(); + } + + tsFileResource.setStatus(TsFileResourceStatus.NORMAL); + statement.addTsFileResource(tsFileResource); } } - private boolean isDeviceAligned( + private boolean hasDeviceSameDefinition( Map<String, Pair<Boolean, File>> device2IsAligned, String device, File tsFile, @@ -2721,25 +2709,6 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> .equals(isAligned); } - private TsFileResource constructTsFileResource( - File tsFile, - Map<String, List<TimeseriesMetadata>> device2Metadata, - TsFileSequenceReader reader) - throws IOException { - TsFileResource resource = new TsFileResource(tsFile); - if (!resource.resourceFileExists()) { - FileLoaderUtils.updateTsFileResource( - device2Metadata, resource); // serialize it in LoadSingleTsFileNode - resource.updatePlanIndexes(reader.getMinPlanIndex()); - resource.updatePlanIndexes(reader.getMaxPlanIndex()); - } else { - resource.deserialize(); - } - - resource.setStatus(TsFileResourceStatus.NORMAL); - return resource; - } - private void autoCreateSg(int sgLevel, Map<String, Map<MeasurementSchema, File>> device2Schemas) throws VerifyMetadataException, LoadFileException, IllegalPathException { sgLevel += 1; // e.g. "root.sg" means sgLevel = 1, "root.sg.test" means sgLevel=2 @@ -2762,11 +2731,13 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> DatabaseSchemaStatement statement = new DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE); statement.setDatabasePath(sgPath); - executeSetStorageGroupStatement(statement); + // load doesn't print exception log + statement.setEnablePrintExceptionLog(false); + executeSetDatabaseStatement(statement); } } - private void executeSetStorageGroupStatement(Statement statement) throws LoadFileException { + private void executeSetDatabaseStatement(Statement statement) throws LoadFileException { long queryId = SessionManager.getInstance().requestQueryId(); ExecutionResult result = Coordinator.getInstance() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index 75d657c2383..8395791c226 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -266,11 +266,18 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { TSStatus tsStatus = configNodeClient.setDatabase(databaseSchema); // Get response or throw exception if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { - LOGGER.warn( - "Failed to execute create database {} in config node, status is {}.", - databaseSchemaStatement.getDatabasePath(), - tsStatus); - future.setException(new IoTDBException(tsStatus.message, tsStatus.code)); + // If database already exists when loading, we do not throw exceptions to avoid printing too + // many logs + if ((TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode() == tsStatus.getCode() + && databaseSchemaStatement.getEnablePrintExceptionLog())) { + LOGGER.warn( + "Failed to execute create database {} in config node, status is {}.", + databaseSchemaStatement.getDatabasePath(), + tsStatus); + future.setException(new IoTDBException(tsStatus.message, tsStatus.code)); + } else { + future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); + } } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); } @@ -293,10 +300,12 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { TSStatus tsStatus = configNodeClient.alterDatabase(databaseSchema); // Get response or throw exception if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { - LOGGER.warn( - "Failed to execute create database {} in config node, status is {}.", - databaseSchemaStatement.getDatabasePath(), - tsStatus); + if (databaseSchemaStatement.getEnablePrintExceptionLog()) { + LOGGER.warn( + "Failed to execute alter database {} in config node, status is {}.", + databaseSchemaStatement.getDatabasePath(), + tsStatus); + } future.setException(new IoTDBException(tsStatus.message, tsStatus.code)); } else { future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/DatabaseSchemaStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/DatabaseSchemaStatement.java index ae8a15bf93c..00c02488c1d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/DatabaseSchemaStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/DatabaseSchemaStatement.java @@ -40,6 +40,7 @@ public class DatabaseSchemaStatement extends Statement implements IConfigStateme private Long timePartitionInterval = null; private Integer schemaRegionGroupNum = null; private Integer dataRegionGroupNum = null; + private boolean enablePrintExceptionLog = true; public DatabaseSchemaStatement(DatabaseSchemaStatementType subType) { super(); @@ -107,6 +108,14 @@ public class DatabaseSchemaStatement extends Statement implements IConfigStateme this.dataRegionGroupNum = dataRegionGroupNum; } + public boolean getEnablePrintExceptionLog() { + return enablePrintExceptionLog; + } + + public void setEnablePrintExceptionLog(boolean enablePrintExceptionLog) { + this.enablePrintExceptionLog = enablePrintExceptionLog; + } + @Override public <R, C> R accept(StatementVisitor<R, C> visitor, C context) { switch (subType) {
