This is an automated email from the ASF dual-hosted git repository.

CRZbulabula pushed a commit to branch improve-region-maintain-async
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f8ae66f7b722393b568f1c905260145aed36b041
Author: Yongzao <[email protected]>
AuthorDate: Thu Jun 25 18:18:40 2026 +0800

    Use procedures for region create/delete to fix the delete-region ghost task
    
    The ConfigNode leader periodically drained a persistent RegionMaintainer
    queue (PartitionManager.maintainRegionReplicas) by sending synchronous
    deleteRegion RPCs to DataNodes. A slow data-region deletion could outlive
    the RPC timeout; the ConfigNode retried 10s later, and the DataNode
    answered the retried call with SUCCESS (the region was already gone and
    the ConsensusGroupNotExistException was swallowed). The ConfigNode then
    polled the task off the queue and treated it as done, while the first
    call was still running on the DataNode - a "ghost task".
    
    Replace the whole RegionMaintainer queue with Procedures that mirror the
    existing AddRegionPeerProcedure/RemoveRegionPeerProcedure pattern:
    
    - DataNode: new deleteRegionAsync(TMaintainPeerReq) RPC plus a
      DeleteRegionTask in RegionMigrateService. The deletion runs in the
      background and records its terminal state in taskResultMap, so the
      ConfigNode polls getRegionMaintainResult for progress
      (PROCESSING/SUCCESS/FAIL/TASK_NOT_EXIST) instead of trusting a single
      RPC response. The task returns immediately after taskFail and never
      falls through to taskSucceed, so a failed deletion can never be reported
      as success.
    
    - ConfigNode: new DeleteRegionProcedure (async submit + waitTaskFinish
      poll, taskId = procId, idempotent on retry/leader-change) and
      CreateRegionProcedure (the create RPC already returns a final status, so
      it stays synchronous and the procedure only adds persistence and retry).
      DeleteDatabaseProcedure and CreateRegionGroupsProcedure now spawn these
      as child procedures instead of offering to the queue, and every
      completed task is logged.
    
    - Upgrade safety: the OfferRegionMaintainTasks/PollRegionMaintainTask/
      PollSpecificRegionMaintainTask plan types and classes are kept
      (@Deprecated, still deserialized and applied as no-ops) so an old
      consensus log still replays. The PartitionInfo snapshot has no version
      header, so the trailing RegionMaintainer block is still consumed on load
      (any tasks it contains are discarded with a WARN and are not executed)
      and written as empty on save, keeping the byte layout unchanged.
    
    Add DeleteRegionProcedureTest/CreateRegionProcedureTest and a
    PartitionInfo test that loads a legacy snapshot containing pending tasks.
---
 .../iotdb/confignode/i18n/ProcedureMessages.java   |  22 ++
 .../iotdb/confignode/i18n/ProcedureMessages.java   |  18 ++
 .../client/sync/CnToDnSyncRequestType.java         |   1 +
 .../client/sync/SyncDataNodeClientPool.java        |   3 +
 .../consensus/request/ConfigPhysicalPlanType.java  |   6 +
 .../statemachine/ConfigRegionStateMachine.java     |   3 -
 .../iotdb/confignode/manager/ConfigManager.java    |   3 -
 .../iotdb/confignode/manager/ProcedureManager.java |   6 +-
 .../manager/partition/PartitionManager.java        | 281 ---------------------
 .../persistence/partition/PartitionInfo.java       | 106 ++++----
 .../procedure/env/RegionMaintainHandler.java       |  79 ++++++
 .../impl/region/CreateRegionGroupsProcedure.java   |  41 ++-
 .../impl/region/CreateRegionProcedure.java         | 201 +++++++++++++++
 .../impl/region/DeleteRegionProcedure.java         | 229 +++++++++++++++++
 .../impl/schema/DeleteDatabaseProcedure.java       | 100 ++------
 ...teDatabaseState.java => CreateRegionState.java} |   8 +-
 ...teDatabaseState.java => DeleteRegionState.java} |   8 +-
 .../state/schema/DeleteDatabaseState.java          |   5 +-
 .../procedure/store/ProcedureFactory.java          |  12 +
 .../confignode/procedure/store/ProcedureType.java  |   2 +
 .../confignode/persistence/PartitionInfoTest.java  |  43 ++++
 .../impl/region/CreateRegionProcedureTest.java     |  64 +++++
 .../impl/region/DeleteRegionProcedureTest.java     |  58 +++++
 .../apache/iotdb/db/i18n/DataNodeMiscMessages.java |   2 +
 .../apache/iotdb/db/i18n/DataNodeMiscMessages.java |   2 +
 .../impl/DataNodeInternalRPCServiceImpl.java       |  22 ++
 .../iotdb/db/service/RegionMigrateService.java     | 162 ++++++++++++
 .../src/main/thrift/datanode.thrift                |  14 +
 28 files changed, 1039 insertions(+), 462 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ProcedureMessages.java
 
