This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new 7434fb76e07 [To rc/1.3.3] Fix the issue of restarting DataNode to clean up InvalidDataRegion (#13543)
7434fb76e07 is described below

commit 7434fb76e07a36e8bf269d89a2562e8cad8be6c1
Author: 133tosakarin <[email protected]>
AuthorDate: Thu Sep 19 02:42:28 2024 +0800

    [To rc/1.3.3] Fix the issue of restarting DataNode to clean up InvalidDataRegion (#13543)
---
 .../apache/iotdb/db/schemaengine/SchemaEngine.java |  60 ++++++++----
 .../java/org/apache/iotdb/db/service/DataNode.java | 101 +++++++++++++++------
 2 files changed, 113 insertions(+), 48 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
index 0e11510b83e..f322d955708 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
@@ -133,17 +133,12 @@ public class SchemaEngine {
     }
   }
 
-  /**
-   * Scan the database and schema region directories to recover schema regions 
and return the
-   * collected local schema partition info for localSchemaPartitionTable 
recovery.
-   */
-  @SuppressWarnings("java:S2142")
-  private void initSchemaRegion() {
-    File schemaDir = new File(config.getSchemaDir());
-    File[] sgDirList = schemaDir.listFiles();
-
+  public static Map<String, List<SchemaRegionId>> getLocalSchemaRegionInfo() {
+    final File schemaDir = new File(config.getSchemaDir());
+    final File[] sgDirList = schemaDir.listFiles();
+    final Map<String, List<SchemaRegionId>> localSchemaPartitionTable = new 
HashMap<>();
     if (sgDirList == null) {
-      return;
+      return localSchemaPartitionTable;
     }
 
     // recover SchemaRegion concurrently
@@ -158,15 +153,15 @@ public class SchemaEngine {
         continue;
       }
 
-      PartialPath storageGroup;
+      final PartialPath database;
       try {
-        storageGroup = new PartialPath(file.getName());
+        database = new PartialPath(file.getName());
       } catch (IllegalPathException illegalPathException) {
         // not a legal sg dir
         continue;
       }
 
-      File sgDir = new File(config.getSchemaDir(), storageGroup.getFullPath());
+      final File sgDir = new File(config.getSchemaDir(), 
database.getFullPath());
 
       if (!sgDir.exists()) {
         continue;
@@ -176,21 +171,48 @@ public class SchemaEngine {
       if (schemaRegionDirs == null) {
         continue;
       }
-
-      for (File schemaRegionDir : schemaRegionDirs) {
-        SchemaRegionId schemaRegionId;
+      List<SchemaRegionId> schemaRegionIds = new ArrayList<>();
+      for (final File schemaRegionDir : schemaRegionDirs) {
+        final SchemaRegionId schemaRegionId;
         try {
           schemaRegionId = new 
SchemaRegionId(Integer.parseInt(schemaRegionDir.getName()));
         } catch (NumberFormatException e) {
           // the dir/file is not schemaRegionDir, ignore this.
           continue;
         }
-        futures.add(
-            
schemaRegionRecoverPools.submit(recoverSchemaRegionTask(storageGroup, 
schemaRegionId)));
+        schemaRegionIds.add(schemaRegionId);
       }
+      localSchemaPartitionTable.put(database.getFullPath(), schemaRegionIds);
     }
+    return localSchemaPartitionTable;
+  }
 
-    for (Future<ISchemaRegion> future : futures) {
+  /**
+   * Scan the database and schema region directories to recover schema regions 
and return the
+   * collected local schema partition info for localSchemaPartitionTable 
recovery.
+   */
+  @SuppressWarnings("java:S2142")
+  private void initSchemaRegion() {
+    // recover SchemaRegion concurrently
+    Map<String, List<SchemaRegionId>> localSchemaRegionInfo = 
getLocalSchemaRegionInfo();
+    final ExecutorService schemaRegionRecoverPools =
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            Runtime.getRuntime().availableProcessors(),
+            ThreadName.SCHEMA_REGION_RECOVER_TASK.getName());
+    final List<Future<ISchemaRegion>> futures = new ArrayList<>();
+    localSchemaRegionInfo.forEach(
+        (k, v) -> {
+          for (SchemaRegionId schemaRegionId : v) {
+            try {
+              futures.add(
+                  schemaRegionRecoverPools.submit(
+                      recoverSchemaRegionTask(new PartialPath(k), 
schemaRegionId)));
+            } catch (IllegalPathException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        });
+    for (final Future<ISchemaRegion> future : futures) {
       try {
         ISchemaRegion schemaRegion = future.get();
         schemaRegionMap.put(schemaRegion.getSchemaRegionId(), schemaRegion);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 7cb87484271..2479640353e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -32,6 +32,8 @@ import 
org.apache.iotdb.commons.concurrent.IoTDBDefaultThreadExceptionHandler;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.StartupException;
@@ -122,6 +124,7 @@ import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
@@ -214,7 +217,6 @@ public class DataNode extends ServerCommandLine implements 
DataNodeMBean {
 
       // Pull and check system configurations from ConfigNode-leader
       pullAndCheckSystemConfigurations();
-
       if (isFirstStart) {
         sendRegisterRequestToConfigNode(true);
         
IoTDBStartCheck.getInstance().generateOrOverwriteSystemPropertiesFile();
@@ -540,49 +542,92 @@ public class DataNode extends ServerCommandLine 
implements DataNodeMBean {
   }
 
   private void removeInvalidRegions(List<ConsensusGroupId> 
dataNodeConsensusGroupIds) {
+    removeInvalidConsensusDataRegions(dataNodeConsensusGroupIds);
+    removeInvalidDataRegions(dataNodeConsensusGroupIds);
+    removeInvalidConsensusSchemaRegions(dataNodeConsensusGroupIds);
+    removeInvalidSchemaRegions(dataNodeConsensusGroupIds);
+  }
+
+  private void removeInvalidDataRegions(List<ConsensusGroupId> 
dataNodeConsensusGroupIds) {
+    Map<String, List<DataRegionId>> localDataRegionInfo =
+        StorageEngine.getInstance().getLocalDataRegionInfo();
+    List<String> allLocalFilesFolders = 
TierManager.getInstance().getAllLocalFilesFolders();
+    localDataRegionInfo.forEach(
+        (database, dataRegionIds) -> {
+          for (DataRegionId dataRegionId : dataRegionIds) {
+            if (!dataNodeConsensusGroupIds.contains(dataRegionId)) {
+              removeDataDirRegion(database, dataRegionId, 
allLocalFilesFolders);
+            }
+          }
+        });
+  }
+
+  private void removeInvalidConsensusDataRegions(List<ConsensusGroupId> 
dataNodeConsensusGroupIds) {
     List<ConsensusGroupId> invalidDataRegionConsensusGroupIds =
         
DataRegionConsensusImpl.getInstance().getAllConsensusGroupIdsWithoutStarting().stream()
             .filter(consensusGroupId -> 
!dataNodeConsensusGroupIds.contains(consensusGroupId))
             .collect(Collectors.toList());
-
-    List<ConsensusGroupId> invalidSchemaRegionConsensusGroupIds =
-        
SchemaRegionConsensusImpl.getInstance().getAllConsensusGroupIdsWithoutStarting().stream()
-            .filter(consensusGroupId -> 
!dataNodeConsensusGroupIds.contains(consensusGroupId))
-            .collect(Collectors.toList());
-    removeInvalidDataRegions(invalidDataRegionConsensusGroupIds);
-    removeInvalidSchemaRegions(invalidSchemaRegionConsensusGroupIds);
-  }
-
-  private void removeInvalidDataRegions(List<ConsensusGroupId> 
invalidConsensusGroupId) {
-    logger.info("Remove invalid dataRegion directories... {}", 
invalidConsensusGroupId);
-    for (ConsensusGroupId consensusGroupId : invalidConsensusGroupId) {
+    logger.info("Remove invalid dataRegion directories... {}", 
invalidDataRegionConsensusGroupIds);
+    for (ConsensusGroupId consensusGroupId : 
invalidDataRegionConsensusGroupIds) {
       File oldDir =
           new File(
               DataRegionConsensusImpl.getInstance()
                   .getRegionDirFromConsensusGroupId(consensusGroupId));
-      removeRegionsDir(oldDir);
+      removeDir(oldDir);
     }
   }
 
-  private void removeInvalidSchemaRegions(List<ConsensusGroupId> 
invalidConsensusGroupId) {
-    logger.info("Remove invalid schemaRegion directories... {}", 
invalidConsensusGroupId);
-    for (ConsensusGroupId consensusGroupId : invalidConsensusGroupId) {
+  private void removeInvalidSchemaRegions(List<ConsensusGroupId> 
schemaConsensusGroupIds) {
+    Map<String, List<SchemaRegionId>> localSchemaRegionInfo =
+        SchemaEngine.getLocalSchemaRegionInfo();
+    localSchemaRegionInfo.forEach(
+        (database, schemaRegionIds) -> {
+          for (SchemaRegionId schemaRegionId : schemaRegionIds) {
+            if (!schemaConsensusGroupIds.contains(schemaRegionId)) {
+              removeInvalidSchemaDir(database, schemaRegionId);
+            }
+          }
+        });
+  }
+
+  private void removeDataDirRegion(
+      String database, DataRegionId dataRegionId, List<String> fileFolders) {
+    fileFolders.forEach(
+        folder -> {
+          String regionDir =
+              folder + File.separator + database + File.separator + 
dataRegionId.getId();
+          removeDir(new File(regionDir));
+        });
+  }
+
+  private void removeInvalidConsensusSchemaRegions(
+      List<ConsensusGroupId> dataNodeConsensusGroupIds) {
+    List<ConsensusGroupId> invalidSchemaRegionConsensusGroupIds =
+        
SchemaRegionConsensusImpl.getInstance().getAllConsensusGroupIdsWithoutStarting().stream()
+            .filter(consensusGroupId -> 
!dataNodeConsensusGroupIds.contains(consensusGroupId))
+            .collect(Collectors.toList());
+    logger.info(
+        "Remove invalid schemaRegion directories... {}", 
invalidSchemaRegionConsensusGroupIds);
+
+    for (ConsensusGroupId consensusGroupId : 
invalidSchemaRegionConsensusGroupIds) {
       File oldDir =
           new File(
               SchemaRegionConsensusImpl.getInstance()
                   .getRegionDirFromConsensusGroupId(consensusGroupId));
-      removeRegionsDir(oldDir);
+      removeDir(oldDir);
     }
   }
 
-  private void removeRegionsDir(File regionDir) {
+  private void removeInvalidSchemaDir(String database, SchemaRegionId 
schemaRegionId) {
+    String systemSchemaDir =
+        config.getSystemDir() + File.separator + database + File.separator + 
schemaRegionId.getId();
+    removeDir(new File(systemSchemaDir));
+  }
+
+  private void removeDir(File regionDir) {
     if (regionDir.exists()) {
-      try {
-        FileUtils.recursivelyDeleteFolder(regionDir.getPath());
-        logger.info("delete {} succeed.", regionDir.getAbsolutePath());
-      } catch (IOException e) {
-        logger.error("delete {} failed.", regionDir.getAbsolutePath());
-      }
+      FileUtils.deleteDirectoryAndEmptyParent(regionDir);
+      logger.info("delete {} succeed.", regionDir.getAbsolutePath());
     } else {
       logger.info("delete {} failed, because it does not exist.", 
regionDir.getAbsolutePath());
     }
@@ -641,12 +686,10 @@ public class DataNode extends ServerCommandLine 
implements DataNodeMBean {
           (endTime - startTime));
 
       List<TConsensusGroupId> consensusGroupIds = 
dataNodeRestartResp.getConsensusGroupIds();
-      List<ConsensusGroupId> dataNodeConsensusGroupIds =
+      removeInvalidRegions(
           consensusGroupIds.stream()
               .map(ConsensusGroupId.Factory::createFromTConsensusGroupId)
-              .collect(Collectors.toList());
-
-      removeInvalidRegions(dataNodeConsensusGroupIds);
+              .collect(Collectors.toList()));
     } else {
       /* Throw exception when restart is rejected */
       throw new StartupException(dataNodeRestartResp.getStatus().getMessage());

Reply via email to