This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch region_migration
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/region_migration by this push:
     new b7be36ca595 [To region_migration] cn pull from dn, IT improve, 
coordinator avoid submit duplicate (#12127)
b7be36ca595 is described below

commit b7be36ca5951de688c398a28647fb56277880da8
Author: Li Yu Heng <[email protected]>
AuthorDate: Thu Mar 7 20:44:29 2024 +0800

    [To region_migration] cn pull from dn, IT improve, coordinator avoid submit 
duplicate (#12127)
---
 .../iotdb/it/env/cluster/env/AbstractEnv.java      |   2 +-
 .../it/env/cluster/node/AbstractNodeWrapper.java   |  14 +-
 .../it/IoTDBRegionMigrateReliabilityIT.java        | 263 ++++++++++++---------
 .../confignode/it/procedure/IoTDBProcedureIT.java  |   4 +-
 .../iotdb/confignode/manager/ConfigManager.java    |  11 -
 .../apache/iotdb/confignode/manager/IManager.java  |   9 -
 .../iotdb/confignode/manager/ProcedureManager.java |  37 +--
 .../iotdb/confignode/manager/node/NodeManager.java |   6 +-
 .../procedure/env/ConfigNodeProcedureEnv.java      |   6 +-
 ...moveHandler.java => RegionMaintainHandler.java} |  51 +++-
 .../impl/node/RemoveDataNodeProcedure.java         |   4 +-
 .../impl/statemachine/AddRegionPeerProcedure.java  |  89 ++-----
 .../impl/statemachine/RegionMigrateProcedure.java  |  10 +-
 .../statemachine/RemoveRegionPeerProcedure.java    |  95 ++------
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   6 -
 .../iotdb/db/protocol/client/ConfigNodeClient.java |   7 -
 .../impl/DataNodeInternalRPCServiceImpl.java       |   6 +
 .../iotdb/db/service/RegionMigrateService.java     | 141 ++++++-----
 .../thrift-commons/src/main/thrift/common.thrift   |  15 ++
 .../src/main/thrift/confignode.thrift              |   9 -
 .../src/main/thrift/datanode.thrift                |   6 +
 21 files changed, 359 insertions(+), 432 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 257d813982c..569059c56a0 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -344,7 +344,7 @@ public abstract class AbstractEnv implements BaseEnv {
     for (AbstractNodeWrapper nodeWrapper :
         Stream.concat(this.dataNodeWrapperList.stream(), 
this.configNodeWrapperList.stream())
             .collect(Collectors.toList())) {
-      nodeWrapper.stop();
+      nodeWrapper.stopForcibly();
       nodeWrapper.destroyDir();
       String lockPath = EnvUtils.getLockFilePath(nodeWrapper.getPort());
       if (!new File(lockPath).delete()) {
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
index dbc43f8fc41..a27d57e4720 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
@@ -475,15 +475,19 @@ public abstract class AbstractNodeWrapper implements 
BaseNodeWrapper {
     if (this.instance == null) {
       return;
     }
+    logger.info("Node {} will be shutdown soon", this.getPort());
     this.instance.destroy();
     try {
       if (!this.instance.waitFor(20, TimeUnit.SECONDS)) {
+        logger.warn("Node {} will be shutdown forcibly soon", this.getPort());
         this.instance.destroyForcibly().waitFor(10, TimeUnit.SECONDS);
       }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-      logger.error("Waiting node to shutdown error. %s", e);
+      logger.error("Waiting node to shutdown error.", e);
+      return;
     }
+    logger.info("Node {} has been shutdown", this.getPort());
   }
 
   @Override
@@ -492,12 +496,14 @@ public abstract class AbstractNodeWrapper implements 
BaseNodeWrapper {
       return;
     }
     try {
-      this.instance.destroyForcibly().waitFor(5, TimeUnit.SECONDS);
-      logger.info("Node {} has been successfully forcibly stopped", nodePort);
+      logger.info("Node {} will be shutdown forcibly soon", this.getPort());
+      this.instance.destroyForcibly().waitFor(10, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-      logger.error("Waiting node to shutdown error. %s", e);
+      logger.error("Waiting node to shutdown error.", e);
+      return;
     }
+    logger.info("Node {} has been shutdown", this.getPort());
   }
 
   @Override
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBRegionMigrateReliabilityIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBRegionMigrateReliabilityIT.java
index 18ddac96ac5..812f393e86c 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBRegionMigrateReliabilityIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBRegionMigrateReliabilityIT.java
@@ -23,12 +23,16 @@ import 
org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState;
 import org.apache.iotdb.confignode.procedure.state.RegionTransitionState;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.AbstractNodeWrapper;
+import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper;
 
 import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -44,14 +48,18 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class IoTDBRegionMigrateReliabilityIT {
   private static final Logger LOGGER =
@@ -81,61 +89,95 @@ public class IoTDBRegionMigrateReliabilityIT {
 
   @Test
   public void normal1C2DTest() throws Exception {
-    EnvFactory.getEnv()
-        .getConfig()
-        .getCommonConfig()
-        .setDataReplicationFactor(1)
-        .setSchemaReplicationFactor(1);
-    EnvFactory.getEnv().initClusterEnvironment(1, 2);
+    generalTest(1, 1, 1, 2, Collections.emptySet(), Collections.emptySet());
+  }
 
-    try (final Connection connection = EnvFactory.getEnv().getConnection();
-        final Statement statement = connection.createStatement()) {
+  @Test
+  public void normal3C3DTest() throws Exception {
+    generalTest(2, 3, 3, 3, Collections.emptySet(), Collections.emptySet());
+  }
 
-      statement.execute(INSERTION);
+  // endregion
 
-      ResultSet result = statement.executeQuery(SHOW_REGIONS);
-      Map<Integer, Set<Integer>> regionMap = getRegionMap(result);
+  // region ConfigNode crash tests
+  @Test
+  public void cnCrashDuringPreCheck() throws Exception {
+    generalTest(
+        1,
+        1,
+        1,
+        2,
+        Stream.of(RegionTransitionState.REGION_MIGRATE_PREPARE.toString())
+            .collect(Collectors.toSet()),
+        Collections.emptySet());
+  }
 
-      result = statement.executeQuery(SHOW_DATANODES);
-      Set<Integer> dataNodeSet = new HashSet<>();
-      while (result.next()) {
-        dataNodeSet.add(result.getInt(ColumnHeaderConstant.NODE_ID));
-      }
+  @Test
+  public void cnCrashDuringCreatePeer() throws Exception {
+    generalTest(
+        1,
+        1,
+        1,
+        2,
+        
Stream.of(AddRegionPeerState.CREATE_NEW_REGION_PEER.toString()).collect(Collectors.toSet()),
+        Collections.emptySet());
+  }
 
-      final int selectedRegion = selectRegion(regionMap);
-      final int originalDataNode = selectOriginalDataNode(regionMap, 
selectedRegion);
-      final int destDataNode = selectDestDataNode(dataNodeSet, regionMap, 
selectedRegion);
+  @Test
+  public void cnCrashDuringDoAddPeer() throws Exception {
+    generalTest(
+        1,
+        1,
+        1,
+        2,
+        
Stream.of(AddRegionPeerState.DO_ADD_REGION_PEER.toString()).collect(Collectors.toSet()),
+        Collections.emptySet());
+  }
 
-      // set breakpoint
-      HashMap<String, Runnable> keywordAction = new HashMap<>();
-      Arrays.stream(RegionTransitionState.values())
-          .forEach(
-              state ->
-                  keywordAction.put(
-                      String.valueOf(state), () -> 
LOGGER.info(String.valueOf(state))));
-      ExecutorService service = 
IoTDBThreadPoolFactory.newCachedThreadPool("regionMigrateIT");
-      LOGGER.info("breakpoint setting...");
-      service.submit(() -> logBreakpointMonitor(0, keywordAction));
-      LOGGER.info("breakpoint set");
+  @Test
+  public void cnCrashDuring() throws Exception {
+    generalTest(
+        1,
+        1,
+        1,
+        2,
+        Stream.of(AddRegionPeerState.UPDATE_REGION_LOCATION_CACHE.toString())
+            .collect(Collectors.toSet()),
+        Collections.emptySet());
+  }
 
-      statement.execute(regionMigrateCommand(selectedRegion, originalDataNode, 
destDataNode));
+  // TODO: other cn crash test
 
-      awaitUntilSuccess(statement, selectedRegion, originalDataNode, 
destDataNode);
+  // endregion
 
-      checkRegionFileClear(originalDataNode);
+  // region DataNode crash tests
 
-      LOGGER.info("test pass");
-    }
-  }
+  // endregion
 
-  @Test
-  public void normal3C3DTest() throws Exception {
+  // region Helpers
+
+  public void generalTest(
+      final int dataReplicateFactor,
+      final int schemaReplicationFactor,
+      final int configNodeNum,
+      final int dataNodeNum,
+      Set<String> killConfigNodeKeywords,
+      Set<String> killDataNodeKeywords // TODO:此参数尚未生效
+      ) throws Exception {
+    // prepare env
     EnvFactory.getEnv()
         .getConfig()
         .getCommonConfig()
-        .setDataReplicationFactor(2)
-        .setSchemaReplicationFactor(3);
-    EnvFactory.getEnv().initClusterEnvironment(3, 3);
+        .setDataReplicationFactor(dataReplicateFactor)
+        .setSchemaReplicationFactor(schemaReplicationFactor);
+    EnvFactory.getEnv().initClusterEnvironment(configNodeNum, dataNodeNum);
+
+    ExecutorService service = 
IoTDBThreadPoolFactory.newCachedThreadPool("regionMigrateIT");
+    EnvFactory.getEnv()
+        .getConfigNodeWrapperList()
+        .forEach(
+            configNodeWrapper ->
+                service.submit(() -> nodeLogKillPoint(configNodeWrapper, 
killConfigNodeKeywords)));
 
     try (final Connection connection = EnvFactory.getEnv().getConnection();
         final Statement statement = connection.createStatement();
@@ -157,94 +199,62 @@ public class IoTDBRegionMigrateReliabilityIT {
       final int originalDataNode = selectOriginalDataNode(regionMap, 
selectedRegion);
       final int destDataNode = selectDestDataNode(dataNodeSet, regionMap, 
selectedRegion);
 
-      // set breakpoint
-      HashMap<String, Runnable> keywordAction = new HashMap<>();
-      Arrays.stream(RegionTransitionState.values())
-          .forEach(
-              state ->
-                  keywordAction.put(
-                      String.valueOf(state), () -> 
LOGGER.info(String.valueOf(state))));
-      ExecutorService service = 
IoTDBThreadPoolFactory.newCachedThreadPool("regionMigrateIT");
-      LOGGER.info("breakpoint setting...");
-      service.submit(() -> logBreakpointMonitor(0, keywordAction));
-      service.submit(() -> logBreakpointMonitor(1, keywordAction));
-      service.submit(() -> logBreakpointMonitor(2, keywordAction));
-      LOGGER.info("breakpoint set");
-
       statement.execute(regionMigrateCommand(selectedRegion, originalDataNode, 
destDataNode));
 
       awaitUntilSuccess(statement, selectedRegion, originalDataNode, 
destDataNode);
 
       checkRegionFileClear(originalDataNode);
-
-      LOGGER.info("test pass");
     }
+    LOGGER.info("test pass");
   }
 
-  // endregion
-
-  // region ConfigNode crash tests
-  @Test
-  public void cnCrashDuringPreCheck() {}
-
-  @Test
-  public void cnCrashDuringCreatePeer() {}
-
-  @Test
-  public void cnCrashDuringAddPeer() {}
-
-  // TODO: other cn crash test
-
-  // endregion
-
-  // region DataNode crash tests
-
-  // endregion
-
-  // region Helpers
-
   /**
    * Monitor the node's log and do something.
    *
-   * @param nodeIndex
-   * @param keywordAction Map<keyword, action>
+   * @param nodeWrapper Easy to understand
+   * @param killNodeKeywords When detect these keywords in node's log, stop 
the node forcibly
    */
-  private static void logBreakpointMonitor(int nodeIndex, HashMap<String, 
Runnable> keywordAction) {
+  private static void nodeLogKillPoint(
+      AbstractNodeWrapper nodeWrapper, Set<String> killNodeKeywords) {
+    if (killNodeKeywords.isEmpty()) {
+      return;
+    }
+    final String logFileName;
+    if (nodeWrapper instanceof ConfigNodeWrapper) {
+      logFileName = "log_confignode_all.log";
+    } else {
+      logFileName = "log_datanode_all.log";
+    }
     ProcessBuilder builder =
         new ProcessBuilder(
             "tail",
             "-f",
-            EnvFactory.getEnv().getConfigNodeWrapper(nodeIndex).getNodePath()
-                + File.separator
-                + "logs"
-                + File.separator
-                + "log_confignode_all.log");
-    builder.redirectErrorStream(true); // 将错误输出和标准输出合并
+            nodeWrapper.getNodePath() + File.separator + "logs" + 
File.separator + logFileName);
+    builder.redirectErrorStream(true);
 
     try {
-      Process process = builder.start(); // 开始执行命令
-      // 读取命令的输出
+      Process process = builder.start();
       try (BufferedReader reader =
           new BufferedReader(new InputStreamReader(process.getInputStream()))) 
{
         String line;
         while ((line = reader.readLine()) != null) {
-          Set<String> detected = new HashSet<>();
+          // if trigger more than one keyword at a same time, test code may 
have mistakes
+          
Assert.assertTrue(killNodeKeywords.stream().filter(line::contains).count() <= 
1);
           String finalLine = line;
-          keywordAction
-              .keySet()
-              .forEach(
-                  k -> {
-                    if (finalLine.contains(k)) {
-                      detected.add(k);
-                    }
-                  });
-          detected.forEach(
-              k -> {
-                keywordAction.get(k).run();
-                //
-                // 
EnvFactory.getEnv().getConfigNodeWrapper(nodeIndex).stopForcibly();
-                keywordAction.remove(k);
-              });
+          Optional<String> detectedKeyword =
+              killNodeKeywords.stream()
+                  .filter(keyword -> finalLine.contains("breakpoint:" + 
keyword))
+                  .findAny();
+          if (detectedKeyword.isPresent()) {
+            // reboot the node
+            nodeWrapper.stopForcibly();
+            nodeWrapper.start();
+            // each keyword only trigger once
+            killNodeKeywords.remove(detectedKeyword.get());
+          }
+          if (killNodeKeywords.isEmpty()) {
+            break;
+          }
         }
       }
     } catch (IOException e) {
@@ -279,7 +289,7 @@ public class IoTDBRegionMigrateReliabilityIT {
       Map<Integer, Set<Integer>> regionMap, int selectedRegion) {
     return regionMap.get(selectedRegion).stream()
         .findAny()
-        .orElseThrow(() -> new RuntimeException("gg"));
+        .orElseThrow(() -> new RuntimeException("cannot find original 
DataNode"));
   }
 
   private static int selectDestDataNode(
@@ -287,20 +297,37 @@ public class IoTDBRegionMigrateReliabilityIT {
     return dataNodeSet.stream()
         .filter(dataNodeId -> 
!regionMap.get(selectedRegion).contains(dataNodeId))
         .findAny()
-        .orElseThrow(() -> new RuntimeException("gg"));
+        .orElseThrow(() -> new RuntimeException("cannot find dest DataNode"));
   }
 
   private static void awaitUntilSuccess(
       Statement statement, int selectedRegion, int originalDataNode, int 
destDataNode) {
-    Awaitility.await()
-        .atMost(1, TimeUnit.MINUTES)
-        .until(
-            () -> {
-              Map<Integer, Set<Integer>> newRegionMap =
-                  getRegionMap(statement.executeQuery(SHOW_REGIONS));
-              Set<Integer> dataNodes = newRegionMap.get(selectedRegion);
-              return !dataNodes.contains(originalDataNode) && 
dataNodes.contains(destDataNode);
-            });
+    AtomicReference<Set<Integer>> lastTimeDataNodes = new AtomicReference<>();
+    try {
+      Awaitility.await()
+          .atMost(1, TimeUnit.MINUTES)
+          .until(
+              () -> {
+                try {
+                  Map<Integer, Set<Integer>> newRegionMap =
+                      getRegionMap(statement.executeQuery(SHOW_REGIONS));
+                  Set<Integer> dataNodes = newRegionMap.get(selectedRegion);
+                  lastTimeDataNodes.set(dataNodes);
+                  return !dataNodes.contains(originalDataNode) && 
dataNodes.contains(destDataNode);
+                } catch (Exception e) {
+                  // Any exception can be ignored
+                  return false;
+                }
+              });
+    } catch (ConditionTimeoutException e) {
+      //      Set<Integer> expectation = new Set<>(lastTimeDataNodes);
+      String actualSetStr = lastTimeDataNodes.get().toString();
+      lastTimeDataNodes.get().remove(originalDataNode);
+      lastTimeDataNodes.get().add(destDataNode);
+      String expectSetStr = lastTimeDataNodes.toString();
+      LOGGER.info("DataNode Set {} is unexpected, expect {}", actualSetStr, 
expectSetStr);
+      throw e;
+    }
   }
 
   /** Check whether the original DataNode's region file has been deleted. */
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/procedure/IoTDBProcedureIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/procedure/IoTDBProcedureIT.java
index 4d0f388a33b..831879da0d2 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/procedure/IoTDBProcedureIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/procedure/IoTDBProcedureIT.java
@@ -79,11 +79,13 @@ public class IoTDBProcedureIT {
   @Test
   public void procedureRecoverAtAnotherConfigNodeTest() throws Exception {
     recoverTest(3, false);
+    LOGGER.info("test pass");
   }
 
   @Test
   public void procedureRecoverAtTheSameConfigNodeTest() throws Exception {
     recoverTest(1, true);
+    LOGGER.info("test pass");
   }
 
   private void recoverTest(int configNodeNum, boolean needRestartLeader) 
throws Exception {
@@ -114,7 +116,7 @@ public class IoTDBProcedureIT {
     Assert.assertTrue(resp.getDatabaseInfoMap().size() < MAX_STATE);
     // Then shutdown the leader, wait the new leader exist and the procedure 
continue
     final int oldLeaderIndex = EnvFactory.getEnv().getLeaderConfigNodeIndex();
-    EnvFactory.getEnv().getConfigNodeWrapper(oldLeaderIndex).stop();
+    EnvFactory.getEnv().getConfigNodeWrapper(oldLeaderIndex).stopForcibly();
     if (needRestartLeader) {
       EnvFactory.getEnv().getConfigNodeWrapper(oldLeaderIndex).start();
     }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 4f45ea5587a..b4102c3ed6f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -151,7 +151,6 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
@@ -411,16 +410,6 @@ public class ConfigManager implements IManager {
     return status;
   }
 
-  @Override
-  public TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req) 
{
-    TSStatus status = confirmLeader();
-    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      // TODO: 这里需要修改report机制,改为向AddRegionPeerProcedure汇报
-      procedureManager.reportRegionMigrateResult(req);
-    }
-    return status;
-  }
-
   @Override
   public DataSet getDataNodeConfiguration(
       GetDataNodeConfigurationPlan getDataNodeConfigurationPlan) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index e9e310bc342..368eaf5fb8a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -91,7 +91,6 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
@@ -242,14 +241,6 @@ public interface IManager {
    */
   TSStatus reportDataNodeShutdown(TDataNodeLocation dataNodeLocation);
 
-  /**
-   * DataNode report region migrate result to ConfigNode when remove DataNode.
-   *
-   * @param req TRegionMigrateResultReportReq
-   * @return TSStatus
-   */
-  TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req);
-
   /**
    * Get DataNode info.
    *
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 5d10d8f7871..5a9b08f548e 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -51,7 +51,7 @@ import org.apache.iotdb.confignode.procedure.Procedure;
 import org.apache.iotdb.confignode.procedure.ProcedureExecutor;
 import org.apache.iotdb.confignode.procedure.ProcedureMetrics;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
-import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
+import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
 import org.apache.iotdb.confignode.procedure.impl.CreateManyDatabasesProcedure;
 import org.apache.iotdb.confignode.procedure.impl.cq.CreateCQProcedure;
 import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
@@ -74,10 +74,8 @@ import 
org.apache.iotdb.confignode.procedure.impl.schema.DeleteLogicalViewProced
 import 
org.apache.iotdb.confignode.procedure.impl.schema.DeleteTimeSeriesProcedure;
 import org.apache.iotdb.confignode.procedure.impl.schema.SetTemplateProcedure;
 import 
org.apache.iotdb.confignode.procedure.impl.schema.UnsetTemplateProcedure;
-import 
org.apache.iotdb.confignode.procedure.impl.statemachine.AddRegionPeerProcedure;
 import 
org.apache.iotdb.confignode.procedure.impl.statemachine.CreateRegionGroupsProcedure;
 import 
org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure;
-import 
org.apache.iotdb.confignode.procedure.impl.statemachine.RemoveRegionPeerProcedure;
 import org.apache.iotdb.confignode.procedure.impl.sync.AuthOperationProcedure;
 import 
org.apache.iotdb.confignode.procedure.impl.trigger.CreateTriggerProcedure;
 import org.apache.iotdb.confignode.procedure.impl.trigger.DropTriggerProcedure;
@@ -96,7 +94,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
 import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
 import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
-import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.db.exception.BatchProcessException;
 import org.apache.iotdb.db.schemaengine.template.Template;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -641,7 +638,7 @@ public class ProcedureManager {
     }
 
     // select coordinator for adding peer
-    DataNodeRemoveHandler handler = new DataNodeRemoveHandler(configManager);
+    RegionMaintainHandler handler = new RegionMaintainHandler(configManager);
     Optional<TDataNodeLocation> selectedDataNode =
         handler.filterDataNodeWithOtherRegionReplica(regionGroupId, 
destDataNode);
     if (!selectedDataNode.isPresent()) {
@@ -1077,36 +1074,6 @@ public class ProcedureManager {
     this.env = env;
   }
 
-  public void reportRegionMigrateResult(TRegionMigrateResultReportReq req) {
-    // TODO: ugly, will fix soon
-    this.executor
-        .getProcedures()
-        .values()
-        .forEach(
-            procedure1 -> {
-              if (procedure1 instanceof AddRegionPeerProcedure) {
-                AddRegionPeerProcedure procedure = (AddRegionPeerProcedure) 
procedure1;
-                if (procedure.getConsensusGroupId().equals(req.getRegionId())) 
{
-                  procedure.notifyAddPeerFinished(req);
-                }
-              }
-            });
-
-    // TODO: ugly, will fix soon
-    this.executor
-        .getProcedures()
-        .values()
-        .forEach(
-            procedure1 -> {
-              if (procedure1 instanceof RemoveRegionPeerProcedure) {
-                RemoveRegionPeerProcedure procedure = 
(RemoveRegionPeerProcedure) procedure1;
-                if (procedure.getConsensusGroupId().equals(req.getRegionId())) 
{
-                  procedure.notifyRemovePeerFinished(req);
-                }
-              }
-            });
-  }
-
   public void addMetrics() {
     MetricService.getInstance().addMetricSet(this.procedureMetrics);
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index d18f701d1fb..aa457e80a48 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -62,7 +62,7 @@ import 
org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
 import org.apache.iotdb.confignode.manager.pipe.coordinator.PipeManager;
 import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
 import org.apache.iotdb.confignode.persistence.node.NodeInfo;
-import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
+import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
 import org.apache.iotdb.confignode.rpc.thrift.TCQConfig;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
@@ -344,8 +344,8 @@ public class NodeManager {
   public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
     LOGGER.info("NodeManager start to remove DataNode {}", removeDataNodePlan);
 
-    DataNodeRemoveHandler dataNodeRemoveHandler =
-        new DataNodeRemoveHandler((ConfigManager) configManager);
+    RegionMaintainHandler dataNodeRemoveHandler =
+        new RegionMaintainHandler((ConfigManager) configManager);
     DataNodeToStatusResp preCheckStatus =
         dataNodeRemoveHandler.checkRemoveDataNodeRequest(removeDataNodePlan);
     if (preCheckStatus.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 518f3cc75c7..cf4cf20495a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -104,14 +104,14 @@ public class ConfigNodeProcedureEnv {
 
   private final ProcedureScheduler scheduler;
 
-  private final DataNodeRemoveHandler dataNodeRemoveHandler;
+  private final RegionMaintainHandler dataNodeRemoveHandler;
 
   private final ReentrantLock removeConfigNodeLock;
 
   public ConfigNodeProcedureEnv(ConfigManager configManager, 
ProcedureScheduler scheduler) {
     this.configManager = configManager;
     this.scheduler = scheduler;
-    this.dataNodeRemoveHandler = new DataNodeRemoveHandler(configManager);
+    this.dataNodeRemoveHandler = new RegionMaintainHandler(configManager);
     this.removeConfigNodeLock = new ReentrantLock();
   }
 
@@ -735,7 +735,7 @@ public class ConfigNodeProcedureEnv {
     return schedulerLock;
   }
 
-  public DataNodeRemoveHandler getDataNodeRemoveHandler() {
+  public RegionMaintainHandler getDataNodeRemoveHandler() {
     return dataNodeRemoveHandler;
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
similarity index 93%
rename from 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
rename to 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
index 95ccf3de64e..acc95b66b40 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
@@ -22,8 +22,14 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionMaintainTaskStatus;
+import org.apache.iotdb.common.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
@@ -59,9 +65,9 @@ import static 
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANOD
 import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
 import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
 
-public class DataNodeRemoveHandler {
+public class RegionMaintainHandler {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(DataNodeRemoveHandler.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(RegionMaintainHandler.class);
 
   private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
 
@@ -70,8 +76,14 @@ public class DataNodeRemoveHandler {
   /** region migrate lock */
   private final LockQueue regionMigrateLock = new LockQueue();
 
-  public DataNodeRemoveHandler(ConfigManager configManager) {
+  private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
dataNodeClientManager;
+
+  public RegionMaintainHandler(ConfigManager configManager) {
     this.configManager = configManager;
+    dataNodeClientManager =
+        new IClientManager.Factory<TEndPoint, 
SyncDataNodeInternalServiceClient>()
+            .createClientManager(
+                new 
ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
   }
 
   public static String getIdWithRpcEndpoint(TDataNodeLocation location) {
@@ -234,12 +246,15 @@ public class DataNodeRemoveHandler {
    * @return TSStatus
    */
   public TSStatus addRegionPeer(
-      TDataNodeLocation destDataNode, TConsensusGroupId regionId, 
TDataNodeLocation coordinator) {
+      long procedureId,
+      TDataNodeLocation destDataNode,
+      TConsensusGroupId regionId,
+      TDataNodeLocation coordinator) {
     TSStatus status;
 
     // Send addRegionPeer request to the selected DataNode,
     // destDataNode is where the new RegionReplica is created
-    TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, 
destDataNode);
+    TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, 
destDataNode, procedureId);
     status =
         SyncDataNodeClientPool.getInstance()
             .sendSyncRequestToDataNodeWithRetry(
@@ -268,11 +283,13 @@ public class DataNodeRemoveHandler {
   public TSStatus removeRegionPeer(
       TDataNodeLocation originalDataNode,
       TConsensusGroupId regionId,
-      TDataNodeLocation coordinator) {
+      TDataNodeLocation coordinator,
+      long procedureId) {
     TSStatus status;
 
     // Send removeRegionPeer request to the rpcClientDataNode
-    TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, 
originalDataNode);
+    TMaintainPeerReq maintainPeerReq =
+        new TMaintainPeerReq(regionId, originalDataNode, procedureId);
     status =
         SyncDataNodeClientPool.getInstance()
             .sendSyncRequestToDataNodeWithRetry(
@@ -298,10 +315,11 @@ public class DataNodeRemoveHandler {
    * @return TSStatus
    */
   public TSStatus deleteOldRegionPeer(
-      TDataNodeLocation originalDataNode, TConsensusGroupId regionId) {
+      TDataNodeLocation originalDataNode, TConsensusGroupId regionId, long 
procedureId) {
 
     TSStatus status;
-    TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, 
originalDataNode);
+    TMaintainPeerReq maintainPeerReq =
+        new TMaintainPeerReq(regionId, originalDataNode, procedureId);
 
     status =
         
configManager.getLoadManager().getNodeStatus(originalDataNode.getDataNodeId())
@@ -325,6 +343,21 @@ public class DataNodeRemoveHandler {
     return status;
   }
 
+  public TRegionMaintainTaskStatus waitTaskFinish(long taskId, 
TDataNodeLocation dataNodeLocation) {
+    while (true) {
+      try (SyncDataNodeInternalServiceClient dataNodeClient =
+          
dataNodeClientManager.borrowClient(dataNodeLocation.getInternalEndPoint())) {
+        TRegionMigrateResultReportReq report = 
dataNodeClient.getRegionMaintainResult(taskId);
+        if (report.getTaskStatus() != TRegionMaintainTaskStatus.PROCESSING) {
+          return report.getTaskStatus();
+        }
+        Thread.sleep(1000);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
   public void addRegionLocation(TConsensusGroupId regionId, TDataNodeLocation 
newLocation) {
     LOGGER.info(
         "AddRegionLocation started, add region {} to {}",
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
index 103ab321fd8..ee4baf4cd25 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
-import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
+import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import 
org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure;
 import org.apache.iotdb.confignode.procedure.state.RemoveDataNodeState;
@@ -68,7 +68,7 @@ public class RemoveDataNodeProcedure extends 
AbstractNodeProcedure<RemoveDataNod
       return Flow.NO_MORE_STATE;
     }
 
-    DataNodeRemoveHandler handler = env.getDataNodeRemoveHandler();
+    RegionMaintainHandler handler = env.getDataNodeRemoveHandler();
     try {
       switch (state) {
         case REGION_REPLICA_CHECK:
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/AddRegionPeerProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/AddRegionPeerProcedure.java
index b5b9448f771..488b85f3522 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/AddRegionPeerProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/AddRegionPeerProcedure.java
@@ -21,18 +21,17 @@ package 
org.apache.iotdb.confignode.procedure.impl.statemachine;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionMaintainTaskStatus;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
-import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
+import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import 
org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
 import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
-import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,8 +41,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import static org.apache.iotdb.commons.utils.FileUtils.logBreakpoint;
-import static 
org.apache.iotdb.confignode.conf.ConfigNodeConstant.ADD_REGION_PEER_PROGRESS;
-import static 
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE_PROCESS;
 import static 
org.apache.iotdb.confignode.procedure.state.AddRegionPeerState.UPDATE_REGION_LOCATION_CACHE;
 import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS;
 
@@ -56,11 +53,6 @@ public class AddRegionPeerProcedure
 
   private TDataNodeLocation destDataNode;
 
-  private boolean addRegionPeerSuccess = true;
-  private String addRegionPeerResult;
-
-  private final Object addRegionPeerLock = new Object();
-
   public AddRegionPeerProcedure() {
     super();
   }
@@ -81,7 +73,7 @@ public class AddRegionPeerProcedure
     if (consensusGroupId == null) {
       return Flow.NO_MORE_STATE;
     }
-    DataNodeRemoveHandler handler = env.getDataNodeRemoveHandler();
+    RegionMaintainHandler handler = env.getDataNodeRemoveHandler();
     try {
       switch (state) {
         case CREATE_NEW_REGION_PEER:
@@ -90,17 +82,23 @@ public class AddRegionPeerProcedure
           setNextState(AddRegionPeerState.DO_ADD_REGION_PEER);
           break;
         case DO_ADD_REGION_PEER:
-          TSStatus tsStatus = handler.addRegionPeer(destDataNode, 
consensusGroupId, coordinator);
+          TSStatus tsStatus =
+              handler.addRegionPeer(this.getProcId(), destDataNode, 
consensusGroupId, coordinator);
+          TRegionMaintainTaskStatus result;
+          logBreakpoint(state.name());
           if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
-            waitForOneMigrationStepFinished(consensusGroupId, state);
+            result = handler.waitTaskFinish(this.getProcId(), coordinator);
           } else {
             throw new ProcedureException("ADD_REGION_PEER executed failed in 
DataNode");
           }
-          logBreakpoint(state.name());
-          setNextState(UPDATE_REGION_LOCATION_CACHE);
-          break;
+          if (result == TRegionMaintainTaskStatus.SUCCESS) {
+            setNextState(UPDATE_REGION_LOCATION_CACHE);
+            break;
+          }
+          throw new ProcedureException("ADD_REGION_PEER executed failed in 
DataNode");
         case UPDATE_REGION_LOCATION_CACHE:
           handler.addRegionLocation(consensusGroupId, destDataNode);
+          logBreakpoint(state.name());
           return Flow.NO_MORE_STATE;
         default:
           throw new ProcedureException("Unsupported state: " + state.name());
@@ -112,61 +110,6 @@ public class AddRegionPeerProcedure
   }
 
   // TODO: Clear all remaining information related to 'migrate' and 'migration'
-  public TSStatus waitForOneMigrationStepFinished(
-      TConsensusGroupId consensusGroupId, AddRegionPeerState state) throws 
Exception {
-    LOGGER.info(
-        "{}, Wait for state {} finished, regionId: {}",
-        REGION_MIGRATE_PROCESS,
-        state,
-        consensusGroupId);
-
-    TSStatus status = new TSStatus(SUCCESS_STATUS.getStatusCode());
-    synchronized (addRegionPeerLock) {
-      try {
-        addRegionPeerLock.wait();
-
-        if (!addRegionPeerSuccess) {
-          throw new ProcedureException(
-              String.format("Region migration failed, regionId: %s", 
consensusGroupId));
-        }
-      } catch (InterruptedException e) {
-        LOGGER.error(
-            "{}, region migration {} interrupt", REGION_MIGRATE_PROCESS, 
consensusGroupId, e);
-        Thread.currentThread().interrupt();
-        status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-        status.setMessage("Waiting for region migration interruption," + 
e.getMessage());
-      }
-    }
-    return status;
-  }
-
-  public void notifyAddPeerFinished(TRegionMigrateResultReportReq req) {
-
-    LOGGER.info(
-        "{}, ConfigNode received region migration result reported by DataNode: 
{}",
-        ADD_REGION_PEER_PROGRESS,
-        req);
-
-    // TODO the req is used in roll back
-    synchronized (addRegionPeerLock) {
-      TSStatus migrateStatus = req.getMigrateResult();
-      // Migration failed
-      if (migrateStatus.getCode() != SUCCESS_STATUS.getStatusCode()) {
-        LOGGER.info(
-            "{}, Region migration failed in DataNode, migrateStatus: {}",
-            ADD_REGION_PEER_PROGRESS,
-            migrateStatus);
-        addRegionPeerSuccess = false;
-        addRegionPeerResult = migrateStatus.toString();
-      }
-      addRegionPeerLock.notifyAll();
-    }
-  }
-
-  @Override
-  protected boolean isRollbackSupported(AddRegionPeerState state) {
-    return false;
-  }
 
   @Override
   protected void rollbackState(
@@ -216,8 +159,4 @@ public class AddRegionPeerProcedure
   public TDataNodeLocation getCoordinator() {
     return coordinator;
   }
-
-  public TDataNodeLocation getDestDataNode() {
-    return destDataNode;
-  }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
index 35a51008ea6..8b9fc5d6ac2 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
-import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
+import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
 import org.apache.iotdb.confignode.procedure.state.RegionTransitionState;
@@ -40,7 +40,7 @@ import java.util.Objects;
 
 import static org.apache.iotdb.commons.utils.FileUtils.logBreakpoint;
 import static 
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE_PROCESS;
-import static 
org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler.getIdWithRpcEndpoint;
+import static 
org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler.getIdWithRpcEndpoint;
 
 /** Region migrate procedure */
 public class RegionMigrateProcedure
@@ -52,8 +52,6 @@ public class RegionMigrateProcedure
   private static final int RETRY_THRESHOLD = 5;
 
   /** Wait region migrate finished */
-  private final Object regionMigrateLock = new Object();
-
   private TConsensusGroupId consensusGroupId;
 
   private TDataNodeLocation originalDataNode;
@@ -63,8 +61,6 @@ public class RegionMigrateProcedure
   private TDataNodeLocation coordinatorForAddPeer;
   private TDataNodeLocation coordinatorForRemovePeer;
 
-  private boolean migrateSuccess = true;
-
   private String migrateResult = "";
 
   public RegionMigrateProcedure() {
@@ -90,7 +86,7 @@ public class RegionMigrateProcedure
     if (consensusGroupId == null) {
       return Flow.NO_MORE_STATE;
     }
-    DataNodeRemoveHandler handler = env.getDataNodeRemoveHandler();
+    RegionMaintainHandler handler = env.getDataNodeRemoveHandler();
     try {
       switch (state) {
         case REGION_MIGRATE_PREPARE:
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RemoveRegionPeerProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RemoveRegionPeerProcedure.java
index f656653a113..db58f456201 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RemoveRegionPeerProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RemoveRegionPeerProcedure.java
@@ -21,18 +21,17 @@ package 
org.apache.iotdb.confignode.procedure.impl.statemachine;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionMaintainTaskStatus;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
 import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
-import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
+import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import 
org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
 import org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
-import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,7 +41,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import static org.apache.iotdb.commons.utils.FileUtils.logBreakpoint;
-import static 
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE_PROCESS;
 import static 
org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState.DELETE_OLD_REGION_PEER;
 import static 
org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState.REMOVE_REGION_LOCATION_CACHE;
 import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS;
@@ -54,10 +52,6 @@ public class RemoveRegionPeerProcedure
   private TDataNodeLocation coordinator;
   private TDataNodeLocation targetDataNode;
 
-  private boolean removeRegionPeerSuccess = true;
-  private String removeRegionPeerResult;
-  private final Object removeRegionPeerLock = new Object();
-
   public RemoveRegionPeerProcedure() {
     super();
   }
@@ -79,27 +73,37 @@ public class RemoveRegionPeerProcedure
       return Flow.NO_MORE_STATE;
     }
     TSStatus tsStatus;
-    DataNodeRemoveHandler handler = env.getDataNodeRemoveHandler();
+    RegionMaintainHandler handler = env.getDataNodeRemoveHandler();
     try {
       switch (state) {
         case REMOVE_REGION_PEER:
-          tsStatus = handler.removeRegionPeer(targetDataNode, 
consensusGroupId, coordinator);
+          tsStatus =
+              handler.removeRegionPeer(
+                  targetDataNode, consensusGroupId, coordinator, 
this.getProcId());
+          TRegionMaintainTaskStatus result;
+          logBreakpoint(state.name());
           if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
-            waitForOneMigrationStepFinished(consensusGroupId, state);
+            result = handler.waitTaskFinish(this.getProcId(), coordinator);
           } else {
             throw new ProcedureException("REMOVE_REGION_PEER executed failed 
in DataNode");
           }
-          logBreakpoint(state.name());
+          if (result != TRegionMaintainTaskStatus.SUCCESS) {
+            throw new ProcedureException("REMOVE_REGION_PEER executed failed 
in DataNode");
+          }
           setNextState(DELETE_OLD_REGION_PEER);
           break;
         case DELETE_OLD_REGION_PEER:
-          tsStatus = handler.deleteOldRegionPeer(targetDataNode, 
consensusGroupId);
+          tsStatus =
+              handler.deleteOldRegionPeer(targetDataNode, consensusGroupId, 
this.getProcId());
+          logBreakpoint(state.name());
           if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
-            waitForOneMigrationStepFinished(consensusGroupId, state);
+            result = handler.waitTaskFinish(this.getProcId(), targetDataNode);
+          } else {
+            throw new ProcedureException("DELETE_OLD_REGION_PEER executed 
failed in DataNode");
+          }
+          if (result != TRegionMaintainTaskStatus.SUCCESS) {
+            throw new ProcedureException("DELETE_OLD_REGION_PEER executed 
failed in DataNode");
           }
-          logBreakpoint(state.name());
-          // Remove consensus group after a node stop, which will be failed, 
but we will
-          // continuously execute.
           setNextState(REMOVE_REGION_LOCATION_CACHE);
           break;
         case REMOVE_REGION_LOCATION_CACHE:
@@ -115,67 +119,10 @@ public class RemoveRegionPeerProcedure
     return Flow.HAS_MORE_STATE;
   }
 
-  public TSStatus waitForOneMigrationStepFinished(
-      TConsensusGroupId consensusGroupId, RemoveRegionPeerState state) throws 
Exception {
-
-    LOGGER.info(
-        "{}, Wait for state {} finished, regionId: {}",
-        REGION_MIGRATE_PROCESS,
-        state,
-        consensusGroupId);
-
-    TSStatus status = new TSStatus(SUCCESS_STATUS.getStatusCode());
-    synchronized (removeRegionPeerLock) {
-      try {
-        // TODO set timeOut?
-        removeRegionPeerLock.wait();
-
-        if (!removeRegionPeerSuccess) {
-          throw new ProcedureException(
-              String.format("Region migration failed, regionId: %s", 
consensusGroupId));
-        }
-      } catch (InterruptedException e) {
-        LOGGER.error(
-            "{}, region migration {} interrupt", REGION_MIGRATE_PROCESS, 
consensusGroupId, e);
-        Thread.currentThread().interrupt();
-        status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
-        status.setMessage("Waiting for region migration interruption," + 
e.getMessage());
-      }
-    }
-    return status;
-  }
-
-  public void notifyRemovePeerFinished(TRegionMigrateResultReportReq req) {
-    LOGGER.info(
-        "{}, ConfigNode received region migration result reported by DataNode: 
{}",
-        REGION_MIGRATE_PROCESS,
-        req);
-
-    // TODO the req is used in roll back
-    synchronized (removeRegionPeerLock) {
-      TSStatus migrateStatus = req.getMigrateResult();
-      // Migration failed
-      if (migrateStatus.getCode() != SUCCESS_STATUS.getStatusCode()) {
-        LOGGER.info(
-            "{}, Region migration failed in DataNode, migrateStatus: {}",
-            REGION_MIGRATE_PROCESS,
-            migrateStatus);
-        removeRegionPeerSuccess = false;
-        removeRegionPeerResult = migrateStatus.toString();
-      }
-      removeRegionPeerLock.notifyAll();
-    }
-  }
-
   @Override
   protected void rollbackState(ConfigNodeProcedureEnv env, 
RemoveRegionPeerState state)
       throws IOException, InterruptedException, ProcedureException {}
 
-  @Override
-  protected boolean isRollbackSupported(RemoveRegionPeerState state) {
-    return false;
-  }
-
   @Override
   protected RemoveRegionPeerState getState(int stateId) {
     return RemoveRegionPeerState.values()[stateId];
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 95c82a1c001..5b77ad2f046 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -129,7 +129,6 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
@@ -270,11 +269,6 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
     return resp;
   }
 
-  @Override
-  public TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req) 
{
-    return configManager.reportRegionMigrateResult(req);
-  }
-
   @Override
   public TShowClusterResp showCluster() {
     return configManager.showCluster();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 1387e5f338f..089047a704e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -98,7 +98,6 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
 import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
@@ -417,12 +416,6 @@ public class ConfigNodeClient implements 
IConfigNodeRPCService.Iface, ThriftClie
         resp -> !updateConfigNodeLeader(resp.status));
   }
 
-  @Override
-  public TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req) 
throws TException {
-    return executeRemoteCallWithRetry(
-        () -> client.reportRegionMigrateResult(req), status -> 
!updateConfigNodeLeader(status));
-  }
-
   @Override
   public TShowClusterResp showCluster() throws TException {
     return executeRemoteCallWithRetry(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 5a62e1ddd6a..c4588d5bac3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
+import org.apache.iotdb.common.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
 import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
@@ -1618,6 +1619,11 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     return status;
   }
 
+  @Override
+  public TRegionMigrateResultReportReq getRegionMaintainResult(long taskId) 
throws TException {
+    return RegionMigrateService.getRegionMaintainResult(taskId);
+  }
+
   private TSStatus createNewRegion(ConsensusGroupId regionId, String 
storageGroup, long ttl) {
     return regionManager.createNewRegion(regionId, storageGroup, ttl);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index 092394c9bf2..a39dd72d34f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -21,9 +21,10 @@ package org.apache.iotdb.db.service;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionMaintainTaskStatus;
 import org.apache.iotdb.common.rpc.thrift.TRegionMigrateFailedType;
+import org.apache.iotdb.common.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -32,28 +33,24 @@ import org.apache.iotdb.commons.consensus.SchemaRegionId;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
-import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import 
org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
 import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
-import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
-import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
-import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
 import org.apache.iotdb.db.schemaengine.SchemaEngine;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.rescon.memory.AbstractPoolManager;
 import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class RegionMigrateService implements IService {
 
@@ -67,6 +64,13 @@ public class RegionMigrateService implements IService {
 
   private RegionMigratePool regionMigratePool;
 
+  // Map<taskId, taskStatus>
+  // TODO:暂时无法处理一个procedure中向同一个datanode提交多个异步任务的情况
+  private static final ConcurrentHashMap<Long, TRegionMigrateResultReportReq> 
taskResultMap =
+      new ConcurrentHashMap<>();
+  private static final TRegionMigrateResultReportReq unfinishedResult =
+      new TRegionMigrateResultReportReq();
+
   private RegionMigrateService() {}
 
   public static RegionMigrateService getInstance() {
@@ -80,10 +84,13 @@ public class RegionMigrateService implements IService {
    * @return if the submit task succeed
    */
   public synchronized boolean submitAddRegionPeerTask(TMaintainPeerReq req) {
-
     boolean submitSucceed = true;
     try {
-      regionMigratePool.submit(new AddRegionPeerTask(req.getRegionId(), 
req.getDestNode()));
+      if (!addToTaskResultMap(req.getTaskId())) {
+        LOGGER.warn("{} The AddRegionPeerTask {} has already been submitted 
and will not be submitted again.", REGION_MIGRATE_PROCESS, req.getTaskId());
+      }
+      regionMigratePool.submit(
+          new AddRegionPeerTask(req.getTaskId(), req.getRegionId(), 
req.getDestNode()));
     } catch (Exception e) {
       LOGGER.error(
           "{}, Submit AddRegionPeerTask error for Region: {}",
@@ -105,7 +112,11 @@ public class RegionMigrateService implements IService {
 
     boolean submitSucceed = true;
     try {
-      regionMigratePool.submit(new RemoveRegionPeerTask(req.getRegionId(), 
req.getDestNode()));
+      if (!addToTaskResultMap(req.getTaskId())) {
+        LOGGER.warn("{} The RemoveRegionPeer {} has already been submitted and 
will not be submitted again.", REGION_MIGRATE_PROCESS, req.getTaskId());
+      }
+      regionMigratePool.submit(
+          new RemoveRegionPeerTask(req.getTaskId(), req.getRegionId(), 
req.getDestNode()));
     } catch (Exception e) {
       LOGGER.error(
           "{}, Submit RemoveRegionPeer task error for Region: {}",
@@ -124,10 +135,13 @@ public class RegionMigrateService implements IService {
    * @return if the submit task succeed
    */
   public synchronized boolean submitDeleteOldRegionPeerTask(TMaintainPeerReq 
req) {
-
     boolean submitSucceed = true;
     try {
-      regionMigratePool.submit(new DeleteOldRegionPeerTask(req.getRegionId(), 
req.getDestNode()));
+      if (!addToTaskResultMap(req.getTaskId())) {
+        LOGGER.warn("{} The DeleteOldRegionPeerTask {} has already been 
submitted and will not be submitted again.", REGION_MIGRATE_PROCESS, 
req.getTaskId());
+      }
+      regionMigratePool.submit(
+          new DeleteOldRegionPeerTask(req.getTaskId(), req.getRegionId(), 
req.getDestNode()));
     } catch (Exception e) {
       LOGGER.error(
           "{}, Submit DeleteOldRegionPeerTask error for Region: {}",
@@ -139,6 +153,14 @@ public class RegionMigrateService implements IService {
     return submitSucceed;
   }
 
+  private boolean addToTaskResultMap(long taskId) {
+    if (taskResultMap.containsKey(taskId)) {
+      return false;
+    }
+    taskResultMap.put(taskId, unfinishedResult);
+    return true;
+  }
+
   @Override
   public void start() throws StartupException {
     regionMigratePool = new RegionMigratePool();
@@ -190,13 +212,17 @@ public class RegionMigrateService implements IService {
 
     private static final Logger taskLogger = 
LoggerFactory.getLogger(AddRegionPeerTask.class);
 
+    private final long taskId;
+
     // The RegionGroup that shall perform the add peer process
     private final TConsensusGroupId tRegionId;
 
     // The new DataNode to be added in the RegionGroup
     private final TDataNodeLocation destDataNode;
 
-    public AddRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation 
destDataNode) {
+    public AddRegionPeerTask(
+        long taskId, TConsensusGroupId tRegionId, TDataNodeLocation 
destDataNode) {
+      this.taskId = taskId;
       this.tRegionId = tRegionId;
       this.destDataNode = destDataNode;
     }
@@ -205,11 +231,12 @@ public class RegionMigrateService implements IService {
     public void run() {
       TSStatus runResult = addPeer();
       if (isFailed(runResult)) {
-        reportFailed(tRegionId, destDataNode, 
TRegionMigrateFailedType.AddPeerFailed, runResult);
+        taskFail(
+            taskId, tRegionId, destDataNode, 
TRegionMigrateFailedType.AddPeerFailed, runResult);
         return;
       }
 
-      reportSucceed(tRegionId, "AddPeer");
+      taskSucceed(taskId, tRegionId, "AddPeer");
     }
 
     private TSStatus addPeer() {
@@ -282,11 +309,15 @@ public class RegionMigrateService implements IService {
 
     private static final Logger taskLogger = 
LoggerFactory.getLogger(RemoveRegionPeerTask.class);
 
+    private final long taskId;
+
     private final TConsensusGroupId tRegionId;
 
     private final TDataNodeLocation destDataNode;
 
-    public RemoveRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation 
destDataNode) {
+    public RemoveRegionPeerTask(
+        long taskId, TConsensusGroupId tRegionId, TDataNodeLocation 
destDataNode) {
+      this.taskId = taskId;
       this.tRegionId = tRegionId;
       this.destDataNode = destDataNode;
     }
@@ -295,9 +326,10 @@ public class RegionMigrateService implements IService {
     public void run() {
       TSStatus runResult = removePeer();
       if (isSucceed(runResult)) {
-        reportSucceed(tRegionId, "RemovePeer");
+        taskSucceed(taskId, tRegionId, "RemovePeer");
       } else {
-        reportFailed(tRegionId, destDataNode, 
TRegionMigrateFailedType.RemovePeerFailed, runResult);
+        taskFail(
+            taskId, tRegionId, destDataNode, 
TRegionMigrateFailedType.RemovePeerFailed, runResult);
       }
     }
 
@@ -375,13 +407,15 @@ public class RegionMigrateService implements IService {
   private static class DeleteOldRegionPeerTask implements Runnable {
 
     private static final Logger taskLogger = 
LoggerFactory.getLogger(DeleteOldRegionPeerTask.class);
+    private final long taskId;
 
     private final TConsensusGroupId tRegionId;
 
     private final TDataNodeLocation originalDataNode;
 
     public DeleteOldRegionPeerTask(
-        TConsensusGroupId tRegionId, TDataNodeLocation originalDataNode) {
+        long taskId, TConsensusGroupId tRegionId, TDataNodeLocation 
originalDataNode) {
+      this.taskId = taskId;
       this.tRegionId = tRegionId;
       this.originalDataNode = originalDataNode;
     }
@@ -391,7 +425,8 @@ public class RegionMigrateService implements IService {
       // deletePeer: remove the peer from the consensus group
       TSStatus runResult = deletePeer();
       if (isFailed(runResult)) {
-        reportFailed(
+        taskFail(
+            taskId,
             tRegionId,
             originalDataNode,
             TRegionMigrateFailedType.RemoveConsensusGroupFailed,
@@ -400,12 +435,17 @@ public class RegionMigrateService implements IService {
 
       // deleteRegion: delete region data
       runResult = deleteRegion();
+
       if (isFailed(runResult)) {
-        reportFailed(
-            tRegionId, originalDataNode, 
TRegionMigrateFailedType.DeleteRegionFailed, runResult);
+        taskFail(
+            taskId,
+            tRegionId,
+            originalDataNode,
+            TRegionMigrateFailedType.DeleteRegionFailed,
+            runResult);
       }
 
-      reportSucceed(tRegionId, "DeletePeer");
+      taskSucceed(taskId, tRegionId, "DeletePeer");
     }
 
     private TSStatus deletePeer() {
@@ -476,57 +516,42 @@ public class RegionMigrateService implements IService {
     private Holder() {}
   }
 
-  private static void reportSucceed(TConsensusGroupId tRegionId, String 
migrateState) {
+  private static void taskSucceed(long taskId, TConsensusGroupId tRegionId, 
String migrateState) {
     TSStatus status = new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     status.setMessage(
         String.format("Region: %s, state: %s, executed succeed", tRegionId, 
migrateState));
-    TRegionMigrateResultReportReq req = new 
TRegionMigrateResultReportReq(tRegionId, status);
-    try {
-      reportRegionMigrateResultToConfigNode(req);
-    } catch (Exception e) {
-      LOGGER.error(
-          "{}, Report region {} migrate result error in reportSucceed, result: 
{}",
-          REGION_MIGRATE_PROCESS,
-          tRegionId,
-          req,
-          e);
-    }
+    TRegionMigrateResultReportReq req =
+        new TRegionMigrateResultReportReq(TRegionMaintainTaskStatus.SUCCESS);
+    req.setRegionId(tRegionId).setMigrateResult(status);
+    taskResultMap.put(taskId, req);
   }
 
-  private static void reportFailed(
+  private static void taskFail(
+      long taskId,
       TConsensusGroupId tRegionId,
       TDataNodeLocation failedNode,
       TRegionMigrateFailedType failedType,
       TSStatus status) {
     Map<TDataNodeLocation, TRegionMigrateFailedType> failedNodeAndReason = new 
HashMap<>();
     failedNodeAndReason.put(failedNode, failedType);
-    TRegionMigrateResultReportReq req = new 
TRegionMigrateResultReportReq(tRegionId, status);
+    TRegionMigrateResultReportReq req =
+        new TRegionMigrateResultReportReq(TRegionMaintainTaskStatus.FAIL);
+    req.setRegionId(tRegionId).setMigrateResult(status);
     req.setFailedNodeAndReason(failedNodeAndReason);
-    try {
-      reportRegionMigrateResultToConfigNode(req);
-    } catch (Exception e) {
-      LOGGER.error(
-          "{}, Report region {} migrate error in reportFailed, result:{}",
-          REGION_MIGRATE_PROCESS,
-          tRegionId,
-          req,
-          e);
-    }
+    taskResultMap.put(taskId, req);
   }
 
-  private static void 
reportRegionMigrateResultToConfigNode(TRegionMigrateResultReportReq req)
-      throws TException, ClientManagerException {
-    TSStatus status;
-    try (ConfigNodeClient client =
-        
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
 {
-      status = client.reportRegionMigrateResult(req);
-      LOGGER.info(
-          "{}, Report region {} migrate result {} to Config node succeed, 
result: {}",
-          REGION_MIGRATE_PROCESS,
-          req.getRegionId(),
-          req,
-          status);
+  // TODO: 单例模式下static与非static的区别?
+  public static TRegionMigrateResultReportReq getRegionMaintainResult(Long 
taskId) {
+    TRegionMigrateResultReportReq result = new TRegionMigrateResultReportReq();
+    if (!taskResultMap.containsKey(taskId)) {
+      result.setTaskStatus(TRegionMaintainTaskStatus.TASK_NOT_EXIST);
+    } else if (taskResultMap.get(taskId) == unfinishedResult) {
+      result.setTaskStatus(TRegionMaintainTaskStatus.PROCESSING);
+    } else {
+      result = taskResultMap.get(taskId);
     }
+    return result;
   }
 
   private static boolean isSucceed(TSStatus status) {
diff --git a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift 
b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
index ba5a2da6b09..636b05888f4 100644
--- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
+++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
@@ -90,6 +90,7 @@ struct TDataNodeConfiguration {
   2: required TNodeResource resource
 }
 
+// TODO: deprecated
 enum TRegionMigrateFailedType {
   AddPeerFailed,
   RemovePeerFailed,
@@ -98,6 +99,20 @@ enum TRegionMigrateFailedType {
   CreateRegionFailed
 }
 
+struct TRegionMigrateResultReportReq {
+  1: optional TConsensusGroupId regionId
+  2: optional TSStatus migrateResult
+  3: optional map<TDataNodeLocation, TRegionMigrateFailedType> 
failedNodeAndReason
+  4: required TRegionMaintainTaskStatus taskStatus
+}
+
+enum TRegionMaintainTaskStatus {
+  TASK_NOT_EXIST,
+  PROCESSING,
+  SUCCESS,
+  FAIL,
+}
+
 struct TFlushReq {
    1: optional string isSeq
    2: optional list<string> storageGroups
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift 
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 989255c0cc8..886e2bbe7d8 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -140,12 +140,6 @@ struct TDataNodeRemoveResp {
   2: optional map<common.TDataNodeLocation, common.TSStatus> nodeToStatus
 }
 
-struct TRegionMigrateResultReportReq {
-  1: required common.TConsensusGroupId regionId
-  2: required common.TSStatus migrateResult
-  3: optional map<common.TDataNodeLocation, common.TRegionMigrateFailedType> 
failedNodeAndReason
-}
-
 struct TDataNodeConfigurationResp {
   1: required common.TSStatus status
   // map<DataNodeId, DataNodeConfiguration>
@@ -880,9 +874,6 @@ service IConfigNodeRPCService {
    */
   TDataNodeConfigurationResp getDataNodeConfiguration(i32 dataNodeId)
 
-  /** Report region migration complete */
-  common.TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req)
-
   // ======================================================
   // Database
   // ======================================================
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 8afc6bbe3e7..7042f67c3a2 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -51,6 +51,7 @@ struct TCreatePeerReq {
 struct TMaintainPeerReq {
   1: required common.TConsensusGroupId regionId
   2: required common.TDataNodeLocation destNode
+  3: required i64 taskId
 }
 
 struct TFragmentInstanceId {
@@ -557,6 +558,11 @@ service IDataNodeRPCService {
    */
   common.TSStatus deleteOldRegionPeer(TMaintainPeerReq req);
 
+  /**
+   * Get the result of a region maintainance task
+   */
+  common.TRegionMigrateResultReportReq getRegionMaintainResult(i64 taskId)
+
   /**
   * Config node will disable the Data node, the Data node will not accept 
read/write request when disabled
   * @param data node location

Reply via email to