b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ProcedureMessages.java
index f4ae087b159..a8d76910f5f 100644
--- 
a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ProcedureMessages.java
+++ 
b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ProcedureMessages.java
@@ -616,6 +616,28 @@ public final class ProcedureMessages {
   public static final String PID_ADDREGION_STATE_FAILED = "[pid{}][AddRegion] 
state {} failed";
   public static final String 
PID_ADDREGION_SUCCESS_HAS_BEEN_ADDED_TO_DATANODE_PROCEDURE_TOOK =
       "[pid{}][AddRegion] success, {} has been added to DataNode {}. Procedure 
took {} (start at {}).";
+  public static final String 
PID_CREATEREGION_STARTED_WILL_BE_CREATED_ON_DATANODE =
+      "[pid{}][CreateRegion] started, region {} will be created on DataNode 
{}.";
+  public static final String PID_CREATEREGION_STATE_COMPLETE =
+      "[pid{}][CreateRegion] state {} complete";
+  public static final String PID_CREATEREGION_STATE_FAILED =
+      "[pid{}][CreateRegion] state {} failed";
+  public static final String PID_CREATEREGION_CREATE_FAILED_WILL_RETRY =
+      "[pid{}][CreateRegion] failed to create region {} on DataNode {} 
(attempt {}), will retry. status: {}";
+  public static final String 
PID_CREATEREGION_SUCCESS_HAS_BEEN_CREATED_ON_DATANODE_PROCEDURE_TOOK =
+      "[pid{}][CreateRegion] success, region {} has been created on DataNode 
{}. Procedure took {} (started at {}).";
+  public static final String 
PID_DELETEREGION_STARTED_WILL_BE_DELETED_FROM_DATANODE =
+      "[pid{}][DeleteRegion] started, region {} will be deleted from DataNode 
{}.";
+  public static final String PID_DELETEREGION_STATE_COMPLETE =
+      "[pid{}][DeleteRegion] state {} complete";
+  public static final String PID_DELETEREGION_STATE_FAILED =
+      "[pid{}][DeleteRegion] state {} failed";
+  public static final String PID_DELETEREGION_SUBMIT_TASK_FAILED =
+      "[pid{}][DeleteRegion] failed to submit the delete task for region {} to 
DataNode {}. status: {}";
+  public static final String PID_DELETEREGION_EXECUTED_FAILED =
+      "[pid{}][DeleteRegion] failed to delete region {} from DataNode {}, task 
status is {}. You may need to manually clean up the region files.";
+  public static final String 
PID_DELETEREGION_SUCCESS_REGION_HAS_BEEN_DELETED_FROM_DATANODE_PROCEDURE =
+      "[pid{}][DeleteRegion] success, region {} has been deleted from DataNode 
{}. Procedure took {} (started at {}).";
   public static final String 
PID_MIGRATEREGION_STARTED_WILL_BE_MIGRATED_FROM_DATANODE_TO =
       "[pid{}][MigrateRegion] started, {} will be migrated from DataNode {} to 
{}.";
   public static final String PID_MIGRATEREGION_STATE_COMPLETE =
diff --git 
a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ProcedureMessages.java
 
b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ProcedureMessages.java
index 126055a7c57..a3ed7b44d50 100644
--- 
a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ProcedureMessages.java
+++ 
b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ProcedureMessages.java
@@ -614,6 +614,24 @@ public final class ProcedureMessages {
   public static final String PID_ADDREGION_STATE_FAILED = "[pid{}][AddRegion] 
state {} failed";
   public static final String 
PID_ADDREGION_SUCCESS_HAS_BEEN_ADDED_TO_DATANODE_PROCEDURE_TOOK =
       "[pid{}][AddRegion] success, {} has been added to DataNode {}. Procedure 
took {} (start at {}).";
+  public static final String 
PID_CREATEREGION_STARTED_WILL_BE_CREATED_ON_DATANODE =
+      "[pid{}][CreateRegion] 开始,region {} 将在 DataNode {} 上创建。";
+  public static final String PID_CREATEREGION_STATE_COMPLETE = 
"[pid{}][CreateRegion] 状态 {} 完成";
+  public static final String PID_CREATEREGION_STATE_FAILED = 
"[pid{}][CreateRegion] 状态 {} 失败";
+  public static final String PID_CREATEREGION_CREATE_FAILED_WILL_RETRY =
+      "[pid{}][CreateRegion] 创建 region {} 到 DataNode {} 失败(第 {} 
次尝试),将重试。status: {}";
+  public static final String 
PID_CREATEREGION_SUCCESS_HAS_BEEN_CREATED_ON_DATANODE_PROCEDURE_TOOK =
+      "[pid{}][CreateRegion] 成功,region {} 已在 DataNode {} 上创建。过程耗时 {}(开始于 {})。";
+  public static final String 
PID_DELETEREGION_STARTED_WILL_BE_DELETED_FROM_DATANODE =
+      "[pid{}][DeleteRegion] 开始,region {} 将从 DataNode {} 上删除。";
+  public static final String PID_DELETEREGION_STATE_COMPLETE = 
"[pid{}][DeleteRegion] 状态 {} 完成";
+  public static final String PID_DELETEREGION_STATE_FAILED = 
"[pid{}][DeleteRegion] 状态 {} 失败";
+  public static final String PID_DELETEREGION_SUBMIT_TASK_FAILED =
+      "[pid{}][DeleteRegion] 提交 region {} 的删除任务到 DataNode {} 失败。status: {}";
+  public static final String PID_DELETEREGION_EXECUTED_FAILED =
+      "[pid{}][DeleteRegion] region {} 从 DataNode {} 删除失败,任务状态为 {}。你可能需要手动清理 region 文件。";
+  public static final String 
PID_DELETEREGION_SUCCESS_REGION_HAS_BEEN_DELETED_FROM_DATANODE_PROCEDURE =
+      "[pid{}][DeleteRegion] 成功,region {} 已从 DataNode {} 上删除。过程耗时 {}(开始于 {})。";
   public static final String 
PID_MIGRATEREGION_STARTED_WILL_BE_MIGRATED_FROM_DATANODE_TO =
       "[pid{}][MigrateRegion] started, {} will be migrated from DataNode {} to 
{}.";
   public static final String PID_MIGRATEREGION_STATE_COMPLETE =
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java
index 790fd637d61..566e8728dbe 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java
@@ -35,6 +35,7 @@ public enum CnToDnSyncRequestType {
   ADD_REGION_PEER,
   REMOVE_REGION_PEER,
   DELETE_OLD_REGION_PEER,
+  DELETE_REGION_ASYNC,
   RESET_PEER_LIST,
 
   // Data Partition Table Maintenance
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
index 14c833fb2fc..4ec13cd9089 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
@@ -133,6 +133,9 @@ public class SyncDataNodeClientPool {
     actionMapBuilder.put(
         CnToDnSyncRequestType.DELETE_OLD_REGION_PEER,
         (req, client) -> client.deleteOldRegionPeer((TMaintainPeerReq) req));
+    actionMapBuilder.put(
+        CnToDnSyncRequestType.DELETE_REGION_ASYNC,
+        (req, client) -> client.deleteRegionAsync((TMaintainPeerReq) req));
     actionMapBuilder.put(
         CnToDnSyncRequestType.RESET_PEER_LIST,
         (req, client) -> client.resetPeerList((TResetPeerListReq) req));
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index dce1db12cd0..ac47840c873 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -62,11 +62,17 @@ public enum ConfigPhysicalPlanType {
   GetRegionInfoList((short) 302),
   @Deprecated
   UpdateRegionLocation((short) 303),
+  // Deprecated: the RegionMaintainer queue has been replaced by 
Create/DeleteRegionProcedure. These
+  // plan types are kept (and still deserialized + applied as no-ops) only so 
an old ConfigNode
+  // consensus log can be replayed during an upgrade.
+  @Deprecated
   OfferRegionMaintainTasks((short) 304),
+  @Deprecated
   PollRegionMaintainTask((short) 305),
   GetRegionId((short) 306),
   GetSeriesSlotList((short) 307),
   GetTimeSlotList((short) 308),
+  @Deprecated
   PollSpecificRegionMaintainTask((short) 309),
   CountTimeSlotList((short) 310),
   AddRegionLocation((short) 311),
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index ad0a82bcf56..f0e24ad4d1a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -380,8 +380,6 @@ public class ConfigRegionStateMachine implements 
IStateMachine, IStateMachine.Ev
         new LeaderServiceStartup(
             "RetryFailedTasksService",
             () -> 
configManager.getRetryFailedTasksThread().startRetryFailedTasksService()),
-        new LeaderServiceStartup(
-            "RegionCleaner", () -> 
configManager.getPartitionManager().startRegionCleaner()),
         // Add metrics after leader ready.
         new LeaderServiceStartup("Metrics", () -> configManager.addMetrics()),
         // Activate leader related service for config pipe.
@@ -426,7 +424,6 @@ public class ConfigRegionStateMachine implements 
IStateMachine, IStateMachine.Ev
     configManager.getLoadManager().stopLoadServices();
     configManager.getProcedureManager().stopExecutor();
     configManager.getRetryFailedTasksThread().stopRetryFailedTasksService();
-    configManager.getPartitionManager().stopRegionCleaner();
     configManager.getCQManager().stopCQScheduler();
     configManager.getClusterSchemaManager().clearSchemaQuotaCache();
     // Remove Metric after leader change
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index da10bb22283..544da2aaf89 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -456,9 +456,6 @@ public class ConfigManager implements IManager {
   }
 
   public void close() throws IOException {
-    if (partitionManager != null) {
-      partitionManager.getRegionMaintainer().shutdown();
-    }
     if (procedureManager != null) {
       procedureManager.stopExecutor();
     }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 2beeea6b4af..05c4357934d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -56,7 +56,6 @@ import 
org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNo
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
 import org.apache.iotdb.confignode.i18n.ManagerMessages;
-import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.persistence.ProcedureInfo;
 import org.apache.iotdb.confignode.procedure.PartitionTableAutoCleaner;
 import org.apache.iotdb.confignode.procedure.Procedure;
@@ -317,9 +316,8 @@ public class ProcedureManager {
     }
     List<TSStatus> results = new ArrayList<>(procedures.size());
     procedures.forEach(procedure -> 
results.add(waitingProcedureFinished(procedure)));
-    // Clear the previously deleted regions
-    final PartitionManager partitionManager = 
getConfigManager().getPartitionManager();
-    
partitionManager.getRegionMaintainer().submit(partitionManager::maintainRegionReplicas);
+    // Region deletion is now handled by the DeleteRegionProcedure children 
spawned inside each
+    // DeleteDatabaseProcedure, so there is no longer a RegionMaintainer queue 
to kick here.
     if (results.stream()
         .allMatch(result -> result.getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode())) {
       return StatusUtils.OK;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 0edb0e389dc..766202afd39 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -27,17 +27,11 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.cluster.RegionRoleType;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
 import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
-import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
-import 
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
-import 
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
@@ -58,7 +52,6 @@ import 
org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataP
 import 
org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.partition.RemoveRegionLocationPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
-import 
org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan;
 import 
org.apache.iotdb.confignode.consensus.response.partition.CountTimeSlotListResp;
 import 
org.apache.iotdb.confignode.consensus.response.partition.DataPartitionResp;
 import 
org.apache.iotdb.confignode.consensus.response.partition.GetRegionGroupsByTimeResp;
@@ -80,10 +73,6 @@ import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
 import org.apache.iotdb.confignode.persistence.partition.PartitionInfo;
-import 
org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask;
-import 
org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask;
-import 
org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainTask;
-import 
org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainType;
 import 
org.apache.iotdb.confignode.procedure.impl.partition.DataPartitionTableIntegrityCheckProcedure;
 import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq;
@@ -92,8 +81,6 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
 import org.apache.iotdb.consensus.exception.ConsensusException;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
-import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -106,21 +93,14 @@ import org.slf4j.LoggerFactory;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.Queue;
 import java.util.Set;
 import java.util.StringJoiner;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -144,25 +124,12 @@ public class PartitionManager {
   public static final String CONSENSUS_WRITE_ERROR =
       "Failed in the write API executing the consensus layer due to: ";
 
-  // Monitor for leadership change
-  private final Object scheduleMonitor = new Object();
-
-  /** Region cleaner. */
-  // Try to delete Regions in every 10s
-  private static final int REGION_MAINTAINER_WORK_INTERVAL = 10;
-
-  private final ScheduledExecutorService regionMaintainer;
-  private Future<?> currentRegionMaintainerFuture;
-
   private final AtomicBoolean dataPartitionTableIntegrityCheckProcedureRunning 
=
       new AtomicBoolean(false);
 
   public PartitionManager(IManager configManager, PartitionInfo partitionInfo) 
{
     this.configManager = configManager;
     this.partitionInfo = partitionInfo;
-    this.regionMaintainer =
-        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
-            ThreadName.CONFIG_NODE_REGION_MAINTAINER.getName());
     setSeriesPartitionExecutor();
   }
 
@@ -1319,250 +1286,6 @@ public class PartitionManager {
     return partitionInfo.getRegionDatabase(regionId);
   }
 
-  /**
-   * Called by {@link PartitionManager#regionMaintainer}.
-   *
-   * <p>Periodically maintain the RegionReplicas to be created or deleted
-   */
-  public void maintainRegionReplicas() {
-    // The consensusManager of configManager may not be fully initialized at 
this time
-    Optional.ofNullable(getConsensusManager())
-        .ifPresent(
-            consensusManager -> {
-              if (getConsensusManager().isLeader()) {
-                List<RegionMaintainTask> regionMaintainTaskList =
-                    partitionInfo.getRegionMaintainEntryList();
-
-                if (regionMaintainTaskList.isEmpty()) {
-                  return;
-                }
-
-                // Group tasks by region id
-                Map<TConsensusGroupId, Queue<RegionMaintainTask>> 
regionMaintainTaskMap =
-                    new HashMap<>();
-                for (RegionMaintainTask regionMaintainTask : 
regionMaintainTaskList) {
-                  regionMaintainTaskMap
-                      .computeIfAbsent(regionMaintainTask.getRegionId(), k -> 
new LinkedList<>())
-                      .add(regionMaintainTask);
-                }
-
-                while (!regionMaintainTaskMap.isEmpty()) {
-                  // Select same type task from each region group
-                  List<RegionMaintainTask> selectedRegionMaintainTask = new 
ArrayList<>();
-                  RegionMaintainType currentType = null;
-                  for (Map.Entry<TConsensusGroupId, Queue<RegionMaintainTask>> 
entry :
-                      regionMaintainTaskMap.entrySet()) {
-                    RegionMaintainTask regionMaintainTask = 
entry.getValue().peek();
-                    if (regionMaintainTask == null) {
-                      continue;
-                    }
-
-                    if (currentType == null) {
-                      currentType = regionMaintainTask.getType();
-                      selectedRegionMaintainTask.add(entry.getValue().peek());
-                    } else {
-                      if (!currentType.equals(regionMaintainTask.getType())) {
-                        continue;
-                      }
-
-                      if (currentType.equals(RegionMaintainType.DELETE)
-                          || entry
-                              .getKey()
-                              .getType()
-                              
.equals(selectedRegionMaintainTask.get(0).getRegionId().getType())) {
-                        // Delete or same create task
-                        
selectedRegionMaintainTask.add(entry.getValue().peek());
-                      }
-                    }
-                  }
-
-                  if (selectedRegionMaintainTask.isEmpty()) {
-                    break;
-                  }
-
-                  Set<TConsensusGroupId> successfulTask = new HashSet<>();
-                  switch (currentType) {
-                    case CREATE:
-                      // create region
-                      switch 
(selectedRegionMaintainTask.get(0).getRegionId().getType()) {
-                        case SchemaRegion:
-                          // create SchemaRegion
-                          DataNodeAsyncRequestContext<TCreateSchemaRegionReq, 
TSStatus>
-                              createSchemaRegionHandler =
-                                  new DataNodeAsyncRequestContext<>(
-                                      
CnToDnAsyncRequestType.CREATE_SCHEMA_REGION);
-                          for (RegionMaintainTask regionMaintainTask : 
selectedRegionMaintainTask) {
-                            RegionCreateTask schemaRegionCreateTask =
-                                (RegionCreateTask) regionMaintainTask;
-                            LOGGER.info(
-                                
ManagerMessages.START_TO_CREATE_REGION_ON_DATANODE,
-                                
schemaRegionCreateTask.getRegionReplicaSet().getRegionId(),
-                                schemaRegionCreateTask.getTargetDataNode());
-                            createSchemaRegionHandler.putRequest(
-                                schemaRegionCreateTask.getRegionId().getId(),
-                                new TCreateSchemaRegionReq(
-                                    
schemaRegionCreateTask.getRegionReplicaSet(),
-                                    schemaRegionCreateTask.getStorageGroup()));
-                            createSchemaRegionHandler.putNodeLocation(
-                                schemaRegionCreateTask.getRegionId().getId(),
-                                schemaRegionCreateTask.getTargetDataNode());
-                          }
-
-                          
CnToDnInternalServiceAsyncRequestManager.getInstance()
-                              
.sendAsyncRequestWithRetry(createSchemaRegionHandler);
-
-                          for (Map.Entry<Integer, TSStatus> entry :
-                              
createSchemaRegionHandler.getResponseMap().entrySet()) {
-                            if (entry.getValue().getCode()
-                                == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-                              successfulTask.add(
-                                  new TConsensusGroupId(
-                                      TConsensusGroupType.SchemaRegion, 
entry.getKey()));
-                            }
-                          }
-                          break;
-                        case DataRegion:
-                          // Create DataRegion
-                          DataNodeAsyncRequestContext<TCreateDataRegionReq, 
TSStatus>
-                              createDataRegionHandler =
-                                  new DataNodeAsyncRequestContext<>(
-                                      
CnToDnAsyncRequestType.CREATE_DATA_REGION);
-                          for (RegionMaintainTask regionMaintainTask : 
selectedRegionMaintainTask) {
-                            RegionCreateTask dataRegionCreateTask =
-                                (RegionCreateTask) regionMaintainTask;
-                            LOGGER.info(
-                                
ManagerMessages.START_TO_CREATE_REGION_ON_DATANODE,
-                                
dataRegionCreateTask.getRegionReplicaSet().getRegionId(),
-                                dataRegionCreateTask.getTargetDataNode());
-                            createDataRegionHandler.putRequest(
-                                dataRegionCreateTask.getRegionId().getId(),
-                                new TCreateDataRegionReq(
-                                    dataRegionCreateTask.getRegionReplicaSet(),
-                                    dataRegionCreateTask.getStorageGroup()));
-                            createDataRegionHandler.putNodeLocation(
-                                dataRegionCreateTask.getRegionId().getId(),
-                                dataRegionCreateTask.getTargetDataNode());
-                          }
-
-                          
CnToDnInternalServiceAsyncRequestManager.getInstance()
-                              
.sendAsyncRequestWithRetry(createDataRegionHandler);
-
-                          for (Map.Entry<Integer, TSStatus> entry :
-                              
createDataRegionHandler.getResponseMap().entrySet()) {
-                            if (entry.getValue().getCode()
-                                == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-                              successfulTask.add(
-                                  new TConsensusGroupId(
-                                      TConsensusGroupType.DataRegion, 
entry.getKey()));
-                            }
-                          }
-                          break;
-                      }
-                      break;
-                    case DELETE:
-                      // delete region
-                      DataNodeAsyncRequestContext<TConsensusGroupId, TSStatus> 
deleteRegionHandler =
-                          new 
DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.DELETE_REGION);
-                      Map<Integer, TConsensusGroupId> regionIdMap = new 
HashMap<>();
-                      for (RegionMaintainTask regionMaintainTask : 
selectedRegionMaintainTask) {
-                        RegionDeleteTask regionDeleteTask = (RegionDeleteTask) 
regionMaintainTask;
-                        LOGGER.info(
-                            ManagerMessages.START_TO_DELETE_REGION_ON_DATANODE,
-                            regionDeleteTask.getRegionId(),
-                            regionDeleteTask.getTargetDataNode());
-                        deleteRegionHandler.putRequest(
-                            regionDeleteTask.getRegionId().getId(), 
regionDeleteTask.getRegionId());
-                        deleteRegionHandler.putNodeLocation(
-                            regionDeleteTask.getRegionId().getId(),
-                            regionDeleteTask.getTargetDataNode());
-                        regionIdMap.put(
-                            regionDeleteTask.getRegionId().getId(), 
regionDeleteTask.getRegionId());
-                      }
-
-                      long startTime = System.currentTimeMillis();
-                      CnToDnInternalServiceAsyncRequestManager.getInstance()
-                          .sendAsyncRequestWithRetry(deleteRegionHandler);
-
-                      LOGGER.info(
-                          ManagerMessages.DELETING_REGIONS_COSTS_MS,
-                          (System.currentTimeMillis() - startTime));
-
-                      for (Map.Entry<Integer, TSStatus> entry :
-                          deleteRegionHandler.getResponseMap().entrySet()) {
-                        if (entry.getValue().getCode()
-                            == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-                          successfulTask.add(regionIdMap.get(entry.getKey()));
-                        }
-                      }
-                      break;
-                  }
-
-                  if (successfulTask.isEmpty()) {
-                    break;
-                  }
-
-                  for (TConsensusGroupId regionId : successfulTask) {
-                    regionMaintainTaskMap.compute(
-                        regionId,
-                        (k, v) -> {
-                          if (v == null) {
-                            throw new IllegalStateException();
-                          }
-                          v.poll();
-                          if (v.isEmpty()) {
-                            return null;
-                          } else {
-                            return v;
-                          }
-                        });
-                  }
-
-                  // Poll the head entry if success
-                  try {
-                    getConsensusManager()
-                        .write(new 
PollSpecificRegionMaintainTaskPlan(successfulTask));
-                  } catch (ConsensusException e) {
-                    LOGGER.warn(CONSENSUS_WRITE_ERROR, e);
-                  }
-
-                  if (successfulTask.size() < 
selectedRegionMaintainTask.size()) {
-                    // Here we just break and wait until next schedule task
-                    // due to all the RegionMaintainEntry should be executed by
-                    // the order of they were offered
-                    break;
-                  }
-                }
-              }
-            });
-  }
-
-  public void startRegionCleaner() {
-    synchronized (scheduleMonitor) {
-      if (currentRegionMaintainerFuture == null) {
-        /* Start the RegionCleaner service */
-        currentRegionMaintainerFuture =
-            ScheduledExecutorUtil.safelyScheduleAtFixedRate(
-                regionMaintainer,
-                this::maintainRegionReplicas,
-                0,
-                REGION_MAINTAINER_WORK_INTERVAL,
-                TimeUnit.SECONDS);
-        LOGGER.info(ManagerMessages.REGIONCLEANER_IS_STARTED_SUCCESSFULLY);
-      }
-    }
-  }
-
-  public void stopRegionCleaner() {
-    synchronized (scheduleMonitor) {
-      if (currentRegionMaintainerFuture != null) {
-        /* Stop the RegionCleaner service */
-        currentRegionMaintainerFuture.cancel(false);
-        currentRegionMaintainerFuture = null;
-        LOGGER.info(ManagerMessages.REGIONCLEANER_IS_STOPPED_SUCCESSFULLY);
-      }
-    }
-  }
-
   /**
    * Filter the RegionGroups in the specified Database through the 
RegionGroupStatus.
    *
@@ -1607,10 +1330,6 @@ public class PartitionManager {
     return partitionInfo.getLastDataAllotTable(database);
   }
 
-  public ScheduledExecutorService getRegionMaintainer() {
-    return regionMaintainer;
-  }
-
   private ConsensusManager getConsensusManager() {
     return configManager.getConsensusManager();
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
index af9429e5953..457f21358cb 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java
@@ -131,9 +131,13 @@ public class PartitionInfo implements SnapshotProcessor {
   // For table model databases: The databaseName is a full name without "root."
   private final Map<String, DatabasePartitionTable> databasePartitionTables;
 
-  /** For Region-Maintainer. */
-  // For RegionReplicas' asynchronous management
-  private final List<RegionMaintainTask> regionMaintainTaskList;
+  /**
+   * Legacy RegionMaintainer queue. The queue has been replaced by {@code CreateRegionProcedure} /
+   * {@code DeleteRegionProcedure}; this list is now always empty (offer/poll are no-op shims kept
+   * for consensus-log replay during upgrade) and is retained only so {@link #equals}/{@link
+   * #hashCode}/{@link #clear} keep working without touching the snapshot byte layout.
+   */
+  @Deprecated private final List<RegionMaintainTask> regionMaintainTaskList;
 
   private static final String SNAPSHOT_FILENAME = "partition_info.bin";
 
@@ -228,63 +232,55 @@ public class PartitionInfo implements SnapshotProcessor {
    * Offer a batch of RegionMaintainTasks for the RegionMaintainer.
    *
    * @return {@link TSStatusCode#SUCCESS_STATUS}
+   * @deprecated The RegionMaintainer queue has been replaced by {@code CreateRegionProcedure} /
+   *     {@code DeleteRegionProcedure}. This method is retained only so that an
+   *     OfferRegionMaintainTasksPlan still present in an old ConfigNode consensus log can be
+   *     replayed during an upgrade; it no longer queues anything (nothing drains the queue) and
+   *     only logs a WARN listing the ignored tasks, so that losing them is observable.
    */
+  @Deprecated
   public TSStatus offerRegionMaintainTasks(
       OfferRegionMaintainTasksPlan offerRegionMaintainTasksPlan) {
-    synchronized (regionMaintainTaskList) {
-      regionMaintainTaskList.addAll(offerRegionMaintainTasksPlan.getRegionMaintainTaskList());
-      return RpcUtils.SUCCESS_STATUS;
-    }
+    // Tasks are no longer queued (see @deprecated note above). Mirror the snapshot-loading path
+    // and log the ignored tasks, so that losing them during an upgrade is observable, not silent.
+    final List<String> ignoredTaskDescriptions = new ArrayList<>();
+    for (final RegionMaintainTask task : offerRegionMaintainTasksPlan.getRegionMaintainTaskList()) {
+      ignoredTaskDescriptions.add(task.getType() + " " + task.getRegionId());
+    }
+    if (!ignoredTaskDescriptions.isEmpty()) {
+      LOGGER.warn(
+          "Ignored {} legacy RegionMaintainTask(s) from a replayed OfferRegionMaintainTasksPlan; "
+              + "the RegionMaintainer queue has been replaced by Create/DeleteRegionProcedure and "
+              + "these tasks are no longer executed: {}. If they did not complete before the "
+              + "upgrade, please verify the affected regions and re-trigger their "
+              + "creation/deletion manually if needed.",
+          ignoredTaskDescriptions.size(),
+          ignoredTaskDescriptions);
+    }
+    return RpcUtils.SUCCESS_STATUS;
   }
 
   /**
-   * Poll the head of RegionMaintainTasks from the regionMaintainTaskList after it's executed
-   * successfully.
-   *
    * @return {@link TSStatusCode#SUCCESS_STATUS}
+   * @deprecated Retained only for replaying old PollRegionMaintainTaskPlan consensus-log entries
+   *     during an upgrade. See {@link #offerRegionMaintainTasks}.
    */
+  @Deprecated
   public TSStatus pollRegionMaintainTask() {
-    synchronized (regionMaintainTaskList) {
-      regionMaintainTaskList.remove(0);
-      return RpcUtils.SUCCESS_STATUS;
-    }
+    // No-op: the queue is always empty now. See @deprecated note above.
+    return RpcUtils.SUCCESS_STATUS;
   }
 
   /**
-   * Poll the head of RegionMaintainTasks of target regions from regionMaintainTaskList after they
-   * are executed successfully. Tasks of each region group are treated as single independent queue.
-   *
    * @param plan provides target region ids
    * @return {@link TSStatusCode#SUCCESS_STATUS}
+   * @deprecated Retained only for replaying old PollSpecificRegionMaintainTaskPlan consensus-log
+   *     entries during an upgrade. See {@link #offerRegionMaintainTasks}.
    */
+  @Deprecated
   public TSStatus pollSpecificRegionMaintainTask(PollSpecificRegionMaintainTaskPlan plan) {
-    synchronized (regionMaintainTaskList) {
-      Set<TConsensusGroupId> removingRegionIdSet = new HashSet<>(plan.getRegionIdSet());
-      TConsensusGroupId regionId;
-      for (int i = 0; i < regionMaintainTaskList.size(); i++) {
-        regionId = regionMaintainTaskList.get(i).getRegionId();
-        if (removingRegionIdSet.contains(regionId)) {
-          regionMaintainTaskList.remove(i);
-          removingRegionIdSet.remove(regionId);
-          i--;
-        }
-        if (removingRegionIdSet.isEmpty()) {
-          break;
-        }
-      }
-      return RpcUtils.SUCCESS_STATUS;
-    }
-  }
-
-  /**
-   * Get a deep copy of RegionCleanList for RegionCleaner to maintain cluster RegionReplicas.
-   *
-   * @return A deep copy of RegionCleanList
-   */
-  public List<RegionMaintainTask> getRegionMaintainEntryList() {
-    synchronized (regionMaintainTaskList) {
-      return new ArrayList<>(regionMaintainTaskList);
-    }
+    // No-op: the queue is always empty now. See @deprecated note above.
+    return RpcUtils.SUCCESS_STATUS;
   }
 
   /**
@@ -1010,11 +1006,11 @@ public class PartitionInfo implements SnapshotProcessor {
         databasePartitionTableEntry.getValue().serialize(bufferedOutputStream, protocol);
       }
 
-      // serialize regionCleanList
-      ReadWriteIOUtils.write(regionMaintainTaskList.size(), bufferedOutputStream);
-      for (RegionMaintainTask task : regionMaintainTaskList) {
-        task.serialize(bufferedOutputStream, protocol);
-      }
+      // The trailing RegionMaintainer queue block is kept in the snapshot format for backward
+      // compatibility, but the queue itself has been replaced by Create/DeleteRegionProcedure.
+      // Always write an empty list so the byte layout is unchanged and an old ConfigNode reading
+      // this snapshot still finds a well-formed (empty) block.
+      ReadWriteIOUtils.write(0, bufferedOutputStream);
 
       // write to file
       tioStreamTransport.flush();
@@ -1073,12 +1069,30 @@ public class PartitionInfo implements SnapshotProcessor {
         databasePartitionTables.put(database, databasePartitionTable);
       }
 
-      // restore deletedRegionSet
+      // Read the trailing RegionMaintainer queue block. The queue has been replaced by
+      // Create/DeleteRegionProcedure, but an OLD snapshot (taken before the upgrade) may have
+      // written pending tasks here, so we must still consume exactly those bytes to keep the stream
+      // aligned. The tasks are discarded because nothing drains the queue anymore. To make any such
+      // loss observable, we log a WARN naming the dropped tasks; an operator can then re-trigger
+      // the corresponding region creation/deletion if it did not otherwise converge.
       length = ReadWriteIOUtils.readInt(fileInputStream);
+      final List<RegionMaintainTask> droppedTasks = new ArrayList<>();
       for (int i = 0; i < length; i++) {
-        final RegionMaintainTask task =
-            RegionMaintainTask.Factory.create(fileInputStream, protocol);
-        regionMaintainTaskList.add(task);
+        // Advance the stream past one serialized task and drop it.
+        droppedTasks.add(RegionMaintainTask.Factory.create(fileInputStream, protocol));
+      }
+      if (!droppedTasks.isEmpty()) {
+        final List<String> droppedTaskDescriptions = new ArrayList<>();
+        for (final RegionMaintainTask task : droppedTasks) {
+          droppedTaskDescriptions.add(task.getType() + " " + task.getRegionId());
+        }
+        LOGGER.warn(
+            "Dropped {} legacy RegionMaintainTask(s) while loading the snapshot; the RegionMaintainer "
+                + "queue has been replaced by Create/DeleteRegionProcedure and these pending tasks are "
+                + "no longer executed: {}. Please verify the affected regions and re-trigger their "
+                + "creation/deletion manually if needed.",
+            droppedTasks.size(),
+            droppedTaskDescriptions);
       }
     }
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
index 9ce0c9f72a3..37ca8fe8d85 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
@@ -53,7 +53,9 @@ import org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupHe
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeResp;
 import org.apache.iotdb.mpp.rpc.thrift.TRegionMigrateResult;
@@ -218,6 +220,37 @@ public class RegionMaintainHandler {
     return status;
   }
 
+  /**
+   * Synchronously create a single region replica on the target DataNode.
+   *
+   * <p>Used by {@code CreateRegionProcedure}. Unlike region deletion, creating a region returns a
+   * final, unambiguous status from a single RPC (it does not remove large amounts of data), so it
+   * is kept synchronous; wrapping it in a procedure only adds persistence and retry across leader
+   * changes.
+   *
+   * @param regionReplicaSet the region (carries the regionId and its type)
+   * @param storageGroup the database the region belongs to
+   * @param targetDataNode the DataNode on which to create the replica
+   * @return the TSStatus of the create RPC
+   */
+  public TSStatus createRegion(
+      TRegionReplicaSet regionReplicaSet, String storageGroup, TDataNodeLocation targetDataNode) {
+    final TConsensusGroupId regionId = regionReplicaSet.getRegionId();
+    final Object req;
+    final CnToDnSyncRequestType requestType;
+    if (TConsensusGroupType.SchemaRegion.equals(regionId.getType())) {
+      req = new TCreateSchemaRegionReq(regionReplicaSet, storageGroup);
+      requestType = CnToDnSyncRequestType.CREATE_SCHEMA_REGION;
+    } else {
+      req = new TCreateDataRegionReq(regionReplicaSet, storageGroup);
+      requestType = CnToDnSyncRequestType.CREATE_DATA_REGION;
+    }
+    return (TSStatus)
+        SyncDataNodeClientPool.getInstance()
+            .sendSyncRequestToDataNodeWithRetry(
+                targetDataNode.getInternalEndPoint(), req, requestType);
+  }
+
   /**
    * Order the specific ConsensusGroup to add peer for the new RegionReplica.
    *
@@ -332,6 +365,52 @@ public class RegionMaintainHandler {
     return status;
   }
 
+  /**
+   * Order the specified DataNode to asynchronously delete a region replica and all of its data.
+   *
+   * <p>The DataNode submits the deletion as a background task keyed by {@code procedureId} and
+   * returns immediately. The caller (a {@code DeleteRegionProcedure}) then polls progress via
+   * {@link #waitTaskFinish(long, TDataNodeLocation)}. This is the replacement for the old
+   * synchronous {@code deleteRegion} RPC, which could time out on slow deletions and let the
+   * ConfigNode's retry be wrongly reported as success.
+   *
+   * @param procedureId used as the taskId so the DataNode can dedup retried submits and the
+   *     ConfigNode can poll the result
+   * @param targetDataNode the DataNode that holds the replica to be deleted
+   * @param regionId region id
+   * @return TSStatus of the submit (not of the deletion itself)
+   */
+  public TSStatus submitDeleteRegionTask(
+      long procedureId, TDataNodeLocation targetDataNode, TConsensusGroupId regionId) {
+    TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, targetDataNode, procedureId);
+
+    // The target DataNode may be down (e.g. while deleting a database whose replica sits on an
+    // unreachable node). Fall back to a single-shot retry in that case, mirroring
+    // submitDeleteOldRegionPeerTask, so we do not block on an endless retry of a dead node.
+    final NodeStatus nodeStatus = getDataNodeStatus(targetDataNode.getDataNodeId());
+    final boolean useFullRetry = !NodeStatus.Unknown.equals(nodeStatus);
+    if (!useFullRetry) {
+      LOGGER.info(
+          ProcedureMessages.DATANODE_IS_SUBMIT_DELETE_OLD_REGION_PEER_WITH_A_SINGLE,
+          REGION_MIGRATE_PROCESS,
+          simplifiedLocation(targetDataNode),
+          nodeStatus);
+    }
+
+    TSStatus status =
+        submitDataNodeSyncRequest(
+            targetDataNode.getInternalEndPoint(),
+            maintainPeerReq,
+            CnToDnSyncRequestType.DELETE_REGION_ASYNC,
+            useFullRetry);
+    LOGGER.info(
+        ProcedureMessages.SEND_ACTION_DELETEOLDREGIONPEER_FINISHED_REGIONID_DATANODEID,
+        REGION_MIGRATE_PROCESS,
+        regionId,
+        targetDataNode.getInternalEndPoint());
+    return status;
+  }
+
   protected NodeStatus getDataNodeStatus(int dataNodeId) {
     return configManager.getLoadManager().getNodeStatus(dataNodeId);
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
index e9cce807e77..850fd8bb69b 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java
@@ -31,18 +31,14 @@ import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan;
-import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
-import org.apache.iotdb.confignode.i18n.ConfigNodeMessages;
 import org.apache.iotdb.confignode.i18n.ProcedureMessages;
 import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
-import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask;
-import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask;
+import org.apache.iotdb.confignode.procedure.Procedure;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
 import org.apache.iotdb.confignode.procedure.state.CreateRegionGroupsState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.slf4j.Logger;
@@ -51,7 +47,9 @@ import org.slf4j.LoggerFactory;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -106,7 +104,11 @@ public class CreateRegionGroupsProcedure
         break;
       case SHUNT_REGION_REPLICAS:
         persistPlan = new CreateRegionGroupsPlan();
-        final OfferRegionMaintainTasksPlan offerPlan = new OfferRegionMaintainTasksPlan();
+        // Recreate-failed-replica and delete-redundant-replica tasks were previously offered to the
+        // RegionMaintainer queue. They are now run as child procedures (CreateRegionProcedure /
+        // DeleteRegionProcedure), so they are persisted and retried across leader changes, and
+        // deletions are polled to completion instead of being fire-and-forget.
+        final List<Procedure<ConfigNodeProcedureEnv>> shuntProcedures = new ArrayList<>();
         // Filter those RegionGroups that created successfully
         createRegionGroupsPlan
             .getRegionGroupMap()
@@ -138,16 +140,14 @@ public class CreateRegionGroupsProcedure
                               // half of the RegionReplicas created successfully
                               persistPlan.addRegionGroup(database, regionReplicaSet);
 
-                              // Build recreate tasks
+                              // Build recreate tasks for the replicas that failed to create
                               failedRegionReplicas
                                   .getDataNodeLocations()
                                   .forEach(
-                                      targetDataNode -> {
-                                        RegionCreateTask createTask =
-                                            new RegionCreateTask(
-                                                targetDataNode, database, regionReplicaSet);
-                                        offerPlan.appendRegionMaintainTask(createTask);
-                                      });
+                                      targetDataNode ->
+                                          shuntProcedures.add(
+                                              new CreateRegionProcedure(
+                                                  database, regionReplicaSet, targetDataNode)));
 
                               LOGGER.info(
                                   ProcedureMessages
@@ -162,10 +162,9 @@ public class CreateRegionGroupsProcedure
                                         if (!failedRegionReplicas
                                             .getDataNodeLocations()
                                             .contains(targetDataNode)) {
-                                          RegionDeleteTask deleteTask =
-                                              new RegionDeleteTask(
-                                                  targetDataNode, regionReplicaSet.getRegionId());
-                                          offerPlan.appendRegionMaintainTask(deleteTask);
+                                          shuntProcedures.add(
+                                              new DeleteRegionProcedure(
+                                                  regionReplicaSet.getRegionId(), targetDataNode));
                                         }
                                       });
 
@@ -182,12 +181,7 @@ public class CreateRegionGroupsProcedure
           setFailure(new ProcedureException(new IoTDBException(persistStatus)));
           return Flow.NO_MORE_STATE;
         }
-        try {
-          env.getConfigManager().getConsensusManager().write(offerPlan);
-        } catch (final ConsensusException e) {
-          LOGGER.warn(
-              ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e);
-        }
+        shuntProcedures.forEach(this::addChildProcedure);
         setNextState(CreateRegionGroupsState.REBALANCE_DATA_PARTITION_POLICY);
         break;
       case REBALANCE_DATA_PARTITION_POLICY:
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionProcedure.java
new file mode 100644
index 00000000000..b85b4d96adb
--- /dev/null
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionProcedure.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.region;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
+import org.apache.iotdb.commons.queryengine.utils.DateTimeUtils;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
+import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import org.apache.iotdb.confignode.i18n.ProcedureMessages;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.state.CreateRegionState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+import static org.apache.iotdb.commons.utils.KillPoint.KillPoint.setKillPoint;
+import static org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler.simplifiedLocation;
+import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS;
+
+/**
+ * Create a single region replica on one DataNode.
+ *
+ * <p>This replaces the queue-based CREATE path of the former RegionMaintainer (previously drained
+ * by {@code PartitionManager.maintainRegionReplicas}). Region creation already returns a final,
+ * unambiguous status from a single synchronous RPC, so this procedure simply wraps that RPC,
+ * gaining persistence and retry across ConfigNode leader changes.
+ */
+public class CreateRegionProcedure extends RegionOperationProcedure<CreateRegionState> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(CreateRegionProcedure.class);
+
+  /** A failed create RPC is re-attempted while {@code getCycles()} is below this bound. */
+  private static final int MAX_RETRY = 5;
+
+  private String storageGroup;
+  private TRegionReplicaSet regionReplicaSet;
+  private TDataNodeLocation targetDataNode;
+
+  public CreateRegionProcedure() {
+    super();
+  }
+
+  /**
+   * @param storageGroup the database the region belongs to
+   * @param regionReplicaSet the region group whose replica is created; also supplies the regionId
+   * @param targetDataNode the DataNode on which the replica is created
+   */
+  public CreateRegionProcedure(
+      String storageGroup, TRegionReplicaSet regionReplicaSet, TDataNodeLocation targetDataNode) {
+    super(regionReplicaSet.getRegionId());
+    this.storageGroup = storageGroup;
+    this.regionReplicaSet = regionReplicaSet;
+    this.targetDataNode = targetDataNode;
+  }
+
+  @Override
+  protected Flow executeFromState(ConfigNodeProcedureEnv env, CreateRegionState state)
+      throws InterruptedException {
+    if (regionId == null || targetDataNode == null) {
+      // Nothing to create, e.g. when deserialize() could not restore the fields.
+      return Flow.NO_MORE_STATE;
+    }
+    RegionMaintainHandler handler = env.getRegionMaintainHandler();
+    try {
+      switch (state) {
+        case CREATE_REGION:
+          LOGGER.info(
+              ProcedureMessages.PID_CREATEREGION_STARTED_WILL_BE_CREATED_ON_DATANODE,
+              getProcId(),
+              regionId,
+              simplifiedLocation(targetDataNode));
+          TSStatus status = handler.createRegion(regionReplicaSet, storageGroup, targetDataNode);
+          setKillPoint(state);
+          if (status.getCode() != SUCCESS_STATUS.getStatusCode()) {
+            if (getCycles() < MAX_RETRY) {
+              LOGGER.warn(
+                  ProcedureMessages.PID_CREATEREGION_CREATE_FAILED_WILL_RETRY,
+                  getProcId(),
+                  regionId,
+                  simplifiedLocation(targetDataNode),
+                  getCycles() + 1,
+                  status);
+              setNextState(CreateRegionState.CREATE_REGION);
+              return Flow.HAS_MORE_STATE;
+            }
+            // Retries exhausted: finish without the replica (no failure is set on the procedure),
+            // so this WARN is what reports the missing replica.
+            LOGGER.warn(ProcedureMessages.PID_CREATEREGION_STATE_FAILED, getProcId(), state);
+            return Flow.NO_MORE_STATE;
+          }
+          // Log every successful creation so that completed region maintenance stays auditable.
+          LOGGER.info(
+              ProcedureMessages
+                  .PID_CREATEREGION_SUCCESS_HAS_BEEN_CREATED_ON_DATANODE_PROCEDURE_TOOK,
+              getProcId(),
+              regionId,
+              simplifiedLocation(targetDataNode),
+              CommonDateTimeUtils.convertMillisecondToDurationStr(
+                  System.currentTimeMillis() - getSubmittedTime()),
+              DateTimeUtils.convertLongToDate(getSubmittedTime(), "ms"));
+          return Flow.NO_MORE_STATE;
+        default:
+          throw new ProcedureException(ProcedureMessages.UNSUPPORTED_STATE + state.name());
+      }
+    } catch (Exception e) {
+      LOGGER.error(ProcedureMessages.PID_CREATEREGION_STATE_FAILED, getProcId(), state, e);
+      return Flow.NO_MORE_STATE;
+    }
+  }
+
+  @Override
+  protected void rollbackState(ConfigNodeProcedureEnv env, CreateRegionState state)
+      throws IOException, InterruptedException, ProcedureException {
+    // Intentionally empty: this procedure performs no rollback.
+  }
+
+  @Override
+  protected CreateRegionState getState(int stateId) {
+    return CreateRegionState.values()[stateId];
+  }
+
+  @Override
+  protected int getStateId(CreateRegionState createRegionState) {
+    return createRegionState.ordinal();
+  }
+
+  @Override
+  protected CreateRegionState getInitialState() {
+    return CreateRegionState.CREATE_REGION;
+  }
+
+  public TDataNodeLocation getTargetDataNode() {
+    return targetDataNode;
+  }
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.writeShort(ProcedureType.CREATE_REGION_PROCEDURE.getTypeCode());
+    super.serialize(stream);
+    ReadWriteIOUtils.write(storageGroup, stream);
+    ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(regionReplicaSet, stream);
+    ThriftCommonsSerDeUtils.serializeTDataNodeLocation(targetDataNode, stream);
+  }
+
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer);
+    try {
+      storageGroup = ReadWriteIOUtils.readString(byteBuffer);
+      regionReplicaSet = ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(byteBuffer);
+      targetDataNode = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
+      regionId = regionReplicaSet.getRegionId();
+    } catch (ThriftSerDeException e) {
+      LOGGER.error(ProcedureMessages.ERROR_IN_DESERIALIZE, this.getClass(), e);
+    }
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof CreateRegionProcedure)) {
+      return false;
+    }
+    CreateRegionProcedure procedure = (CreateRegionProcedure) obj;
+    return Objects.equals(this.storageGroup, procedure.storageGroup)
+        && Objects.equals(this.regionReplicaSet, procedure.regionReplicaSet)
+        && Objects.equals(this.targetDataNode, procedure.targetDataNode);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(storageGroup, regionReplicaSet, targetDataNode);
+  }
+
+  @Override
+  public String toString() {
+    // Null-safe: targetDataNode is unset after the no-arg constructor or a failed deserialize().
+    return "CreateRegionProcedure{"
+        + "regionId="
+        + regionId
+        + ", storageGroup="
+        + storageGroup
+        + ", targetDataNode="
+        + (targetDataNode == null ? "null" : simplifiedLocation(targetDataNode))
+        + '}';
+  }
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/DeleteRegionProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/DeleteRegionProcedure.java
new file mode 100644
index 00000000000..d5988481fb9
--- /dev/null
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/DeleteRegionProcedure.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.region;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
+import org.apache.iotdb.commons.queryengine.utils.DateTimeUtils;
+import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
+import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import org.apache.iotdb.confignode.i18n.ProcedureMessages;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.state.DeleteRegionState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionMigrateResult;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+import static org.apache.iotdb.commons.utils.KillPoint.KillPoint.setKillPoint;
+import static 
org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler.simplifiedLocation;
+import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS;
+
+/**
+ * Delete a single region replica (the consensus peer and all of its data) from one DataNode.
+ *
+ * <p>This replaces the old queue-based DELETE path in {@code
+ * PartitionManager.maintainRegionReplicas}. The DataNode runs the deletion asynchronously (it can
+ * remove a large amount of TsFile data and outlive a single RPC timeout) and this procedure polls
+ * for the result, so a slow deletion is never reported as finished (the old "ghost task" bug).
+ * Failures (submit error, FAIL, TASK_NOT_EXIST, any exception) are logged and end the procedure.
+ */
+public class DeleteRegionProcedure extends RegionOperationProcedure<DeleteRegionState> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DeleteRegionProcedure.class);
+
+  private TDataNodeLocation targetDataNode;
+
+  public DeleteRegionProcedure() {
+    super();
+  }
+
+  public DeleteRegionProcedure(TConsensusGroupId regionId, TDataNodeLocation targetDataNode) {
+    super(regionId);
+    this.targetDataNode = targetDataNode;
+  }
+
+  @Override
+  protected Flow executeFromState(ConfigNodeProcedureEnv env, DeleteRegionState state)
+      throws InterruptedException {
+    if (regionId == null || targetDataNode == null) {
+      return Flow.NO_MORE_STATE;
+    }
+    RegionMaintainHandler handler = env.getRegionMaintainHandler();
+    try {
+      switch (state) {
+        case DELETE_REGION:
+          LOGGER.info(
+              ProcedureMessages.PID_DELETEREGION_STARTED_WILL_BE_DELETED_FROM_DATANODE,
+              getProcId(),
+              regionId,
+              simplifiedLocation(targetDataNode));
+          // Only submit the delete task on the very first entry of this state. We must NOT
+          // re-submit when:
+          //   - the state was restored from disk after a leader change / ConfigNode reboot
+          //     (isStateDeserialized()), or
+          //   - this state is being re-entered in place because a previous attempt parked
+          //     here on PROCESSING (getCycles() > 0).
+          // The DataNode also dedups by taskId (= procId), so a duplicate submit would be a
+          // no-op, but skipping it here avoids the useless RPC. In every case we then fall
+          // through to waitTaskFinish() below and (re-)poll the DataNode for the result.
+          if (!this.isStateDeserialized() && getCycles() == 0) {
+            TSStatus submitStatus =
+                handler.submitDeleteRegionTask(this.getProcId(), targetDataNode, regionId);
+            setKillPoint(state);
+            if (submitStatus.getCode() != SUCCESS_STATUS.getStatusCode()) {
+              // Submit failed (e.g. the DataNode is unreachable). End the procedure without
+              // retrying: it completes normally, so any retry must be driven by the parent.
+              // NOTE(review): DeleteDatabaseProcedure does not appear to re-check; confirm.
+              LOGGER.warn(
+                  ProcedureMessages.PID_DELETEREGION_SUBMIT_TASK_FAILED,
+                  getProcId(),
+                  regionId,
+                  simplifiedLocation(targetDataNode),
+                  submitStatus);
+              return Flow.NO_MORE_STATE;
+            }
+          }
+          TRegionMigrateResult result = handler.waitTaskFinish(this.getProcId(), targetDataNode);
+          switch (result.getTaskStatus()) {
+            case SUCCESS:
+              // Requirement: every successfully completed maintain task must be logged.
+              LOGGER.info(
+                  ProcedureMessages
+                      .PID_DELETEREGION_SUCCESS_REGION_HAS_BEEN_DELETED_FROM_DATANODE_PROCEDURE,
+                  getProcId(),
+                  regionId,
+                  simplifiedLocation(targetDataNode),
+                  CommonDateTimeUtils.convertMillisecondToDurationStr(
+                      System.currentTimeMillis() - getSubmittedTime()),
+                  DateTimeUtils.convertLongToDate(getSubmittedTime(), "ms"));
+              return Flow.NO_MORE_STATE;
+            case PROCESSING:
+              // waitTaskFinish() is expected to return PROCESSING only when its polling loop
+              // was interrupted, i.e. this ConfigNode is shutting down / losing leadership. The
+              // delete task may still be running on the DataNode, so persist and re-poll after
+              // recovery rather than declare the region deleted. Stay in DELETE_REGION; the
+              // isStateDeserialized() / getCycles() guard above prevents re-submitting, so the
+              // next execution of this state only polls.
+              setNextState(DeleteRegionState.DELETE_REGION);
+              return Flow.HAS_MORE_STATE;
+            case TASK_NOT_EXIST:
+            // The DataNode has no record of this task (it never received the submit, or it
+            // restarted and lost its in-memory task table). Treat it as a terminal failure and
+            // fall through to FAIL: this procedure only logs and ends, it never retries. Any
+            // retry (a fresh DeleteRegionProcedure with a new procId) must come from the parent.
+            // NOTE(review): DeleteDatabaseProcedure moves from spawning these children straight
+            // to DELETE_DATABASE_CONFIG without re-checking the partition table, so a failed
+            // deletion may leave the replica's data behind on the DataNode; confirm.
+            case FAIL:
+            default:
+              LOGGER.warn(
+                  ProcedureMessages.PID_DELETEREGION_EXECUTED_FAILED,
+                  getProcId(),
+                  regionId,
+                  simplifiedLocation(targetDataNode),
+                  result.getTaskStatus());
+              return Flow.NO_MORE_STATE;
+          }
+        default:
+          throw new ProcedureException(ProcedureMessages.UNSUPPORTED_STATE + state.name());
+      }
+    } catch (Exception e) {
+      LOGGER.error(ProcedureMessages.PID_DELETEREGION_STATE_FAILED, getProcId(), state, e);
+      return Flow.NO_MORE_STATE;
+    }
+  }
+
+  @Override
+  protected void rollbackState(ConfigNodeProcedureEnv env, DeleteRegionState state)
+      throws IOException, InterruptedException, ProcedureException {}
+
+  @Override
+  protected DeleteRegionState getState(int stateId) {
+    return DeleteRegionState.values()[stateId];
+  }
+
+  @Override
+  protected int getStateId(DeleteRegionState deleteRegionState) {
+    return deleteRegionState.ordinal();
+  }
+
+  @Override
+  protected DeleteRegionState getInitialState() {
+    return DeleteRegionState.DELETE_REGION;
+  }
+
+  public TDataNodeLocation getTargetDataNode() {
+    return targetDataNode;
+  }
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.writeShort(ProcedureType.DELETE_REGION_PROCEDURE.getTypeCode());
+    super.serialize(stream);
+    ThriftCommonsSerDeUtils.serializeTConsensusGroupId(regionId, stream);
+    ThriftCommonsSerDeUtils.serializeTDataNodeLocation(targetDataNode, stream);
+  }
+
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer);
+    try {
+      regionId = ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer);
+      targetDataNode = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer);
+    } catch (ThriftSerDeException e) {
+      LOGGER.error(ProcedureMessages.ERROR_IN_DESERIALIZE, this.getClass(), e);
+    }
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof DeleteRegionProcedure)) {
+      return false;
+    }
+    DeleteRegionProcedure procedure = (DeleteRegionProcedure) obj;
+    return Objects.equals(this.regionId, procedure.regionId)
+        && Objects.equals(this.targetDataNode, procedure.targetDataNode);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(regionId, targetDataNode);
+  }
+
+  @Override
+  public String toString() {
+    return "DeleteRegionProcedure{"
+        + "regionId="
+        + regionId
+        + ", targetDataNode="
+        + simplifiedLocation(targetDataNode)
+        + '}';
+  }
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
index 7b8de782173..493c0124eaf 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java
@@ -19,29 +19,21 @@
 
 package org.apache.iotdb.confignode.procedure.impl.schema;
 
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.utils.ThriftConfigNodeSerDeUtils;
-import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
-import 
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
-import 
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
 import 
