This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new 7434fb76e07 [To rc/1.3.3] Fix the issue of restarting DataNode to
clean up InvalidDataRegion (#13543)
7434fb76e07 is described below
commit 7434fb76e07a36e8bf269d89a2562e8cad8be6c1
Author: 133tosakarin <[email protected]>
AuthorDate: Thu Sep 19 02:42:28 2024 +0800
[To rc/1.3.3] Fix the issue of restarting DataNode to clean up
InvalidDataRegion (#13543)
---
.../apache/iotdb/db/schemaengine/SchemaEngine.java | 60 ++++++++----
.../java/org/apache/iotdb/db/service/DataNode.java | 101 +++++++++++++++------
2 files changed, 113 insertions(+), 48 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
index 0e11510b83e..f322d955708 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
@@ -133,17 +133,12 @@ public class SchemaEngine {
}
}
- /**
- * Scan the database and schema region directories to recover schema regions
and return the
- * collected local schema partition info for localSchemaPartitionTable
recovery.
- */
- @SuppressWarnings("java:S2142")
- private void initSchemaRegion() {
- File schemaDir = new File(config.getSchemaDir());
- File[] sgDirList = schemaDir.listFiles();
-
+ /**
+ * Scan {@code config.getSchemaDir()} and collect the schema region ids found on local disk,
+ * grouped by database full path. Entries whose names are not legal database paths, database
+ * directories that do not exist or cannot be listed, and region entries whose names are not
+ * integers are skipped. This method only reads the directory tree; it does not recover regions.
+ *
+ * @return database full path -> schema region ids found under it (a list may be empty); an
+ *     empty map when the schema directory cannot be listed
+ */
+ public static Map<String, List<SchemaRegionId>> getLocalSchemaRegionInfo() {
+ final File schemaDir = new File(config.getSchemaDir());
+ final File[] sgDirList = schemaDir.listFiles();
+ final Map<String, List<SchemaRegionId>> localSchemaPartitionTable = new
HashMap<>();
if (sgDirList == null) {
- return;
+ return localSchemaPartitionTable;
}
// recover SchemaRegion concurrently
+ // NOTE(review): the comment above is inherited from initSchemaRegion; this method only scans.
+ // Confirm that no executor/futures are still created in the lines not shown in this hunk.
@@ -158,15 +153,15 @@ public class SchemaEngine {
continue;
}
- PartialPath storageGroup;
+ final PartialPath database;
try {
- storageGroup = new PartialPath(file.getName());
+ database = new PartialPath(file.getName());
} catch (IllegalPathException illegalPathException) {
// not a legal sg dir
continue;
}
- File sgDir = new File(config.getSchemaDir(), storageGroup.getFullPath());
+ final File sgDir = new File(config.getSchemaDir(),
database.getFullPath());
if (!sgDir.exists()) {
continue;
@@ -176,21 +171,48 @@ public class SchemaEngine {
if (schemaRegionDirs == null) {
continue;
}
-
- for (File schemaRegionDir : schemaRegionDirs) {
- SchemaRegionId schemaRegionId;
+ // Region directories are named by their integer region id.
+ List<SchemaRegionId> schemaRegionIds = new ArrayList<>();
+ for (final File schemaRegionDir : schemaRegionDirs) {
+ final SchemaRegionId schemaRegionId;
try {
schemaRegionId = new
SchemaRegionId(Integer.parseInt(schemaRegionDir.getName()));
} catch (NumberFormatException e) {
// the dir/file is not schemaRegionDir, ignore this.
continue;
}
- futures.add(
-
schemaRegionRecoverPools.submit(recoverSchemaRegionTask(storageGroup,
schemaRegionId)));
+ schemaRegionIds.add(schemaRegionId);
}
+ // Keyed by the canonical full path so callers can rebuild a PartialPath from it.
+ localSchemaPartitionTable.put(database.getFullPath(), schemaRegionIds);
}
+ return localSchemaPartitionTable;
+ }
- for (Future<ISchemaRegion> future : futures) {
+ /**
+ * Recover, concurrently on a fixed thread pool, every schema region reported by {@link
+ * #getLocalSchemaRegionInfo()} and register each recovered region in {@code schemaRegionMap}.
+ */
+ @SuppressWarnings("java:S2142")
+ private void initSchemaRegion() {
+ // recover SchemaRegion concurrently
+ Map<String, List<SchemaRegionId>> localSchemaRegionInfo =
getLocalSchemaRegionInfo();
+ final ExecutorService schemaRegionRecoverPools =
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ Runtime.getRuntime().availableProcessors(),
+ ThreadName.SCHEMA_REGION_RECOVER_TASK.getName());
+ final List<Future<ISchemaRegion>> futures = new ArrayList<>();
+ // k: database full path; v: schema region ids found under that database on disk.
+ localSchemaRegionInfo.forEach(
+ (k, v) -> {
+ for (SchemaRegionId schemaRegionId : v) {
+ try {
+ futures.add(
+ schemaRegionRecoverPools.submit(
+ recoverSchemaRegionTask(new PartialPath(k),
schemaRegionId)));
+ } catch (IllegalPathException e) {
+ // Not expected: keys were produced by PartialPath#getFullPath() in
+ // getLocalSchemaRegionInfo, so re-parsing them should always succeed.
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ for (final Future<ISchemaRegion> future : futures) {
try {
ISchemaRegion schemaRegion = future.get();
schemaRegionMap.put(schemaRegion.getSchemaRegionId(), schemaRegion);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 7cb87484271..2479640353e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -32,6 +32,8 @@ import
org.apache.iotdb.commons.concurrent.IoTDBDefaultThreadExceptionHandler;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.StartupException;
@@ -122,6 +124,7 @@ import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@@ -214,7 +217,6 @@ public class DataNode extends ServerCommandLine implements
DataNodeMBean {
// Pull and check system configurations from ConfigNode-leader
pullAndCheckSystemConfigurations();
-
if (isFirstStart) {
sendRegisterRequestToConfigNode(true);
IoTDBStartCheck.getInstance().generateOrOverwriteSystemPropertiesFile();
@@ -540,49 +542,92 @@ public class DataNode extends ServerCommandLine
implements DataNodeMBean {
}
+ /**
+ * Remove local on-disk state of regions that are not in {@code dataNodeConsensusGroupIds}. For
+ * data regions this removes consensus directories, then data-region directories. For schema
+ * regions it removes consensus directories, then schema-region directories.
+ *
+ * @param dataNodeConsensusGroupIds consensus groups that should remain on this DataNode (the
+ *     restart path in this patch builds it from dataNodeRestartResp.getConsensusGroupIds())
+ */
private void removeInvalidRegions(List<ConsensusGroupId>
dataNodeConsensusGroupIds) {
+ removeInvalidConsensusDataRegions(dataNodeConsensusGroupIds);
+ removeInvalidDataRegions(dataNodeConsensusGroupIds);
+ removeInvalidConsensusSchemaRegions(dataNodeConsensusGroupIds);
+ removeInvalidSchemaRegions(dataNodeConsensusGroupIds);
+ }
+
+ /**
+ * Delete the data-region directories of each locally discovered data region (as reported by
+ * StorageEngine#getLocalDataRegionInfo) that is not in {@code dataNodeConsensusGroupIds}. The
+ * directory is removed from every local file folder returned by TierManager.
+ *
+ * <p>NOTE(review): List#contains inside a nested loop is O(regions x ids); consider a HashSet if
+ * these lists can be large. Relies on value-based equals between DataRegionId and the
+ * ConsensusGroupId elements — confirm.
+ */
+ private void removeInvalidDataRegions(List<ConsensusGroupId>
dataNodeConsensusGroupIds) {
+ Map<String, List<DataRegionId>> localDataRegionInfo =
+ StorageEngine.getInstance().getLocalDataRegionInfo();
+ List<String> allLocalFilesFolders =
TierManager.getInstance().getAllLocalFilesFolders();
+ localDataRegionInfo.forEach(
+ (database, dataRegionIds) -> {
+ for (DataRegionId dataRegionId : dataRegionIds) {
+ if (!dataNodeConsensusGroupIds.contains(dataRegionId)) {
+ removeDataDirRegion(database, dataRegionId,
allLocalFilesFolders);
+ }
+ }
+ });
+ }
+
+ /**
+ * Delete the consensus-layer directory of every data-region consensus group known to
+ * DataRegionConsensusImpl (queried without starting it) that is not in
+ * {@code dataNodeConsensusGroupIds}.
+ */
+ private void removeInvalidConsensusDataRegions(List<ConsensusGroupId>
dataNodeConsensusGroupIds) {
List<ConsensusGroupId> invalidDataRegionConsensusGroupIds =
DataRegionConsensusImpl.getInstance().getAllConsensusGroupIdsWithoutStarting().stream()
.filter(consensusGroupId ->
!dataNodeConsensusGroupIds.contains(consensusGroupId))
.collect(Collectors.toList());
-
- List<ConsensusGroupId> invalidSchemaRegionConsensusGroupIds =
-
SchemaRegionConsensusImpl.getInstance().getAllConsensusGroupIdsWithoutStarting().stream()
- .filter(consensusGroupId ->
!dataNodeConsensusGroupIds.contains(consensusGroupId))
- .collect(Collectors.toList());
- removeInvalidDataRegions(invalidDataRegionConsensusGroupIds);
- removeInvalidSchemaRegions(invalidSchemaRegionConsensusGroupIds);
- }
-
- private void removeInvalidDataRegions(List<ConsensusGroupId>
invalidConsensusGroupId) {
- logger.info("Remove invalid dataRegion directories... {}",
invalidConsensusGroupId);
- for (ConsensusGroupId consensusGroupId : invalidConsensusGroupId) {
+ logger.info("Remove invalid dataRegion directories... {}",
invalidDataRegionConsensusGroupIds);
+ for (ConsensusGroupId consensusGroupId :
invalidDataRegionConsensusGroupIds) {
File oldDir =
new File(
DataRegionConsensusImpl.getInstance()
.getRegionDirFromConsensusGroupId(consensusGroupId));
- removeRegionsDir(oldDir);
+ removeDir(oldDir);
}
}
- private void removeInvalidSchemaRegions(List<ConsensusGroupId>
invalidConsensusGroupId) {
- logger.info("Remove invalid schemaRegion directories... {}",
invalidConsensusGroupId);
- for (ConsensusGroupId consensusGroupId : invalidConsensusGroupId) {
+ /**
+ * Delete the on-disk directory of each locally discovered schema region (as reported by
+ * SchemaEngine#getLocalSchemaRegionInfo) that is not in {@code schemaConsensusGroupIds}.
+ *
+ * @param schemaConsensusGroupIds despite the name, removeInvalidRegions passes the full list of
+ *     consensus groups that should remain on this DataNode
+ */
+ private void removeInvalidSchemaRegions(List<ConsensusGroupId>
schemaConsensusGroupIds) {
+ Map<String, List<SchemaRegionId>> localSchemaRegionInfo =
+ SchemaEngine.getLocalSchemaRegionInfo();
+ localSchemaRegionInfo.forEach(
+ (database, schemaRegionIds) -> {
+ for (SchemaRegionId schemaRegionId : schemaRegionIds) {
+ if (!schemaConsensusGroupIds.contains(schemaRegionId)) {
+ removeInvalidSchemaDir(database, schemaRegionId);
+ }
+ }
+ });
+ }
+
+ /**
+ * Delete {@code <folder>/<database>/<dataRegionId>} in each of the given file folders.
+ *
+ * @param database database full path (the key used by getLocalDataRegionInfo)
+ * @param dataRegionId the data region whose directories are removed
+ * @param fileFolders local file folders to clean, e.g. from TierManager#getAllLocalFilesFolders
+ */
+ private void removeDataDirRegion(
+ String database, DataRegionId dataRegionId, List<String> fileFolders) {
+ fileFolders.forEach(
+ folder -> {
+ String regionDir =
+ folder + File.separator + database + File.separator +
dataRegionId.getId();
+ removeDir(new File(regionDir));
+ });
+ }
+
+ /**
+ * Delete the consensus-layer directory of every schema-region consensus group known to
+ * SchemaRegionConsensusImpl (queried without starting it) that is not in
+ * {@code dataNodeConsensusGroupIds}.
+ */
+ private void removeInvalidConsensusSchemaRegions(
+ List<ConsensusGroupId> dataNodeConsensusGroupIds) {
+ List<ConsensusGroupId> invalidSchemaRegionConsensusGroupIds =
+
SchemaRegionConsensusImpl.getInstance().getAllConsensusGroupIdsWithoutStarting().stream()
+ .filter(consensusGroupId ->
!dataNodeConsensusGroupIds.contains(consensusGroupId))
+ .collect(Collectors.toList());
+ logger.info(
+ "Remove invalid schemaRegion directories... {}",
invalidSchemaRegionConsensusGroupIds);
+
+ for (ConsensusGroupId consensusGroupId :
invalidSchemaRegionConsensusGroupIds) {
File oldDir =
new File(
SchemaRegionConsensusImpl.getInstance()
.getRegionDirFromConsensusGroupId(consensusGroupId));
- removeRegionsDir(oldDir);
+ removeDir(oldDir);
}
}
- private void removeRegionsDir(File regionDir) {
+ /**
+ * Delete the directory of an invalid schema region, {@code <schemaDir>/<database>/<regionId>}.
+ * This must use the same root that SchemaEngine#getLocalSchemaRegionInfo scans
+ * (config.getSchemaDir()). A path built from config.getSystemDir() never points at a schema
+ * region directory, so nothing would be deleted.
+ *
+ * @param database database full path (the key used by getLocalSchemaRegionInfo)
+ * @param schemaRegionId the schema region whose directory is removed
+ */
+ private void removeInvalidSchemaDir(String database, SchemaRegionId
schemaRegionId) {
+ String schemaRegionDir =
+ config.getSchemaDir() + File.separator + database + File.separator +
schemaRegionId.getId();
+ removeDir(new File(schemaRegionDir));
+ }
+
+ /**
+ * Delete {@code regionDir} via FileUtils#deleteDirectoryAndEmptyParent. Success is logged only
+ * when the directory is actually gone afterwards; otherwise a warning is logged.
+ */
+ private void removeDir(File regionDir) {
if (regionDir.exists()) {
- try {
- FileUtils.recursivelyDeleteFolder(regionDir.getPath());
- logger.info("delete {} succeed.", regionDir.getAbsolutePath());
- } catch (IOException e) {
- logger.error("delete {} failed.", regionDir.getAbsolutePath());
- }
+ FileUtils.deleteDirectoryAndEmptyParent(regionDir);
+ // The helper does not report failure by exception, so verify the result explicitly.
+ if (regionDir.exists()) {
+ logger.warn("delete {} failed.", regionDir.getAbsolutePath());
+ } else {
+ logger.info("delete {} succeed.", regionDir.getAbsolutePath());
+ }
} else {
logger.info("delete {} failed, because it does not exist.",
regionDir.getAbsolutePath());
}
@@ -641,12 +686,10 @@ public class DataNode extends ServerCommandLine
implements DataNodeMBean {
(endTime - startTime));
List<TConsensusGroupId> consensusGroupIds =
dataNodeRestartResp.getConsensusGroupIds();
- List<ConsensusGroupId> dataNodeConsensusGroupIds =
+ removeInvalidRegions(
consensusGroupIds.stream()
.map(ConsensusGroupId.Factory::createFromTConsensusGroupId)
- .collect(Collectors.toList());
-
- removeInvalidRegions(dataNodeConsensusGroupIds);
+ .collect(Collectors.toList()));
} else {
/* Throw exception when restart is rejected */
throw new StartupException(dataNodeRestartResp.getStatus().getMessage());