This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new 8f4f3952933 [IOTDB-6094] Load: Fix error in if/else judgment of
tsfileResource exists or not & Reduce warn log when timeseries exist (#10762)
(#10792)
8f4f3952933 is described below
commit 8f4f3952933252529cea182f4970fa6e5db515f2
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sat Aug 5 16:30:59 2023 +0800
[IOTDB-6094] Load: Fix error in if/else judgment of tsfileResource exists
or not & Reduce warn log when timeseries exist (#10762) (#10792)
(cherry picked from commit ebc95a772b6f3ac1de05718240ce63a76381e7b1)
Co-authored-by: Itami Sho <[email protected]>
---
.../queryengine/plan/analyze/AnalyzeVisitor.java | 91 ++++++++--------------
.../config/executor/ClusterConfigTaskExecutor.java | 27 ++++---
.../metadata/DatabaseSchemaStatement.java | 9 +++
3 files changed, 58 insertions(+), 69 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 448f5b08ff0..2e345832fee 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -2492,8 +2492,6 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
public Analysis visitLoadFile(LoadTsFileStatement loadTsFileStatement,
MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
- Map<String, Map<MeasurementSchema, File>> device2Schemas = new HashMap<>();
- Map<String, Pair<Boolean, File>> device2IsAligned = new HashMap<>();
int tsfileNum = loadTsFileStatement.getTsFiles().size();
// analyze tsfile metadata
@@ -2509,7 +2507,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
tsFile.getPath()));
}
try {
- analyzeTsFile(loadTsFileStatement, tsFile, device2Schemas,
device2IsAligned, context);
+ analyzeTsFile(loadTsFileStatement, tsFile, context);
} catch (IllegalArgumentException e) {
logger.warn(
String.format(
@@ -2598,21 +2596,16 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
return analysis;
}
- private void analyzeTsFile(
- LoadTsFileStatement statement,
- File tsFile,
- Map<String, Map<MeasurementSchema, File>> device2Schemas,
- Map<String, Pair<Boolean, File>> device2IsAligned,
- MPPQueryContext context)
+ private void analyzeTsFile(LoadTsFileStatement statement, File tsFile,
MPPQueryContext context)
throws IOException, VerifyMetadataException {
try (TsFileSequenceReader reader = new
TsFileSequenceReader(tsFile.getAbsolutePath())) {
- TsFileResource tsFileResource = new TsFileResource(tsFile);
- final boolean isAlreadyExistBeforeLoad =
tsFileResource.resourceFileExists();
- boolean isDeserializeDone = false;
+ Map<String, List<TimeseriesMetadata>> device2Metadata = null;
- Map<String, List<TimeseriesMetadata>> device2Metadata =
reader.getAllTimeseriesMetadata(true);
if (IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()
|| statement.isVerifySchema()) {
+ device2Metadata = reader.getAllTimeseriesMetadata(true);
+ Map<String, Map<MeasurementSchema, File>> device2Schemas = new
HashMap<>();
+ Map<String, Pair<Boolean, File>> device2IsAligned = new HashMap<>();
// construct schema
int deviceSize = device2Metadata.size();
int deviceCount = 0;
@@ -2656,30 +2649,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
&& timeseriesIndex == timeseriesMetadataListSize - 1)) {
// check if the device has the same aligned definition in all
tsfiles
- if (isDeviceAligned(device2IsAligned, device, tsFile,
isAligned)) {
- // case 1: if the tsfile has tsfile resource before loading,
we should deserialize
- // it only once.
- if (isAlreadyExistBeforeLoad) {
- if (!isDeserializeDone) {
- tsFileResource.deserialize();
- statement.addTsFileResource(tsFileResource);
- isDeserializeDone = true;
- }
-
- } else if (!tsFileResource.resourceFileExists()) {
- // case 2: if the tsfile has no tsfile resource before
loading, we should
- // construct it.
- tsFileResource = constructTsFileResource(tsFile,
device2Metadata, reader);
- statement.addTsFileResource(tsFileResource);
-
- } else {
- // case 3: the tsfile resource is created when loading, so
we just need to update
- // the resource.
- FileLoaderUtils.updateTsFileResource(device2Metadata,
tsFileResource);
- }
-
- tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
-
+ if (hasDeviceSameDefinition(device2IsAligned, device, tsFile,
isAligned)) {
autoCreateAndVerifySchema(statement, device2Schemas,
device2IsAligned, context);
timeseriesCount = 0;
} else {
@@ -2699,7 +2669,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
// when the number of devices does not exceed the threshold and it's
not the last
// timeseries of the last device, we also need to check if the
device has the same aligned
// definition in all tsfiles before going to the next device loop.
- if (!isDeviceAligned(device2IsAligned, device, tsFile, isAligned)) {
+ if (!hasDeviceSameDefinition(device2IsAligned, device, tsFile,
isAligned)) {
throw new VerifyMetadataException(
String.format(
"Device %s has different aligned definition in tsFile %s
and other TsFile.",
@@ -2707,10 +2677,28 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
}
}
}
+
+ // construct TsFileResource
+ TsFileResource tsFileResource = new TsFileResource(tsFile);
+
+ if (!tsFileResource.resourceFileExists()) {
+ if (device2Metadata == null) {
+ device2Metadata = reader.getAllTimeseriesMetadata(true);
+ }
+ FileLoaderUtils.updateTsFileResource(
+ device2Metadata, tsFileResource); // serialize it in
LoadSingleTsFileNode
+ tsFileResource.updatePlanIndexes(reader.getMinPlanIndex());
+ tsFileResource.updatePlanIndexes(reader.getMaxPlanIndex());
+ } else {
+ tsFileResource.deserialize();
+ }
+
+ tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+ statement.addTsFileResource(tsFileResource);
}
}
- private boolean isDeviceAligned(
+ private boolean hasDeviceSameDefinition(
Map<String, Pair<Boolean, File>> device2IsAligned,
String device,
File tsFile,
@@ -2721,25 +2709,6 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
.equals(isAligned);
}
- private TsFileResource constructTsFileResource(
- File tsFile,
- Map<String, List<TimeseriesMetadata>> device2Metadata,
- TsFileSequenceReader reader)
- throws IOException {
- TsFileResource resource = new TsFileResource(tsFile);
- if (!resource.resourceFileExists()) {
- FileLoaderUtils.updateTsFileResource(
- device2Metadata, resource); // serialize it in LoadSingleTsFileNode
- resource.updatePlanIndexes(reader.getMinPlanIndex());
- resource.updatePlanIndexes(reader.getMaxPlanIndex());
- } else {
- resource.deserialize();
- }
-
- resource.setStatus(TsFileResourceStatus.NORMAL);
- return resource;
- }
-
private void autoCreateSg(int sgLevel, Map<String, Map<MeasurementSchema,
File>> device2Schemas)
throws VerifyMetadataException, LoadFileException, IllegalPathException {
sgLevel += 1; // e.g. "root.sg" means sgLevel = 1, "root.sg.test" means
sgLevel=2
@@ -2762,11 +2731,13 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
DatabaseSchemaStatement statement =
new
DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE);
statement.setDatabasePath(sgPath);
- executeSetStorageGroupStatement(statement);
+ // load may try to create a database that already exists, so disable the exception log here
+ statement.setEnablePrintExceptionLog(false);
+ executeSetDatabaseStatement(statement);
}
}
- private void executeSetStorageGroupStatement(Statement statement) throws
LoadFileException {
+ private void executeSetDatabaseStatement(Statement statement) throws
LoadFileException {
long queryId = SessionManager.getInstance().requestQueryId();
ExecutionResult result =
Coordinator.getInstance()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 75d657c2383..8395791c226 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -266,11 +266,18 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
TSStatus tsStatus = configNodeClient.setDatabase(databaseSchema);
// Get response or throw exception
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
- LOGGER.warn(
- "Failed to execute create database {} in config node, status is
{}.",
- databaseSchemaStatement.getDatabasePath(),
- tsStatus);
- future.setException(new IoTDBException(tsStatus.message,
tsStatus.code));
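+ // If the database already exists when loading, do not throw an exception, to avoid printing
+ // too many logs. NOTE(review): the check below throws only for DATABASE_ALREADY_EXISTS with
+ // logging enabled, so any other failure status is reported as success -- confirm intent.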
+ if ((TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode() ==
tsStatus.getCode()
+ && databaseSchemaStatement.getEnablePrintExceptionLog())) {
+ LOGGER.warn(
+ "Failed to execute create database {} in config node, status is
{}.",
+ databaseSchemaStatement.getDatabasePath(),
+ tsStatus);
+ future.setException(new IoTDBException(tsStatus.message,
tsStatus.code));
+ } else {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ }
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
@@ -293,10 +300,12 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
TSStatus tsStatus = configNodeClient.alterDatabase(databaseSchema);
// Get response or throw exception
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
- LOGGER.warn(
- "Failed to execute create database {} in config node, status is
{}.",
- databaseSchemaStatement.getDatabasePath(),
- tsStatus);
+ if (databaseSchemaStatement.getEnablePrintExceptionLog()) {
+ LOGGER.warn(
+ "Failed to execute alter database {} in config node, status is
{}.",
+ databaseSchemaStatement.getDatabasePath(),
+ tsStatus);
+ }
future.setException(new IoTDBException(tsStatus.message,
tsStatus.code));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/DatabaseSchemaStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/DatabaseSchemaStatement.java
index ae8a15bf93c..00c02488c1d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/DatabaseSchemaStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/DatabaseSchemaStatement.java
@@ -40,6 +40,7 @@ public class DatabaseSchemaStatement extends Statement
implements IConfigStateme
private Long timePartitionInterval = null;
private Integer schemaRegionGroupNum = null;
private Integer dataRegionGroupNum = null;
+ private boolean enablePrintExceptionLog = true;
public DatabaseSchemaStatement(DatabaseSchemaStatementType subType) {
super();
@@ -107,6 +108,14 @@ public class DatabaseSchemaStatement extends Statement
implements IConfigStateme
this.dataRegionGroupNum = dataRegionGroupNum;
}
+ public boolean getEnablePrintExceptionLog() {
+ return enablePrintExceptionLog;
+ }
+
+ public void setEnablePrintExceptionLog(boolean enablePrintExceptionLog) {
+ this.enablePrintExceptionLog = enablePrintExceptionLog;
+ }
+
@Override
public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
switch (subType) {