org.apache.iotdb.confignode.consensus.request.write.database.PreDeleteDatabasePlan;
-import 
org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan;
 import org.apache.iotdb.confignode.i18n.ProcedureMessages;
 import org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
-import 
org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
+import org.apache.iotdb.confignode.procedure.impl.region.DeleteRegionProcedure;
 import org.apache.iotdb.confignode.procedure.state.schema.DeleteDatabaseState;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
-import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.TException;
@@ -51,10 +43,7 @@ import org.slf4j.LoggerFactory;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 
 public class DeleteDatabaseProcedure
@@ -114,86 +103,29 @@ public class DeleteDatabaseProcedure
               "[DeleteDatabaseProcedure] Delete DatabaseSchema: {}",
               deleteDatabaseSchema.getName());
 
-          // Submit RegionDeleteTasks
-          final OfferRegionMaintainTasksPlan dataRegionDeleteTaskOfferPlan =
-              new OfferRegionMaintainTasksPlan();
+          // Delete every region replica (both schema and data regions) of 
this database via a
+          // DeleteRegionProcedure child. Unlike the old fire-and-forget 
RegionMaintainer queue, the
+          // DatabasePartitionTable (handled in the next state) is only 
removed once these children
+          // have finished, so a slow region deletion can no longer become a 
forgotten "ghost task".
           final List<TRegionReplicaSet> regionReplicaSets =
               env.getAllReplicaSets(deleteDatabaseSchema.getName());
-          final List<TRegionReplicaSet> schemaRegionReplicaSets = new 
ArrayList<>();
           regionReplicaSets.forEach(
               regionReplicaSet -> {
                 // Clear heartbeat cache along the way
                 env.getConfigManager()
                     .getLoadManager()
                     
.removeRegionGroupRelatedCache(regionReplicaSet.getRegionId());
-
-                if (regionReplicaSet
-                    .getRegionId()
-                    .getType()
-                    .equals(TConsensusGroupType.SchemaRegion)) {
-                  schemaRegionReplicaSets.add(regionReplicaSet);
-                } else {
-                  regionReplicaSet
-                      .getDataNodeLocations()
-                      .forEach(
-                          targetDataNode ->
-                              
dataRegionDeleteTaskOfferPlan.appendRegionMaintainTask(
-                                  new RegionDeleteTask(
-                                      targetDataNode, 
regionReplicaSet.getRegionId())));
-                }
+                regionReplicaSet
+                    .getDataNodeLocations()
+                    .forEach(
+                        targetDataNode ->
+                            addChildProcedure(
+                                new DeleteRegionProcedure(
+                                    regionReplicaSet.getRegionId(), 
targetDataNode)));
               });
-
-          if 
(!dataRegionDeleteTaskOfferPlan.getRegionMaintainTaskList().isEmpty()) {
-            // submit async data region delete task
-            
env.getConfigManager().getConsensusManager().write(dataRegionDeleteTaskOfferPlan);
-          }
-
-          // try sync delete schemaengine region
-          final DataNodeAsyncRequestContext<TConsensusGroupId, TSStatus> 
asyncClientHandler =
-              new 
DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.DELETE_REGION);
-          final Map<Integer, RegionDeleteTask> schemaRegionDeleteTaskMap = new 
HashMap<>();
-          int requestIndex = 0;
-          for (final TRegionReplicaSet schemaRegionReplicaSet : 
schemaRegionReplicaSets) {
-            for (final TDataNodeLocation dataNodeLocation :
-                schemaRegionReplicaSet.getDataNodeLocations()) {
-              asyncClientHandler.putRequest(requestIndex, 
schemaRegionReplicaSet.getRegionId());
-              asyncClientHandler.putNodeLocation(requestIndex, 
dataNodeLocation);
-              schemaRegionDeleteTaskMap.put(
-                  requestIndex,
-                  new RegionDeleteTask(dataNodeLocation, 
schemaRegionReplicaSet.getRegionId()));
-              requestIndex++;
-            }
-          }
-          if (!schemaRegionDeleteTaskMap.isEmpty()) {
-            CnToDnInternalServiceAsyncRequestManager.getInstance()
-                .sendAsyncRequestWithRetry(asyncClientHandler);
-            for (final Map.Entry<Integer, TSStatus> entry :
-                asyncClientHandler.getResponseMap().entrySet()) {
-              if (entry.getValue().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-                LOG.info(
-                    "[DeleteDatabaseProcedure] Successfully delete 
SchemaRegion[{}] on {}",
-                    asyncClientHandler.getRequest(entry.getKey()),
-                    
schemaRegionDeleteTaskMap.get(entry.getKey()).getTargetDataNode());
-                schemaRegionDeleteTaskMap.remove(entry.getKey());
-              } else {
-                LOG.warn(
-                    "[DeleteDatabaseProcedure] Failed to delete 
SchemaRegion[{}] on {}. Submit to async deletion.",
-                    asyncClientHandler.getRequest(entry.getKey()),
-                    
schemaRegionDeleteTaskMap.get(entry.getKey()).getTargetDataNode());
-              }
-            }
-
-            if (!schemaRegionDeleteTaskMap.isEmpty()) {
-              // submit async schemaengine region delete task for failed sync 
execution
-              final OfferRegionMaintainTasksPlan 
schemaRegionDeleteTaskOfferPlan =
-                  new OfferRegionMaintainTasksPlan();
-              schemaRegionDeleteTaskMap
-                  .values()
-                  
.forEach(schemaRegionDeleteTaskOfferPlan::appendRegionMaintainTask);
-              
env.getConfigManager().getConsensusManager().write(schemaRegionDeleteTaskOfferPlan);
-            }
-          }
-
+          setNextState(DeleteDatabaseState.DELETE_DATABASE_CONFIG);
+          break;
+        case DELETE_DATABASE_CONFIG:
           env.getConfigManager()
               .getLoadManager()
               .clearDataPartitionPolicyTable(deleteDatabaseSchema.getName());
@@ -220,7 +152,7 @@ public class DeleteDatabaseProcedure
                     
ProcedureMessages.DELETEDATABASEPROCEDURE_DELETE_DATABASESCHEMA_FAILED));
           }
       }
-    } catch (final ConsensusException | TException | IOException e) {
+    } catch (final TException | IOException e) {
       if (isRollbackSupported(state)) {
         setFailure(
             new ProcedureException(
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteDatabaseState.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionState.java
similarity index 83%
copy from 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteDatabaseState.java
copy to 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionState.java
index cde6b2bdd81..798544ed794 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteDatabaseState.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionState.java
@@ -17,10 +17,8 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.procedure.state.schema;
+package org.apache.iotdb.confignode.procedure.state;
 
-public enum DeleteDatabaseState {
-  PRE_DELETE_DATABASE,
-  INVALIDATE_CACHE,
-  DELETE_DATABASE_SCHEMA
+public enum CreateRegionState {
+  CREATE_REGION,
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteDatabaseState.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DeleteRegionState.java
similarity index 83%
copy from 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteDatabaseState.java
copy to 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DeleteRegionState.java
index cde6b2bdd81..5cc51cf6c52 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteDatabaseState.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DeleteRegionState.java
@@ -17,10 +17,8 @@
  * under the License.
  */
 
-package org.apache.iotdb.confignode.procedure.state.schema;
+package org.apache.iotdb.confignode.procedure.state;
 
-public enum DeleteDatabaseState {
-  PRE_DELETE_DATABASE,
-  INVALIDATE_CACHE,
-  DELETE_DATABASE_SCHEMA
+public enum DeleteRegionState {
+  DELETE_REGION,
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteDatabaseState.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteDatabaseState.java
index cde6b2bdd81..070d0518a16 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteDatabaseState.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteDatabaseState.java
@@ -22,5 +22,8 @@ package org.apache.iotdb.confignode.procedure.state.schema;
 public enum DeleteDatabaseState {
   PRE_DELETE_DATABASE,
   INVALIDATE_CACHE,
-  DELETE_DATABASE_SCHEMA
+  DELETE_DATABASE_SCHEMA,
+  // Delete the DatabasePartitionTable and related config once the DeleteRegionProcedure children
+  // spawned in DELETE_DATABASE_SCHEMA finish; note a child also finishes when its deletion fails.
+  DELETE_DATABASE_CONFIG
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index 231b44ad2ee..40dd48a3def 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -40,6 +40,8 @@ import 
org.apache.iotdb.confignode.procedure.impl.pipe.task.StartPipeProcedureV2
 import 
org.apache.iotdb.confignode.procedure.impl.pipe.task.StopPipeProcedureV2;
 import 
org.apache.iotdb.confignode.procedure.impl.region.AddRegionPeerProcedure;
 import 
org.apache.iotdb.confignode.procedure.impl.region.CreateRegionGroupsProcedure;
+import org.apache.iotdb.confignode.procedure.impl.region.CreateRegionProcedure;
+import org.apache.iotdb.confignode.procedure.impl.region.DeleteRegionProcedure;
 import 
org.apache.iotdb.confignode.procedure.impl.region.NotifyRegionMigrationProcedure;
 import 
org.apache.iotdb.confignode.procedure.impl.region.ReconstructRegionProcedure;
 import 
org.apache.iotdb.confignode.procedure.impl.region.RegionMigrateProcedure;
@@ -145,6 +147,12 @@ public class ProcedureFactory implements IProcedureFactory 
{
       case NOTIFY_REGION_MIGRATION_PROCEDURE:
         procedure = new NotifyRegionMigrationProcedure();
         break;
+      case DELETE_REGION_PROCEDURE:
+        procedure = new DeleteRegionProcedure();
+        break;
+      case CREATE_REGION_PROCEDURE:
+        procedure = new CreateRegionProcedure();
+        break;
       case ALTER_ENCODING_COMPRESSOR_PROCEDURE:
         procedure = new AlterEncodingCompressorProcedure(false);
         break;
@@ -463,6 +471,10 @@ public class ProcedureFactory implements IProcedureFactory 
{
       return ProcedureType.RECONSTRUCT_REGION_PROCEDURE;
     } else if (procedure instanceof NotifyRegionMigrationProcedure) {
       return ProcedureType.NOTIFY_REGION_MIGRATION_PROCEDURE;
+    } else if (procedure instanceof DeleteRegionProcedure) {
+      return ProcedureType.DELETE_REGION_PROCEDURE;
+    } else if (procedure instanceof CreateRegionProcedure) {
+      return ProcedureType.CREATE_REGION_PROCEDURE;
     } else if (procedure instanceof CreateTriggerProcedure) {
       return ProcedureType.CREATE_TRIGGER_PROCEDURE;
     } else if (procedure instanceof DropTriggerProcedure) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
index 1cd6a46a4dc..c070663e2de 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
@@ -41,6 +41,8 @@ public enum ProcedureType {
   ADD_REGION_PEER_PROCEDURE((short) 204),
   REMOVE_REGION_PEER_PROCEDURE((short) 205),
   NOTIFY_REGION_MIGRATION_PROCEDURE((short) 206),
+  DELETE_REGION_PROCEDURE((short) 207),
+  CREATE_REGION_PROCEDURE((short) 208),
   @TestOnly
   CREATE_MANY_DATABASES_PROCEDURE((short) 250),
 
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
index afccb0c0eba..8e67bc6f5d8 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java
@@ -143,6 +143,9 @@ public class PartitionInfoTest {
                 testFlag.DataPartition.getFlag(), 
TConsensusGroupType.DataRegion));
     partitionInfo.createDataPartition(createDataPartitionPlan);
 
+    // The RegionMaintainer queue has been replaced by 
Create/DeleteRegionProcedure;
+    // offerRegionMaintainTasks is now a no-op kept only for consensus-log 
replay. Calling it here
+    // verifies it does not corrupt the snapshot (the maintain-task block is 
written as empty).
     
partitionInfo.offerRegionMaintainTasks(generateOfferRegionMaintainTasksPlan());
 
     Assert.assertTrue(partitionInfo.processTakeSnapshot(snapshotDir));
@@ -152,6 +155,46 @@ public class PartitionInfoTest {
     Assert.assertEquals(partitionInfo, partitionInfo1);
   }
 
+  /**
+   * An old ConfigNode could snapshot a non-empty RegionMaintainer queue, leaving a trailing block
+   * of serialized tasks (count {@literal >} 0). The new loader must still consume exactly those
+   * bytes so the stream stays aligned, then discard the tasks. This test forges such a snapshot
+   * and checks that loading it does not throw and yields a PartitionInfo equal to the original.
+   * NOTE(review): that only proves the tasks were dropped if PartitionInfo#equals compares them.
+   */
+  @Test
+  public void testLoadLegacySnapshotWithRegionMaintainTasksIsDrained()
+      throws TException, IOException {
+    partitionInfo.generateNextRegionGroupId();
+    partitionInfo.createDatabase(
+        new DatabaseSchemaPlan(
+            ConfigPhysicalPlanType.CreateDatabase, new TDatabaseSchema("root.test")));
+
+    // Produce a baseline snapshot with the new (empty) maintain block, then rewrite its trailing
+    // block with two legacy tasks so it mimics a snapshot taken by an old node.
+    Assert.assertTrue(partitionInfo.processTakeSnapshot(snapshotDir));
+    File snapshotFile = new File(snapshotDir, "partition_info.bin");
+    byte[] base = java.nio.file.Files.readAllBytes(snapshotFile.toPath());
+    // The new snapshot ends with an int "0" (the empty maintain-task count). Strip those last 4
+    // bytes and append a legacy block: count=2 followed by two serialized RegionMaintainTasks.
+    java.io.ByteArrayOutputStream legacy = new java.io.ByteArrayOutputStream();
+    legacy.write(base, 0, base.length - Integer.BYTES);
+    org.apache.tsfile.utils.ReadWriteIOUtils.write(2, legacy);
+    org.apache.thrift.protocol.TProtocol protocol =
+        new org.apache.thrift.protocol.TBinaryProtocol(
+            new org.apache.thrift.transport.TIOStreamTransport(legacy));
+    OfferRegionMaintainTasksPlan tasks = generateOfferRegionMaintainTasksPlan();
+    tasks.getRegionMaintainTaskList().get(0).serialize(legacy, protocol);
+    tasks.getRegionMaintainTaskList().get(1).serialize(legacy, protocol);
+    protocol.getTransport().flush();
+    java.nio.file.Files.write(snapshotFile.toPath(), legacy.toByteArray());
+
+    PartitionInfo loaded = new PartitionInfo();
+    // Must not throw, and the legacy tasks must be drained (not retained).
+    loaded.processLoadSnapshot(snapshotDir);
+    Assert.assertEquals(partitionInfo, loaded);
+  }
+
   @Test
   public void testGetRegionType() {
 
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionProcedureTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionProcedureTest.java
new file mode 100644
index 00000000000..be5689f9944
--- /dev/null
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionProcedureTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.region;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+
+import org.apache.tsfile.utils.PublicBAOS;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+/** Round-trip serialization test for {@link CreateRegionProcedure}. */
+public class CreateRegionProcedureTest {
+  @Test
+  public void serDeTest() throws Exception {
+    TConsensusGroupId regionId = new TConsensusGroupId(TConsensusGroupType.DataRegion, 10);
+    TDataNodeLocation location =
+        new TDataNodeLocation(
+            1,
+            new TEndPoint("127.0.0.1", 0),
+            new TEndPoint("127.0.0.1", 1),
+            new TEndPoint("127.0.0.1", 2),
+            new TEndPoint("127.0.0.1", 3),
+            new TEndPoint("127.0.0.1", 4));
+    TRegionReplicaSet replicaSet =
+        new TRegionReplicaSet(regionId, Collections.singletonList(location));
+    CreateRegionProcedure original = new CreateRegionProcedure("root.sg", replicaSet, location);
+    try (PublicBAOS serializedBytes = new PublicBAOS();
+        DataOutputStream dataOut = new DataOutputStream(serializedBytes)) {
+      original.serialize(dataOut);
+      ByteBuffer buffer = ByteBuffer.wrap(serializedBytes.getBuf(), 0, serializedBytes.size());
+      // Deserialize through ProcedureFactory instead of calling deserialize() directly.
+      // Besides the procedure's own serialize/deserialize, the round trip then also exercises
+      // the ProcedureType.CREATE_REGION_PROCEDURE type code and the ProcedureFactory
+      // registration, so a missing factory case fails this test.
+      Assert.assertEquals(original, ProcedureFactory.getInstance().create(buffer));
+    }
+  }
+}
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/region/DeleteRegionProcedureTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/region/DeleteRegionProcedureTest.java
new file mode 100644
index 00000000000..241f5bb7d3c
--- /dev/null
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/region/DeleteRegionProcedureTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.region;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+
+import org.apache.tsfile.utils.PublicBAOS;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+
+/** Round-trip serialization test for {@link DeleteRegionProcedure}. */
+public class DeleteRegionProcedureTest {
+  @Test
+  public void serDeTest() throws Exception {
+    TConsensusGroupId regionId = new TConsensusGroupId(TConsensusGroupType.DataRegion, 10);
+    TDataNodeLocation location =
+        new TDataNodeLocation(
+            1,
+            new TEndPoint("127.0.0.1", 0),
+            new TEndPoint("127.0.0.1", 1),
+            new TEndPoint("127.0.0.1", 2),
+            new TEndPoint("127.0.0.1", 3),
+            new TEndPoint("127.0.0.1", 4));
+    DeleteRegionProcedure original = new DeleteRegionProcedure(regionId, location);
+    try (PublicBAOS serializedBytes = new PublicBAOS();
+        DataOutputStream dataOut = new DataOutputStream(serializedBytes)) {
+      original.serialize(dataOut);
+      ByteBuffer buffer = ByteBuffer.wrap(serializedBytes.getBuf(), 0, serializedBytes.size());
+      // Deserialize through ProcedureFactory so the round trip also exercises the
+      // DELETE_REGION_PROCEDURE type code and the factory registration, not just deserialize().
+      Assert.assertEquals(original, ProcedureFactory.getInstance().create(buffer));
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java
 
b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java
index 2f0a53db5c4..9d057490b24 100644
--- 
a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java
+++ 
b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java
@@ -977,6 +977,8 @@ public final class DataNodeMiscMessages {
       "Submit removeRegionPeer task failed, region: ";
   public static final String SUBMIT_DELETE_OLD_REGION_PEER_TASK_FAILED_REGION =
       "Submit deleteOldRegionPeer task failed, region: ";
+  public static final String SUBMIT_DELETE_REGION_TASK_FAILED_REGION =
+      "Submit deleteRegion task failed, region: ";
   public static final String CREATE_NEW_REGION_PEER_SUCCEED_REGION_ID =
       "createNewRegionPeer succeed, regionId: ";
   public static final String DISABLE_DATANODE_SUCCEED = "disable datanode 
succeed";
diff --git 
a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java
 
b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java
index c5625447212..d5dc2405f38 100644
--- 
a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java
+++ 
b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java
@@ -975,6 +975,8 @@ public final class DataNodeMiscMessages {
       "提交 removeRegionPeer 任务失败,region: ";
   public static final String SUBMIT_DELETE_OLD_REGION_PEER_TASK_FAILED_REGION =
       "提交 deleteOldRegionPeer 任务失败,region: ";
+  public static final String SUBMIT_DELETE_REGION_TASK_FAILED_REGION =
+      "提交 deleteRegion 任务失败,region: ";
   public static final String CREATE_NEW_REGION_PEER_SUCCEED_REGION_ID =
       "createNewRegionPeer 成功,regionId: ";
   public static final String DISABLE_DATANODE_SUCCEED = "禁用 DataNode 成功";
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index c588c94c7b5..d7dfe61410d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -2998,6 +2998,28 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     return status;
   }
 
+  @Override
+  public TSStatus deleteRegionAsync(TMaintainPeerReq req) {
+    // A non-null status from waitForConsensusStarted() is handed back to the caller as-is.
+    TSStatus notReadyStatus = waitForConsensusStarted();
+    if (notReadyStatus != null) {
+      return notReadyStatus;
+    }
+    TConsensusGroupId regionId = req.getRegionId();
+    String targetIp = req.getDestNode().getInternalEndPoint().getIp();
+    if (!RegionMigrateService.getInstance().submitDeleteRegionTask(req)) {
+      TSStatus failure = new TSStatus(TSStatusCode.DELETE_REGION_ERROR.getStatusCode());
+      failure.setMessage(DataNodeMiscMessages.SUBMIT_DELETE_REGION_TASK_FAILED_REGION + regionId);
+      return failure;
+    }
+    // Only the submission is acknowledged here; the outcome is polled via getRegionMaintainResult.
+    LOGGER.info(
+        "Successfully submit deleteRegion task for region: {}, target DataNode: {}",
+        regionId,
+        targetIp);
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
   // TODO: return which DataNode fail
   @Override
   public TSStatus resetPeerList(TResetPeerListReq req) throws TException {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index 39c1a1678cf..f6fae2c4a8b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -236,6 +236,43 @@ public class RegionMigrateService implements IService {
     return submitSucceed;
   }
 
+  /**
+   * Submit a {@link DeleteRegionTask} that asynchronously deletes a region 
replica on this
+   * DataNode.
+   *
+   * <p>Used by the ConfigNode's {@code DeleteRegionProcedure} to replace the 
old synchronous {@code
+   * deleteRegion} RPC. The deletion can outlive the RPC timeout (it removes 
TsFiles and the
+   * consensus peer), so it is run in the background and the ConfigNode polls 
{@link
+   * #getRegionMaintainResult} for progress. The {@link #addToTaskResultMap} 
guard makes a retried
+   * submit (after an RPC timeout or a ConfigNode leader change) a no-op, so 
the task is executed
+   * exactly once.
+   *
+   * @param req TMaintainPeerReq, whose destNode is the DataNode to delete the 
replica from
+   * @return whether the submit succeeded
+   */
+  public synchronized boolean submitDeleteRegionTask(TMaintainPeerReq req) {
+    boolean submitSucceed = true;
+    try {
+      if (!addToTaskResultMap(req.getTaskId())) {
+        LOGGER.warn(
+            "{} The DeleteRegionTask {} has already been submitted and will 
not be submitted again.",
+            REGION_MIGRATE_PROCESS,
+            req.getTaskId());
+        return true;
+      }
+      regionMigratePool.submit(
+          new DeleteRegionTask(req.getTaskId(), req.getRegionId(), 
req.getDestNode()));
+    } catch (Exception e) {
+      LOGGER.error(
+          "{}, Submit DeleteRegionTask error for Region: {}",
+          REGION_MIGRATE_PROCESS,
+          req.getRegionId(),
+          e);
+      submitSucceed = false;
+    }
+    return submitSucceed;
+  }
+
   public synchronized TSStatus resetPeerList(TResetPeerListReq req) {
     List<Peer> correctPeers =
         req.getCorrectLocations().stream()
@@ -586,6 +623,131 @@ public class RegionMigrateService implements IService {
     }
   }
 
+  /**
+   * Asynchronously deletes a region replica (the consensus peer and all of its data) on this
+   * DataNode.
+   *
+   * <p>This is the background worker behind {@link #submitDeleteRegionTask}. It mirrors the logic
+   * of the synchronous {@code deleteRegion} RPC (delete the local consensus peer, then delete the
+   * region data), but records its terminal state in {@link #taskResultMap} so the ConfigNode can
+   * poll for completion instead of relying on a single RPC response.
+   *
+   * <p>Note: unlike {@link DeleteOldRegionPeerTask}, this task returns immediately after recording
+   * a failure via {@link #taskFail}; it never falls through to {@link #taskSucceed}. A failed
+   * deletion must never be reported as success, otherwise the ConfigNode would forget the task
+   * while the data is still present.
+   *
+   * <p>Every step, including the conversion of the thrift region id, reports its own exceptions
+   * through {@link #taskFail}. An exception escaping {@link #run} would skip both {@link #taskFail}
+   * and {@link #taskSucceed}, leaving the task without a terminal state for the ConfigNode to poll.
+   */
+  private static class DeleteRegionTask implements Runnable {
+
+    private static final Logger taskLogger = LoggerFactory.getLogger(DeleteRegionTask.class);
+    private final long taskId;
+    private final TConsensusGroupId tRegionId;
+    private final TDataNodeLocation targetDataNode;
+
+    public DeleteRegionTask(
+        long taskId, TConsensusGroupId tRegionId, TDataNodeLocation targetDataNode) {
+      this.taskId = taskId;
+      this.tRegionId = tRegionId;
+      this.targetDataNode = targetDataNode;
+    }
+
+    @Override
+    public void run() {
+      // Resolve the region id under the same failure handling as the deletion steps below, so an
+      // invalid id is recorded as a failure instead of throwing out of run().
+      ConsensusGroupId regionId;
+      try {
+        regionId = toLocalRegionId();
+      } catch (Exception e) {
+        taskLogger.error(
+            "{}, cannot resolve region {} for deletion", REGION_MIGRATE_PROCESS, tRegionId, e);
+        TSStatus status = new TSStatus(TSStatusCode.DELETE_REGION_ERROR.getStatusCode());
+        status.setMessage(
+            "cannot resolve region " + tRegionId + " for deletion: " + e.getMessage());
+        taskFail(
+            taskId,
+            tRegionId,
+            targetDataNode,
+            TRegionMigrateFailedType.RemoveConsensusGroupFailed,
+            status);
+        return;
+      }
+
+      // deletePeer: remove the local consensus peer from the consensus group
+      TSStatus runResult = deletePeer(regionId);
+      if (isFailed(runResult)) {
+        taskFail(
+            taskId,
+            tRegionId,
+            targetDataNode,
+            TRegionMigrateFailedType.RemoveConsensusGroupFailed,
+            runResult);
+        return;
+      }
+
+      // deleteRegion: delete the region data (TsFiles, schema, etc.)
+      runResult = deleteRegion(regionId);
+      if (isFailed(runResult)) {
+        taskFail(
+            taskId,
+            tRegionId,
+            targetDataNode,
+            TRegionMigrateFailedType.DeleteRegionFailed,
+            runResult);
+        return;
+      }
+
+      taskSucceed(taskId, tRegionId, "DeleteRegion");
+    }
+
+    /**
+     * Converts the thrift region id into a {@link ConsensusGroupId}.
+     *
+     * <p>Only data and schema regions are accepted, because those are the only two branches
+     * {@link #deletePeer} and {@link #deleteRegion} know how to handle.
+     *
+     * @throws IllegalArgumentException if the id is neither a data nor a schema region
+     */
+    private ConsensusGroupId toLocalRegionId() {
+      ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId);
+      if (!(regionId instanceof DataRegionId) && !(regionId instanceof SchemaRegionId)) {
+        throw new IllegalArgumentException("not a data or schema region: " + regionId);
+      }
+      return regionId;
+    }
+
+    /**
+     * Removes the local consensus peer of {@code regionId}.
+     *
+     * @return SUCCESS when the peer was removed or was already absent, otherwise
+     *     DELETE_REGION_ERROR with a description of the failure
+     */
+    private TSStatus deletePeer(ConsensusGroupId regionId) {
+      taskLogger.info(
+          "{}, Start to delete the local peer of region {} on datanode {}",
+          REGION_MIGRATE_PROCESS,
+          tRegionId,
+          targetDataNode);
+      TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+      try {
+        if (regionId instanceof DataRegionId) {
+          DataRegionConsensusImpl.getInstance().deleteLocalPeer(regionId);
+        } else {
+          SchemaRegionConsensusImpl.getInstance().deleteLocalPeer(regionId);
+        }
+      } catch (ConsensusGroupNotExistException e) {
+        // The peer is already absent, e.g. because an earlier attempt already removed it, so
+        // treat it as success and continue to delete the data.
+        taskLogger.info(
+            "{}, The local peer of region {} does not exist, skip deleting it",
+            REGION_MIGRATE_PROCESS,
+            regionId);
+      } catch (ConsensusException e) {
+        String errorMsg =
+            String.format(
+                "delete local peer error, regionId: %s, errorMessage: %s",
+                regionId, e.getMessage());
+        taskLogger.error(errorMsg, e);
+        status.setCode(TSStatusCode.DELETE_REGION_ERROR.getStatusCode());
+        status.setMessage(errorMsg);
+        return status;
+      } catch (Exception e) {
+        taskLogger.error(
+            "{}, delete local peer error, regionId: {}", REGION_MIGRATE_PROCESS, regionId, e);
+        status.setCode(TSStatusCode.DELETE_REGION_ERROR.getStatusCode());
+        status.setMessage(
+            "delete local peer for region: " + regionId + " error. exception: " + e.getMessage());
+        return status;
+      }
+      return status;
+    }
+
+    /**
+     * Deletes the local data or schema region of {@code regionId}.
+     *
+     * @return SUCCESS carrying the DELETE_REGION_SUCCEED message, otherwise DELETE_REGION_ERROR
+     */
+    private TSStatus deleteRegion(ConsensusGroupId regionId) {
+      taskLogger.info(
+          "{}, Start to delete the data of region {} on datanode {}",
+          REGION_MIGRATE_PROCESS,
+          tRegionId,
+          targetDataNode);
+      TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+      try {
+        if (regionId instanceof DataRegionId) {
+          DataNodeRegionManager.getInstance().deleteDataRegion((DataRegionId) regionId);
+        } else {
+          DataNodeRegionManager.getInstance().deleteSchemaRegion((SchemaRegionId) regionId);
+        }
+      } catch (Exception e) {
+        taskLogger.error("{}, delete region {} error", REGION_MIGRATE_PROCESS, regionId, e);
+        status.setCode(TSStatusCode.DELETE_REGION_ERROR.getStatusCode());
+        status.setMessage(
+            String.format(DataNodeMiscMessages.DELETE_REGION_ERROR, regionId, e.getMessage()));
+        return status;
+      }
+      status.setMessage(String.format(DataNodeMiscMessages.DELETE_REGION_SUCCEED, regionId));
+      taskLogger.info("{}, Succeed to delete region {}", REGION_MIGRATE_PROCESS, regionId);
+      return status;
+    }
+  }
+
   private static class Holder {
 
     private static final RegionMigrateService INSTANCE = new 
RegionMigrateService();
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 11de2e4cba2..27fc51ee74a 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -944,6 +944,20 @@ service IDataNodeRPCService {
    */
   common.TSStatus deleteRegion(common.TConsensusGroupId consensusGroupId)
 
+  /**
+   * Config node asks the DataNode to asynchronously delete a data/schema region replica.
+   *
+   * <p>Unlike the synchronous {@link #deleteRegion}, the DataNode submits the deletion as a
+   * background task keyed by {@code taskId} and returns immediately. The ConfigNode then polls
+   * {@link #getRegionMaintainResult} until the task reaches a terminal state. This avoids the
+   * "ghost task" problem where a slow deletion outlives the RPC timeout and the ConfigNode's
+   * retry is wrongly reported as success. Resubmitting the same {@code taskId} does not start a
+   * second deletion.
+   *
+   * @param req TMaintainPeerReq, which contains the RegionId, the DataNodeLocation to delete the
+   *     replica from (destNode) and the taskId (procedureId)
+   * @return SUCCESS if the task was submitted (or had already been submitted); otherwise an error
+   *     status describing why it was not
+   */
+  common.TSStatus deleteRegionAsync(TMaintainPeerReq req)
+
   /**
    * Change the leader of specified RegionGroup to another DataNode
    *

Reply via email to