This is an automated email from the ASF dual-hosted git repository. CRZbulabula pushed a commit to branch improve-region-maintain-async in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f8ae66f7b722393b568f1c905260145aed36b041 Author: Yongzao <[email protected]> AuthorDate: Thu Jun 25 18:18:40 2026 +0800 Use procedures for region create/delete to fix the delete-region ghost task. The ConfigNode leader periodically drained a persistent RegionMaintainer queue (PartitionManager.maintainRegionReplicas) by sending synchronous deleteRegion RPCs to DataNodes. A slow data-region deletion could outlive the RPC timeout; the ConfigNode then retried 10s later, and the DataNode's second call returned SUCCESS (the region was already gone, so the resulting ConsensusGroupNotExistException was swallowed). The ConfigNode then polled the task off the queue and believed it was done, while the first call was still running on the DataNode — a "ghost task". Replace the whole RegionMaintainer queue with Procedures that mirror the existing AddRegionPeerProcedure/RemoveRegionPeerProcedure pattern: - DataNode: new deleteRegionAsync(TMaintainPeerReq) RPC plus a DeleteRegionTask in RegionMigrateService. The deletion runs in the background and records its terminal state in taskResultMap, so the ConfigNode polls getRegionMaintainResult for progress (PROCESSING/SUCCESS/FAIL/TASK_NOT_EXIST) instead of trusting a single RPC response. The task returns immediately after taskFail and never falls through to taskSucceed, so a failed deletion can never be reported as success. - ConfigNode: new DeleteRegionProcedure (async submit + waitTaskFinish poll, taskId = procId, idempotent across retries and leader changes) and CreateRegionProcedure (the create RPC already returns a final status, so it stays synchronous and the procedure only adds persistence and retry). DeleteDatabaseProcedure and CreateRegionGroupsProcedure now spawn these as child procedures instead of offering tasks to the queue, and every completed task is logged.
- Upgrade safety: the OfferRegionMaintainTasks/PollRegionMaintainTask/ PollSpecificRegionMaintainTask plan types and classes are kept (@Deprecated, still deserialized and applied as no-ops) so an old consensus log still replays. The PartitionInfo snapshot has no version header, so the trailing RegionMaintainer block is still consumed on load (and the tasks discarded with a WARN) and written as empty on save, keeping the byte layout unchanged. Add DeleteRegionProcedureTest/CreateRegionProcedureTest and a PartitionInfo test that loads a legacy snapshot containing pending tasks. --- .../iotdb/confignode/i18n/ProcedureMessages.java | 22 ++ .../iotdb/confignode/i18n/ProcedureMessages.java | 18 ++ .../client/sync/CnToDnSyncRequestType.java | 1 + .../client/sync/SyncDataNodeClientPool.java | 3 + .../consensus/request/ConfigPhysicalPlanType.java | 6 + .../statemachine/ConfigRegionStateMachine.java | 3 - .../iotdb/confignode/manager/ConfigManager.java | 3 - .../iotdb/confignode/manager/ProcedureManager.java | 6 +- .../manager/partition/PartitionManager.java | 281 --------------------- .../persistence/partition/PartitionInfo.java | 106 ++++---- .../procedure/env/RegionMaintainHandler.java | 79 ++++++ .../impl/region/CreateRegionGroupsProcedure.java | 41 ++- .../impl/region/CreateRegionProcedure.java | 201 +++++++++++++++ .../impl/region/DeleteRegionProcedure.java | 229 +++++++++++++++++ .../impl/schema/DeleteDatabaseProcedure.java | 100 ++------ ...teDatabaseState.java => CreateRegionState.java} | 8 +- ...teDatabaseState.java => DeleteRegionState.java} | 8 +- .../state/schema/DeleteDatabaseState.java | 5 +- .../procedure/store/ProcedureFactory.java | 12 + .../confignode/procedure/store/ProcedureType.java | 2 + .../confignode/persistence/PartitionInfoTest.java | 43 ++++ .../impl/region/CreateRegionProcedureTest.java | 64 +++++ .../impl/region/DeleteRegionProcedureTest.java | 58 +++++ .../apache/iotdb/db/i18n/DataNodeMiscMessages.java | 2 + 
.../apache/iotdb/db/i18n/DataNodeMiscMessages.java | 2 + .../impl/DataNodeInternalRPCServiceImpl.java | 22 ++ .../iotdb/db/service/RegionMigrateService.java | 162 ++++++++++++ .../src/main/thrift/datanode.thrift | 14 + 28 files changed, 1039 insertions(+), 462 deletions(-) diff --git a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ProcedureMessages.java b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ProcedureMessages.java index f4ae087b159..a8d76910f5f 100644 --- a/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ProcedureMessages.java +++ b/iotdb-core/confignode/src/main/i18n/en/org/apache/iotdb/confignode/i18n/ProcedureMessages.java @@ -616,6 +616,28 @@ public final class ProcedureMessages { public static final String PID_ADDREGION_STATE_FAILED = "[pid{}][AddRegion] state {} failed"; public static final String PID_ADDREGION_SUCCESS_HAS_BEEN_ADDED_TO_DATANODE_PROCEDURE_TOOK = "[pid{}][AddRegion] success, {} has been added to DataNode {}. Procedure took {} (start at {})."; + public static final String PID_CREATEREGION_STARTED_WILL_BE_CREATED_ON_DATANODE = + "[pid{}][CreateRegion] started, region {} will be created on DataNode {}."; + public static final String PID_CREATEREGION_STATE_COMPLETE = + "[pid{}][CreateRegion] state {} complete"; + public static final String PID_CREATEREGION_STATE_FAILED = + "[pid{}][CreateRegion] state {} failed"; + public static final String PID_CREATEREGION_CREATE_FAILED_WILL_RETRY = + "[pid{}][CreateRegion] failed to create region {} on DataNode {} (attempt {}), will retry. status: {}"; + public static final String PID_CREATEREGION_SUCCESS_HAS_BEEN_CREATED_ON_DATANODE_PROCEDURE_TOOK = + "[pid{}][CreateRegion] success, region {} has been created on DataNode {}. 
Procedure took {} (started at {})."; + public static final String PID_DELETEREGION_STARTED_WILL_BE_DELETED_FROM_DATANODE = + "[pid{}][DeleteRegion] started, region {} will be deleted from DataNode {}."; + public static final String PID_DELETEREGION_STATE_COMPLETE = + "[pid{}][DeleteRegion] state {} complete"; + public static final String PID_DELETEREGION_STATE_FAILED = + "[pid{}][DeleteRegion] state {} failed"; + public static final String PID_DELETEREGION_SUBMIT_TASK_FAILED = + "[pid{}][DeleteRegion] failed to submit the delete task for region {} to DataNode {}. status: {}"; + public static final String PID_DELETEREGION_EXECUTED_FAILED = + "[pid{}][DeleteRegion] failed to delete region {} from DataNode {}, task status is {}. You may need to manually clean up the region files."; + public static final String PID_DELETEREGION_SUCCESS_REGION_HAS_BEEN_DELETED_FROM_DATANODE_PROCEDURE = + "[pid{}][DeleteRegion] success, region {} has been deleted from DataNode {}. Procedure took {} (started at {})."; public static final String PID_MIGRATEREGION_STARTED_WILL_BE_MIGRATED_FROM_DATANODE_TO = "[pid{}][MigrateRegion] started, {} will be migrated from DataNode {} to {}."; public static final String PID_MIGRATEREGION_STATE_COMPLETE = diff --git a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ProcedureMessages.java b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ProcedureMessages.java index 126055a7c57..a3ed7b44d50 100644 --- a/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ProcedureMessages.java +++ b/iotdb-core/confignode/src/main/i18n/zh/org/apache/iotdb/confignode/i18n/ProcedureMessages.java @@ -614,6 +614,24 @@ public final class ProcedureMessages { public static final String PID_ADDREGION_STATE_FAILED = "[pid{}][AddRegion] state {} failed"; public static final String PID_ADDREGION_SUCCESS_HAS_BEEN_ADDED_TO_DATANODE_PROCEDURE_TOOK = "[pid{}][AddRegion] success, {} has been added to DataNode {}. 
Procedure took {} (start at {})."; + public static final String PID_CREATEREGION_STARTED_WILL_BE_CREATED_ON_DATANODE = + "[pid{}][CreateRegion] 开始,region {} 将在 DataNode {} 上创建。"; + public static final String PID_CREATEREGION_STATE_COMPLETE = "[pid{}][CreateRegion] 状态 {} 完成"; + public static final String PID_CREATEREGION_STATE_FAILED = "[pid{}][CreateRegion] 状态 {} 失败"; + public static final String PID_CREATEREGION_CREATE_FAILED_WILL_RETRY = + "[pid{}][CreateRegion] 创建 region {} 到 DataNode {} 失败(第 {} 次尝试),将重试。status: {}"; + public static final String PID_CREATEREGION_SUCCESS_HAS_BEEN_CREATED_ON_DATANODE_PROCEDURE_TOOK = + "[pid{}][CreateRegion] 成功,region {} 已在 DataNode {} 上创建。过程耗时 {}(开始于 {})。"; + public static final String PID_DELETEREGION_STARTED_WILL_BE_DELETED_FROM_DATANODE = + "[pid{}][DeleteRegion] 开始,region {} 将从 DataNode {} 上删除。"; + public static final String PID_DELETEREGION_STATE_COMPLETE = "[pid{}][DeleteRegion] 状态 {} 完成"; + public static final String PID_DELETEREGION_STATE_FAILED = "[pid{}][DeleteRegion] 状态 {} 失败"; + public static final String PID_DELETEREGION_SUBMIT_TASK_FAILED = + "[pid{}][DeleteRegion] 提交 region {} 的删除任务到 DataNode {} 失败。status: {}"; + public static final String PID_DELETEREGION_EXECUTED_FAILED = + "[pid{}][DeleteRegion] 删除 region {}(位于 DataNode {})失败,任务状态为 {}。你可能需要手动清理 region 文件。"; + public static final String PID_DELETEREGION_SUCCESS_REGION_HAS_BEEN_DELETED_FROM_DATANODE_PROCEDURE = + "[pid{}][DeleteRegion] 成功,region {} 已从 DataNode {} 上删除。过程耗时 {}(开始于 {})。"; public static final String PID_MIGRATEREGION_STARTED_WILL_BE_MIGRATED_FROM_DATANODE_TO = "[pid{}][MigrateRegion] started, {} will be migrated from DataNode {} to {}."; public static final String PID_MIGRATEREGION_STATE_COMPLETE = diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java index 790fd637d61..566e8728dbe 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/CnToDnSyncRequestType.java @@ -35,6 +35,7 @@ public enum CnToDnSyncRequestType { ADD_REGION_PEER, REMOVE_REGION_PEER, DELETE_OLD_REGION_PEER, + DELETE_REGION_ASYNC, RESET_PEER_LIST, // Data Partition Table Maintenance diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java index 14c833fb2fc..4ec13cd9089 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java @@ -133,6 +133,9 @@ public class SyncDataNodeClientPool { actionMapBuilder.put( CnToDnSyncRequestType.DELETE_OLD_REGION_PEER, (req, client) -> client.deleteOldRegionPeer((TMaintainPeerReq) req)); + actionMapBuilder.put( + CnToDnSyncRequestType.DELETE_REGION_ASYNC, + (req, client) -> client.deleteRegionAsync((TMaintainPeerReq) req)); actionMapBuilder.put( CnToDnSyncRequestType.RESET_PEER_LIST, (req, client) -> client.resetPeerList((TResetPeerListReq) req)); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java index dce1db12cd0..ac47840c873 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java @@ -62,11 +62,17 @@ public enum ConfigPhysicalPlanType { GetRegionInfoList((short) 302), @Deprecated UpdateRegionLocation((short) 303), + // 
Deprecated: the RegionMaintainer queue has been replaced by Create/DeleteRegionProcedure. These + // plan types are kept (and still deserialized + applied as no-ops) only so an old ConfigNode + // consensus log can be replayed during an upgrade. + @Deprecated OfferRegionMaintainTasks((short) 304), + @Deprecated PollRegionMaintainTask((short) 305), GetRegionId((short) 306), GetSeriesSlotList((short) 307), GetTimeSlotList((short) 308), + @Deprecated PollSpecificRegionMaintainTask((short) 309), CountTimeSlotList((short) 310), AddRegionLocation((short) 311), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java index ad0a82bcf56..f0e24ad4d1a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java @@ -380,8 +380,6 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev new LeaderServiceStartup( "RetryFailedTasksService", () -> configManager.getRetryFailedTasksThread().startRetryFailedTasksService()), - new LeaderServiceStartup( - "RegionCleaner", () -> configManager.getPartitionManager().startRegionCleaner()), // Add metrics after leader ready. new LeaderServiceStartup("Metrics", () -> configManager.addMetrics()), // Activate leader related service for config pipe. 
@@ -426,7 +424,6 @@ public class ConfigRegionStateMachine implements IStateMachine, IStateMachine.Ev configManager.getLoadManager().stopLoadServices(); configManager.getProcedureManager().stopExecutor(); configManager.getRetryFailedTasksThread().stopRetryFailedTasksService(); - configManager.getPartitionManager().stopRegionCleaner(); configManager.getCQManager().stopCQScheduler(); configManager.getClusterSchemaManager().clearSchemaQuotaCache(); // Remove Metric after leader change diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index da10bb22283..544da2aaf89 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -456,9 +456,6 @@ public class ConfigManager implements IManager { } public void close() throws IOException { - if (partitionManager != null) { - partitionManager.getRegionMaintainer().shutdown(); - } if (procedureManager != null) { procedureManager.stopExecutor(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 2beeea6b4af..05c4357934d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -56,7 +56,6 @@ import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNo import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan; import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan; import org.apache.iotdb.confignode.i18n.ManagerMessages; -import 
org.apache.iotdb.confignode.manager.partition.PartitionManager; import org.apache.iotdb.confignode.persistence.ProcedureInfo; import org.apache.iotdb.confignode.procedure.PartitionTableAutoCleaner; import org.apache.iotdb.confignode.procedure.Procedure; @@ -317,9 +316,8 @@ public class ProcedureManager { } List<TSStatus> results = new ArrayList<>(procedures.size()); procedures.forEach(procedure -> results.add(waitingProcedureFinished(procedure))); - // Clear the previously deleted regions - final PartitionManager partitionManager = getConfigManager().getPartitionManager(); - partitionManager.getRegionMaintainer().submit(partitionManager::maintainRegionReplicas); + // Region deletion is now handled by the DeleteRegionProcedure children spawned inside each + // DeleteDatabaseProcedure, so there is no longer a RegionMaintainer queue to kick here. if (results.stream() .allMatch(result -> result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode())) { return StatusUtils.OK; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index 0edb0e389dc..766202afd39 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -27,17 +27,11 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.cluster.RegionRoleType; -import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; -import org.apache.iotdb.commons.concurrent.ThreadName; -import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; import org.apache.iotdb.commons.conf.CommonConfig; import 
org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.partition.DataPartitionTable; import org.apache.iotdb.commons.partition.SchemaPartitionTable; import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor; -import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; -import org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager; -import org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; @@ -58,7 +52,6 @@ import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataP import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.RemoveRegionLocationPlan; import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan; -import org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan; import org.apache.iotdb.confignode.consensus.response.partition.CountTimeSlotListResp; import org.apache.iotdb.confignode.consensus.response.partition.DataPartitionResp; import org.apache.iotdb.confignode.consensus.response.partition.GetRegionGroupsByTimeResp; @@ -80,10 +73,6 @@ import org.apache.iotdb.confignode.manager.load.LoadManager; import org.apache.iotdb.confignode.manager.node.NodeManager; import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager; import org.apache.iotdb.confignode.persistence.partition.PartitionInfo; -import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask; -import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask; -import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainTask; -import 
org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainType; import org.apache.iotdb.confignode.procedure.impl.partition.DataPartitionTableIntegrityCheckProcedure; import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq; import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq; @@ -92,8 +81,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq; import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq; import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList; import org.apache.iotdb.consensus.exception.ConsensusException; -import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq; -import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -106,21 +93,14 @@ import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedHashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Queue; import java.util.Set; import java.util.StringJoiner; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -144,25 +124,12 @@ public class PartitionManager { public static final String CONSENSUS_WRITE_ERROR = "Failed in the write API executing the consensus layer due to: "; - // Monitor for leadership change - private final Object scheduleMonitor = new Object(); - - /** Region cleaner. 
*/ - // Try to delete Regions in every 10s - private static final int REGION_MAINTAINER_WORK_INTERVAL = 10; - - private final ScheduledExecutorService regionMaintainer; - private Future<?> currentRegionMaintainerFuture; - private final AtomicBoolean dataPartitionTableIntegrityCheckProcedureRunning = new AtomicBoolean(false); public PartitionManager(IManager configManager, PartitionInfo partitionInfo) { this.configManager = configManager; this.partitionInfo = partitionInfo; - this.regionMaintainer = - IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( - ThreadName.CONFIG_NODE_REGION_MAINTAINER.getName()); setSeriesPartitionExecutor(); } @@ -1319,250 +1286,6 @@ public class PartitionManager { return partitionInfo.getRegionDatabase(regionId); } - /** - * Called by {@link PartitionManager#regionMaintainer}. - * - * <p>Periodically maintain the RegionReplicas to be created or deleted - */ - public void maintainRegionReplicas() { - // The consensusManager of configManager may not be fully initialized at this time - Optional.ofNullable(getConsensusManager()) - .ifPresent( - consensusManager -> { - if (getConsensusManager().isLeader()) { - List<RegionMaintainTask> regionMaintainTaskList = - partitionInfo.getRegionMaintainEntryList(); - - if (regionMaintainTaskList.isEmpty()) { - return; - } - - // Group tasks by region id - Map<TConsensusGroupId, Queue<RegionMaintainTask>> regionMaintainTaskMap = - new HashMap<>(); - for (RegionMaintainTask regionMaintainTask : regionMaintainTaskList) { - regionMaintainTaskMap - .computeIfAbsent(regionMaintainTask.getRegionId(), k -> new LinkedList<>()) - .add(regionMaintainTask); - } - - while (!regionMaintainTaskMap.isEmpty()) { - // Select same type task from each region group - List<RegionMaintainTask> selectedRegionMaintainTask = new ArrayList<>(); - RegionMaintainType currentType = null; - for (Map.Entry<TConsensusGroupId, Queue<RegionMaintainTask>> entry : - regionMaintainTaskMap.entrySet()) { - RegionMaintainTask 
regionMaintainTask = entry.getValue().peek(); - if (regionMaintainTask == null) { - continue; - } - - if (currentType == null) { - currentType = regionMaintainTask.getType(); - selectedRegionMaintainTask.add(entry.getValue().peek()); - } else { - if (!currentType.equals(regionMaintainTask.getType())) { - continue; - } - - if (currentType.equals(RegionMaintainType.DELETE) - || entry - .getKey() - .getType() - .equals(selectedRegionMaintainTask.get(0).getRegionId().getType())) { - // Delete or same create task - selectedRegionMaintainTask.add(entry.getValue().peek()); - } - } - } - - if (selectedRegionMaintainTask.isEmpty()) { - break; - } - - Set<TConsensusGroupId> successfulTask = new HashSet<>(); - switch (currentType) { - case CREATE: - // create region - switch (selectedRegionMaintainTask.get(0).getRegionId().getType()) { - case SchemaRegion: - // create SchemaRegion - DataNodeAsyncRequestContext<TCreateSchemaRegionReq, TSStatus> - createSchemaRegionHandler = - new DataNodeAsyncRequestContext<>( - CnToDnAsyncRequestType.CREATE_SCHEMA_REGION); - for (RegionMaintainTask regionMaintainTask : selectedRegionMaintainTask) { - RegionCreateTask schemaRegionCreateTask = - (RegionCreateTask) regionMaintainTask; - LOGGER.info( - ManagerMessages.START_TO_CREATE_REGION_ON_DATANODE, - schemaRegionCreateTask.getRegionReplicaSet().getRegionId(), - schemaRegionCreateTask.getTargetDataNode()); - createSchemaRegionHandler.putRequest( - schemaRegionCreateTask.getRegionId().getId(), - new TCreateSchemaRegionReq( - schemaRegionCreateTask.getRegionReplicaSet(), - schemaRegionCreateTask.getStorageGroup())); - createSchemaRegionHandler.putNodeLocation( - schemaRegionCreateTask.getRegionId().getId(), - schemaRegionCreateTask.getTargetDataNode()); - } - - CnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestWithRetry(createSchemaRegionHandler); - - for (Map.Entry<Integer, TSStatus> entry : - createSchemaRegionHandler.getResponseMap().entrySet()) { - if 
(entry.getValue().getCode() - == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - successfulTask.add( - new TConsensusGroupId( - TConsensusGroupType.SchemaRegion, entry.getKey())); - } - } - break; - case DataRegion: - // Create DataRegion - DataNodeAsyncRequestContext<TCreateDataRegionReq, TSStatus> - createDataRegionHandler = - new DataNodeAsyncRequestContext<>( - CnToDnAsyncRequestType.CREATE_DATA_REGION); - for (RegionMaintainTask regionMaintainTask : selectedRegionMaintainTask) { - RegionCreateTask dataRegionCreateTask = - (RegionCreateTask) regionMaintainTask; - LOGGER.info( - ManagerMessages.START_TO_CREATE_REGION_ON_DATANODE, - dataRegionCreateTask.getRegionReplicaSet().getRegionId(), - dataRegionCreateTask.getTargetDataNode()); - createDataRegionHandler.putRequest( - dataRegionCreateTask.getRegionId().getId(), - new TCreateDataRegionReq( - dataRegionCreateTask.getRegionReplicaSet(), - dataRegionCreateTask.getStorageGroup())); - createDataRegionHandler.putNodeLocation( - dataRegionCreateTask.getRegionId().getId(), - dataRegionCreateTask.getTargetDataNode()); - } - - CnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestWithRetry(createDataRegionHandler); - - for (Map.Entry<Integer, TSStatus> entry : - createDataRegionHandler.getResponseMap().entrySet()) { - if (entry.getValue().getCode() - == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - successfulTask.add( - new TConsensusGroupId( - TConsensusGroupType.DataRegion, entry.getKey())); - } - } - break; - } - break; - case DELETE: - // delete region - DataNodeAsyncRequestContext<TConsensusGroupId, TSStatus> deleteRegionHandler = - new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.DELETE_REGION); - Map<Integer, TConsensusGroupId> regionIdMap = new HashMap<>(); - for (RegionMaintainTask regionMaintainTask : selectedRegionMaintainTask) { - RegionDeleteTask regionDeleteTask = (RegionDeleteTask) regionMaintainTask; - LOGGER.info( - ManagerMessages.START_TO_DELETE_REGION_ON_DATANODE, - 
regionDeleteTask.getRegionId(), - regionDeleteTask.getTargetDataNode()); - deleteRegionHandler.putRequest( - regionDeleteTask.getRegionId().getId(), regionDeleteTask.getRegionId()); - deleteRegionHandler.putNodeLocation( - regionDeleteTask.getRegionId().getId(), - regionDeleteTask.getTargetDataNode()); - regionIdMap.put( - regionDeleteTask.getRegionId().getId(), regionDeleteTask.getRegionId()); - } - - long startTime = System.currentTimeMillis(); - CnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestWithRetry(deleteRegionHandler); - - LOGGER.info( - ManagerMessages.DELETING_REGIONS_COSTS_MS, - (System.currentTimeMillis() - startTime)); - - for (Map.Entry<Integer, TSStatus> entry : - deleteRegionHandler.getResponseMap().entrySet()) { - if (entry.getValue().getCode() - == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - successfulTask.add(regionIdMap.get(entry.getKey())); - } - } - break; - } - - if (successfulTask.isEmpty()) { - break; - } - - for (TConsensusGroupId regionId : successfulTask) { - regionMaintainTaskMap.compute( - regionId, - (k, v) -> { - if (v == null) { - throw new IllegalStateException(); - } - v.poll(); - if (v.isEmpty()) { - return null; - } else { - return v; - } - }); - } - - // Poll the head entry if success - try { - getConsensusManager() - .write(new PollSpecificRegionMaintainTaskPlan(successfulTask)); - } catch (ConsensusException e) { - LOGGER.warn(CONSENSUS_WRITE_ERROR, e); - } - - if (successfulTask.size() < selectedRegionMaintainTask.size()) { - // Here we just break and wait until next schedule task - // due to all the RegionMaintainEntry should be executed by - // the order of they were offered - break; - } - } - } - }); - } - - public void startRegionCleaner() { - synchronized (scheduleMonitor) { - if (currentRegionMaintainerFuture == null) { - /* Start the RegionCleaner service */ - currentRegionMaintainerFuture = - ScheduledExecutorUtil.safelyScheduleAtFixedRate( - regionMaintainer, - 
this::maintainRegionReplicas, - 0, - REGION_MAINTAINER_WORK_INTERVAL, - TimeUnit.SECONDS); - LOGGER.info(ManagerMessages.REGIONCLEANER_IS_STARTED_SUCCESSFULLY); - } - } - } - - public void stopRegionCleaner() { - synchronized (scheduleMonitor) { - if (currentRegionMaintainerFuture != null) { - /* Stop the RegionCleaner service */ - currentRegionMaintainerFuture.cancel(false); - currentRegionMaintainerFuture = null; - LOGGER.info(ManagerMessages.REGIONCLEANER_IS_STOPPED_SUCCESSFULLY); - } - } - } - /** * Filter the RegionGroups in the specified Database through the RegionGroupStatus. * @@ -1607,10 +1330,6 @@ public class PartitionManager { return partitionInfo.getLastDataAllotTable(database); } - public ScheduledExecutorService getRegionMaintainer() { - return regionMaintainer; - } - private ConsensusManager getConsensusManager() { return configManager.getConsensusManager(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java index af9429e5953..457f21358cb 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java @@ -131,9 +131,13 @@ public class PartitionInfo implements SnapshotProcessor { // For table model databases: The databaseName is a full name without "root." private final Map<String, DatabasePartitionTable> databasePartitionTables; - /** For Region-Maintainer. */ - // For RegionReplicas' asynchronous management - private final List<RegionMaintainTask> regionMaintainTaskList; + /** + * Legacy RegionMaintainer queue. 
The queue has been replaced by {@code CreateRegionProcedure} / + * {@code DeleteRegionProcedure}; this list is now always empty (offer/poll are no-op shims kept + * for consensus-log replay during upgrade) and is retained only so {@link #equals}/{@link + * #hashCode}/{@link #clear} keep working without touching the snapshot byte layout. + */ + @Deprecated private final List<RegionMaintainTask> regionMaintainTaskList; private static final String SNAPSHOT_FILENAME = "partition_info.bin"; @@ -228,63 +232,40 @@ public class PartitionInfo implements SnapshotProcessor { * Offer a batch of RegionMaintainTasks for the RegionMaintainer. * * @return {@link TSStatusCode#SUCCESS_STATUS} + * @deprecated The RegionMaintainer queue has been replaced by {@code CreateRegionProcedure} / + * {@code DeleteRegionProcedure}. This method is retained only so that an + * OfferRegionMaintainTasksPlan still present in an old ConfigNode consensus log can be + * replayed during an upgrade; it intentionally does nothing now (the tasks are no longer + * drained by anyone). */ + @Deprecated public TSStatus offerRegionMaintainTasks( OfferRegionMaintainTasksPlan offerRegionMaintainTasksPlan) { - synchronized (regionMaintainTaskList) { - regionMaintainTaskList.addAll(offerRegionMaintainTasksPlan.getRegionMaintainTaskList()); - return RpcUtils.SUCCESS_STATUS; - } + // No-op: tasks are no longer queued. See @deprecated note above. + return RpcUtils.SUCCESS_STATUS; } /** - * Poll the head of RegionMaintainTasks from the regionMaintainTaskList after it's executed - * successfully. - * * @return {@link TSStatusCode#SUCCESS_STATUS} + * @deprecated Retained only for replaying old PollRegionMaintainTaskPlan consensus-log entries + * during an upgrade. See {@link #offerRegionMaintainTasks}. */ + @Deprecated public TSStatus pollRegionMaintainTask() { - synchronized (regionMaintainTaskList) { - regionMaintainTaskList.remove(0); - return RpcUtils.SUCCESS_STATUS; - } + // No-op: the queue is always empty now. 
See @deprecated note above. + return RpcUtils.SUCCESS_STATUS; } /** - * Poll the head of RegionMaintainTasks of target regions from regionMaintainTaskList after they - * are executed successfully. Tasks of each region group are treated as single independent queue. - * * @param plan provides target region ids * @return {@link TSStatusCode#SUCCESS_STATUS} + * @deprecated Retained only for replaying old PollSpecificRegionMaintainTaskPlan consensus-log + * entries during an upgrade. See {@link #offerRegionMaintainTasks}. */ + @Deprecated public TSStatus pollSpecificRegionMaintainTask(PollSpecificRegionMaintainTaskPlan plan) { - synchronized (regionMaintainTaskList) { - Set<TConsensusGroupId> removingRegionIdSet = new HashSet<>(plan.getRegionIdSet()); - TConsensusGroupId regionId; - for (int i = 0; i < regionMaintainTaskList.size(); i++) { - regionId = regionMaintainTaskList.get(i).getRegionId(); - if (removingRegionIdSet.contains(regionId)) { - regionMaintainTaskList.remove(i); - removingRegionIdSet.remove(regionId); - i--; - } - if (removingRegionIdSet.isEmpty()) { - break; - } - } - return RpcUtils.SUCCESS_STATUS; - } - } - - /** - * Get a deep copy of RegionCleanList for RegionCleaner to maintain cluster RegionReplicas. - * - * @return A deep copy of RegionCleanList - */ - public List<RegionMaintainTask> getRegionMaintainEntryList() { - synchronized (regionMaintainTaskList) { - return new ArrayList<>(regionMaintainTaskList); - } + // No-op: the queue is always empty now. See @deprecated note above. 
+ return RpcUtils.SUCCESS_STATUS; } /** @@ -1010,11 +991,11 @@ public class PartitionInfo implements SnapshotProcessor { databasePartitionTableEntry.getValue().serialize(bufferedOutputStream, protocol); } - // serialize regionCleanList - ReadWriteIOUtils.write(regionMaintainTaskList.size(), bufferedOutputStream); - for (RegionMaintainTask task : regionMaintainTaskList) { - task.serialize(bufferedOutputStream, protocol); - } + // The trailing RegionMaintainer queue block is kept in the snapshot format for backward + // compatibility, but the queue itself has been replaced by Create/DeleteRegionProcedure. + // Always write an empty list so the byte layout is unchanged and an old ConfigNode reading + // this snapshot still finds a well-formed (empty) block. + ReadWriteIOUtils.write(0, bufferedOutputStream); // write to file tioStreamTransport.flush(); @@ -1073,12 +1054,31 @@ public class PartitionInfo implements SnapshotProcessor { databasePartitionTables.put(database, databasePartitionTable); } - // restore deletedRegionSet + // Read the trailing RegionMaintainer queue block. The queue has been replaced by + // Create/DeleteRegionProcedure, but an OLD snapshot (taken before the upgrade) may have + // written pending tasks here, so we must still consume exactly those bytes to keep the stream + // aligned. The tasks are discarded because nothing drains the queue anymore. To make any such + // loss observable, we log a WARN naming the dropped tasks. An operator can then + // re-trigger the corresponding region creation/deletion for any affected region + // whose state did not otherwise converge. length = ReadWriteIOUtils.readInt(fileInputStream); + final List<RegionMaintainTask> droppedTasks = new ArrayList<>(); for (int i = 0; i < length; i++) { - final RegionMaintainTask task = - RegionMaintainTask.Factory.create(fileInputStream, protocol); - regionMaintainTaskList.add(task); + // Advance the stream past one serialized task; it is kept only for the WARN below.
+ droppedTasks.add(RegionMaintainTask.Factory.create(fileInputStream, protocol)); + } + if (!droppedTasks.isEmpty()) { + final List<String> droppedTaskDescriptions = new ArrayList<>(); + for (final RegionMaintainTask task : droppedTasks) { + droppedTaskDescriptions.add(task.getType() + " " + task.getRegionId()); + } + LOGGER.warn( + "Dropped {} legacy RegionMaintainTask(s) while loading the snapshot; the RegionMaintainer " + + "queue has been replaced by Create/DeleteRegionProcedure and these pending tasks are " + + "no longer executed: {}. Please verify the affected regions and re-trigger their " + + "creation/deletion manually if needed.", + droppedTasks.size(), + droppedTaskDescriptions); } } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java index 9ce0c9f72a3..37ca8fe8d85 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java @@ -53,7 +53,9 @@ import org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupHe import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; import org.apache.iotdb.consensus.pipe.consensuspipe.ConsensusPipeName; +import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq; import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq; +import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq; import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq; import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeResp; import org.apache.iotdb.mpp.rpc.thrift.TRegionMigrateResult; @@ -218,6 +220,37 @@ public class RegionMaintainHandler { return status; } + /** + * Synchronously create a single region replica on the 
target DataNode. + * + * <p>Used by {@code CreateRegionProcedure}. Unlike region deletion, creating a region returns a + * final, unambiguous status from a single RPC (it does not remove large amounts of data), so it + * is kept synchronous; wrapping it in a procedure only adds persistence and retry across leader + * changes. + * + * @param regionReplicaSet the region (carries the regionId and its type) + * @param storageGroup the database the region belongs to + * @param targetDataNode the DataNode on which to create the replica + * @return the TSStatus of the create RPC + */ + public TSStatus createRegion( + TRegionReplicaSet regionReplicaSet, String storageGroup, TDataNodeLocation targetDataNode) { + final TConsensusGroupId regionId = regionReplicaSet.getRegionId(); + final Object req; + final CnToDnSyncRequestType requestType; + if (TConsensusGroupType.SchemaRegion.equals(regionId.getType())) { + req = new TCreateSchemaRegionReq(regionReplicaSet, storageGroup); + requestType = CnToDnSyncRequestType.CREATE_SCHEMA_REGION; + } else { + req = new TCreateDataRegionReq(regionReplicaSet, storageGroup); + requestType = CnToDnSyncRequestType.CREATE_DATA_REGION; + } + return (TSStatus) + SyncDataNodeClientPool.getInstance() + .sendSyncRequestToDataNodeWithRetry( + targetDataNode.getInternalEndPoint(), req, requestType); + } + /** * Order the specific ConsensusGroup to add peer for the new RegionReplica. * @@ -332,6 +365,52 @@ public class RegionMaintainHandler { return status; } + /** + * Order the specified DataNode to asynchronously delete a region replica and all of its data. + * + * <p>The DataNode submits the deletion as a background task keyed by {@code procedureId} and + * returns immediately. The caller (a {@code DeleteRegionProcedure}) then polls progress via + * {@link #waitTaskFinish(long, TDataNodeLocation)}. 
This is the replacement for the old + * synchronous {@code deleteRegion} RPC, which could time out on slow deletions and let the + * ConfigNode's retry be wrongly reported as success. + * + * @param procedureId used as the taskId so the DataNode can dedup retried submits and the + * ConfigNode can poll the result + * @param targetDataNode the DataNode that holds the replica to be deleted + * @param regionId region id + * @return TSStatus of the submit (not of the deletion itself) + */ + public TSStatus submitDeleteRegionTask( + long procedureId, TDataNodeLocation targetDataNode, TConsensusGroupId regionId) { + TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId, targetDataNode, procedureId); + + // The target DataNode may be down (e.g. while deleting a database whose replica sits on an + // unreachable node). Fall back to a single-shot retry in that case, mirroring + // submitDeleteOldRegionPeerTask, so we do not block on an endless retry of a dead node. + final NodeStatus nodeStatus = getDataNodeStatus(targetDataNode.getDataNodeId()); + final boolean useFullRetry = !NodeStatus.Unknown.equals(nodeStatus); + if (!useFullRetry) { + LOGGER.info( + ProcedureMessages.DATANODE_IS_SUBMIT_DELETE_OLD_REGION_PEER_WITH_A_SINGLE, + REGION_MIGRATE_PROCESS, + simplifiedLocation(targetDataNode), + nodeStatus); + } + + TSStatus status = + submitDataNodeSyncRequest( + targetDataNode.getInternalEndPoint(), + maintainPeerReq, + CnToDnSyncRequestType.DELETE_REGION_ASYNC, + useFullRetry); + LOGGER.info( + ProcedureMessages.SEND_ACTION_DELETEOLDREGIONPEER_FINISHED_REGIONID_DATANODEID, + REGION_MIGRATE_PROCESS, + regionId, + targetDataNode.getInternalEndPoint()); + return status; + } + protected NodeStatus getDataNodeStatus(int dataNodeId) { return configManager.getLoadManager().getNodeStatus(dataNodeId); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java index e9cce807e77..850fd8bb69b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionGroupsProcedure.java @@ -31,18 +31,14 @@ import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan; -import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan; -import org.apache.iotdb.confignode.i18n.ConfigNodeMessages; import org.apache.iotdb.confignode.i18n.ProcedureMessages; import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample; -import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreateTask; -import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask; +import org.apache.iotdb.confignode.procedure.Procedure; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure; import org.apache.iotdb.confignode.procedure.state.CreateRegionGroupsState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; -import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; @@ -51,7 +47,9 @@ import org.slf4j.LoggerFactory; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; 
import java.util.Set; @@ -106,7 +104,12 @@ public class CreateRegionGroupsProcedure break; case SHUNT_REGION_REPLICAS: persistPlan = new CreateRegionGroupsPlan(); - final OfferRegionMaintainTasksPlan offerPlan = new OfferRegionMaintainTasksPlan(); + // Recreate-failed-replica and delete-redundant-replica tasks were previously offered to the + // RegionMaintainer queue. They are now run as child procedures (CreateRegionProcedure / + // DeleteRegionProcedure) so they are persisted and retried across leader changes; the + // delete case is additionally polled to completion instead of being fire-and-forget. + // They are only added as children once persistPlan has been persisted successfully. + final List<Procedure<ConfigNodeProcedureEnv>> shuntProcedures = new ArrayList<>(); // Filter those RegionGroups that created successfully createRegionGroupsPlan .getRegionGroupMap() @@ -138,16 +141,14 @@ public class CreateRegionGroupsProcedure // half of the RegionReplicas created successfully persistPlan.addRegionGroup(database, regionReplicaSet); - // Build recreate tasks + // Build recreate tasks for the replicas that failed to create failedRegionReplicas .getDataNodeLocations() .forEach( - targetDataNode -> { - RegionCreateTask createTask = - new RegionCreateTask( - targetDataNode, database, regionReplicaSet); - offerPlan.appendRegionMaintainTask(createTask); - }); + targetDataNode -> + shuntProcedures.add( + new CreateRegionProcedure( + database, regionReplicaSet, targetDataNode))); LOGGER.info( ProcedureMessages @@ -162,10 +163,9 @@ public class CreateRegionGroupsProcedure if (!failedRegionReplicas .getDataNodeLocations() .contains(targetDataNode)) { - RegionDeleteTask deleteTask = - new RegionDeleteTask( - targetDataNode, regionReplicaSet.getRegionId()); - offerPlan.appendRegionMaintainTask(deleteTask); + shuntProcedures.add( + new DeleteRegionProcedure( + regionReplicaSet.getRegionId(), targetDataNode)); } }); @@ -182,12 +182,7 @@ public class CreateRegionGroupsProcedure setFailure(new ProcedureException(new IoTDBException(persistStatus))); return
Flow.NO_MORE_STATE; } - try { - env.getConfigManager().getConsensusManager().write(offerPlan); - } catch (final ConsensusException e) { - LOGGER.warn( - ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e); - } + shuntProcedures.forEach(this::addChildProcedure); setNextState(CreateRegionGroupsState.REBALANCE_DATA_PARTITION_POLICY); break; case REBALANCE_DATA_PARTITION_POLICY: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionProcedure.java new file mode 100644 index 00000000000..b85b4d96adb --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionProcedure.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.confignode.procedure.impl.region; + +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException; +import org.apache.iotdb.commons.queryengine.utils.DateTimeUtils; +import org.apache.iotdb.commons.utils.CommonDateTimeUtils; +import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; +import org.apache.iotdb.confignode.i18n.ProcedureMessages; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler; +import org.apache.iotdb.confignode.procedure.exception.ProcedureException; +import org.apache.iotdb.confignode.procedure.state.CreateRegionState; +import org.apache.iotdb.confignode.procedure.store.ProcedureType; + +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Objects; + +import static org.apache.iotdb.commons.utils.KillPoint.KillPoint.setKillPoint; +import static org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler.simplifiedLocation; +import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS; + +/** + * Create a single region replica on one DataNode. + * + * <p>This replaces the queue-based CREATE path in {@code PartitionManager.maintainRegionReplicas}. + * Region creation already returns a final, unambiguous status from a single synchronous RPC, so + * this procedure simply wraps that RPC, gaining persistence and retry across ConfigNode leader + * changes. 
+ */ +public class CreateRegionProcedure extends RegionOperationProcedure<CreateRegionState> { + private static final Logger LOGGER = LoggerFactory.getLogger(CreateRegionProcedure.class); + private static final int MAX_RETRY = 5; + + private String storageGroup; + private TRegionReplicaSet regionReplicaSet; + private TDataNodeLocation targetDataNode; + + public CreateRegionProcedure() { + super(); + } + + public CreateRegionProcedure( + String storageGroup, TRegionReplicaSet regionReplicaSet, TDataNodeLocation targetDataNode) { + super(regionReplicaSet.getRegionId()); + this.storageGroup = storageGroup; + this.regionReplicaSet = regionReplicaSet; + this.targetDataNode = targetDataNode; + } + + @Override + protected Flow executeFromState(ConfigNodeProcedureEnv env, CreateRegionState state) + throws InterruptedException { + if (regionId == null || targetDataNode == null) { + return Flow.NO_MORE_STATE; + } + RegionMaintainHandler handler = env.getRegionMaintainHandler(); + try { + switch (state) { + case CREATE_REGION: + LOGGER.info( + ProcedureMessages.PID_CREATEREGION_STARTED_WILL_BE_CREATED_ON_DATANODE, + getProcId(), + regionId, + simplifiedLocation(targetDataNode)); + TSStatus status = handler.createRegion(regionReplicaSet, storageGroup, targetDataNode); + setKillPoint(state); + if (status.getCode() != SUCCESS_STATUS.getStatusCode()) { + if (getCycles() < MAX_RETRY) { + LOGGER.warn( + ProcedureMessages.PID_CREATEREGION_CREATE_FAILED_WILL_RETRY, + getProcId(), + regionId, + simplifiedLocation(targetDataNode), + getCycles() + 1, + status); + setNextState(CreateRegionState.CREATE_REGION); + return Flow.HAS_MORE_STATE; + } + LOGGER.warn(ProcedureMessages.PID_CREATEREGION_STATE_FAILED, getProcId(), state); + return Flow.NO_MORE_STATE; + } + // Requirement: every successfully completed maintain task must be logged. 
+ LOGGER.info( + ProcedureMessages + .PID_CREATEREGION_SUCCESS_HAS_BEEN_CREATED_ON_DATANODE_PROCEDURE_TOOK, + getProcId(), + regionId, + simplifiedLocation(targetDataNode), + CommonDateTimeUtils.convertMillisecondToDurationStr( + System.currentTimeMillis() - getSubmittedTime()), + DateTimeUtils.convertLongToDate(getSubmittedTime(), "ms")); + return Flow.NO_MORE_STATE; + default: + throw new ProcedureException(ProcedureMessages.UNSUPPORTED_STATE + state.name()); + } + } catch (Exception e) { + LOGGER.error(ProcedureMessages.PID_CREATEREGION_STATE_FAILED, getProcId(), state, e); + return Flow.NO_MORE_STATE; + } + } + + @Override + protected void rollbackState(ConfigNodeProcedureEnv env, CreateRegionState state) + throws IOException, InterruptedException, ProcedureException {} + + @Override + protected CreateRegionState getState(int stateId) { + return CreateRegionState.values()[stateId]; + } + + @Override + protected int getStateId(CreateRegionState createRegionState) { + return createRegionState.ordinal(); + } + + @Override + protected CreateRegionState getInitialState() { + return CreateRegionState.CREATE_REGION; + } + + public TDataNodeLocation getTargetDataNode() { + return targetDataNode; + } + + @Override + public void serialize(DataOutputStream stream) throws IOException { + stream.writeShort(ProcedureType.CREATE_REGION_PROCEDURE.getTypeCode()); + super.serialize(stream); + ReadWriteIOUtils.write(storageGroup, stream); + ThriftCommonsSerDeUtils.serializeTRegionReplicaSet(regionReplicaSet, stream); + ThriftCommonsSerDeUtils.serializeTDataNodeLocation(targetDataNode, stream); + } + + @Override + public void deserialize(ByteBuffer byteBuffer) { + super.deserialize(byteBuffer); + try { + storageGroup = ReadWriteIOUtils.readString(byteBuffer); + regionReplicaSet = ThriftCommonsSerDeUtils.deserializeTRegionReplicaSet(byteBuffer); + targetDataNode = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer); + regionId = regionReplicaSet.getRegionId(); + } 
catch (ThriftSerDeException e) { + LOGGER.error(ProcedureMessages.ERROR_IN_DESERIALIZE, this.getClass(), e); + } + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof CreateRegionProcedure)) { + return false; + } + CreateRegionProcedure procedure = (CreateRegionProcedure) obj; + return Objects.equals(this.storageGroup, procedure.storageGroup) + && Objects.equals(this.regionReplicaSet, procedure.regionReplicaSet) + && Objects.equals(this.targetDataNode, procedure.targetDataNode); + } + + @Override + public int hashCode() { + return Objects.hash(storageGroup, regionReplicaSet, targetDataNode); + } + + @Override + public String toString() { + return "CreateRegionProcedure{" + + "regionId=" + + regionId + + ", storageGroup=" + + storageGroup + + ", targetDataNode=" + + simplifiedLocation(targetDataNode) + + '}'; + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/DeleteRegionProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/DeleteRegionProcedure.java new file mode 100644 index 00000000000..d5988481fb9 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/DeleteRegionProcedure.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.impl.region; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException; +import org.apache.iotdb.commons.queryengine.utils.DateTimeUtils; +import org.apache.iotdb.commons.utils.CommonDateTimeUtils; +import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; +import org.apache.iotdb.confignode.i18n.ProcedureMessages; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler; +import org.apache.iotdb.confignode.procedure.exception.ProcedureException; +import org.apache.iotdb.confignode.procedure.state.DeleteRegionState; +import org.apache.iotdb.confignode.procedure.store.ProcedureType; +import org.apache.iotdb.mpp.rpc.thrift.TRegionMigrateResult; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Objects; + +import static org.apache.iotdb.commons.utils.KillPoint.KillPoint.setKillPoint; +import static org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler.simplifiedLocation; +import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS; + +/** + * Delete a single region replica (the consensus peer and all of its data) from one DataNode. 
+ * + * <p>This replaces the old queue-based DELETE path in {@code + * PartitionManager.maintainRegionReplicas}. The DataNode runs the deletion asynchronously (it can + * remove a large amount of TsFile data and outlive a single RPC timeout) and this procedure polls + * for the result, so a slow deletion can never be wrongly reported as finished — fixing the "ghost + * task" problem where the coordinator forgot a task that was still running. + */ +public class DeleteRegionProcedure extends RegionOperationProcedure<DeleteRegionState> { + private static final Logger LOGGER = LoggerFactory.getLogger(DeleteRegionProcedure.class); + + private TDataNodeLocation targetDataNode; + + public DeleteRegionProcedure() { + super(); + } + + public DeleteRegionProcedure(TConsensusGroupId regionId, TDataNodeLocation targetDataNode) { + super(regionId); + this.targetDataNode = targetDataNode; + } + + @Override + protected Flow executeFromState(ConfigNodeProcedureEnv env, DeleteRegionState state) + throws InterruptedException { + if (regionId == null || targetDataNode == null) { + return Flow.NO_MORE_STATE; + } + RegionMaintainHandler handler = env.getRegionMaintainHandler(); + try { + switch (state) { + case DELETE_REGION: + LOGGER.info( + ProcedureMessages.PID_DELETEREGION_STARTED_WILL_BE_DELETED_FROM_DATANODE, + getProcId(), + regionId, + simplifiedLocation(targetDataNode)); + // Only submit the delete task on the very first entry of this state. + // We must NOT re-submit it (the task may already be running on the DataNode) + // in either of these cases: + // - the state was restored from disk after a leader change / ConfigNode reboot + // (isStateDeserialized()), or + // - this state is being re-entered in place because a previous attempt parked here on + // PROCESSING (getCycles() > 0). + // The DataNode also dedups by taskId (= procId), so a duplicate submit would be a no-op, + // but skipping it here avoids the useless RPC and keeps the re-poll cheap.
+ if (!this.isStateDeserialized() && getCycles() == 0) { + TSStatus submitStatus = + handler.submitDeleteRegionTask(this.getProcId(), targetDataNode, regionId); + setKillPoint(state); + if (submitStatus.getCode() != SUCCESS_STATUS.getStatusCode()) { + // Submit failed (e.g. the DataNode is unreachable). End the procedure; the parent is + // expected to detect via the partition table that the region is still present and + // retry. NOTE(review): no setFailure() is recorded here; confirm the parent re-checks. + LOGGER.warn( + ProcedureMessages.PID_DELETEREGION_SUBMIT_TASK_FAILED, + getProcId(), + regionId, + simplifiedLocation(targetDataNode), + submitStatus); + return Flow.NO_MORE_STATE; + } + } + TRegionMigrateResult result = handler.waitTaskFinish(this.getProcId(), targetDataNode); + switch (result.getTaskStatus()) { + case SUCCESS: + // Requirement: every successfully completed maintain task must be logged. + LOGGER.info( + ProcedureMessages + .PID_DELETEREGION_SUCCESS_REGION_HAS_BEEN_DELETED_FROM_DATANODE_PROCEDURE, + getProcId(), + regionId, + simplifiedLocation(targetDataNode), + CommonDateTimeUtils.convertMillisecondToDurationStr( + System.currentTimeMillis() - getSubmittedTime()), + DateTimeUtils.convertLongToDate(getSubmittedTime(), "ms")); + return Flow.NO_MORE_STATE; + case PROCESSING: + // waitTaskFinish() only returns PROCESSING when its polling loop was interrupted, + // i.e. this ConfigNode is shutting down / losing leadership. The delete task is + // still running on the DataNode, so we must persist and re-poll after recovery + // rather than declare the region deleted. Stay in DELETE_REGION; the + // isStateDeserialized() / getCycles() guards above prevent re-submitting the + // task when this state is re-entered. + setNextState(DeleteRegionState.DELETE_REGION); + return Flow.HAS_MORE_STATE; + case TASK_NOT_EXIST: + // The DataNode has no record of this task: it never received the submit, or it + // restarted and lost its in-memory task table. Treat it as a terminal failure. + // The parent is expected to inspect the partition table and spawn a fresh + // DeleteRegionProcedure (with a new procId) if the region is still present. + // NOTE(review): the DeleteDatabaseProcedure change moves straight on to + // DELETE_DATABASE_CONFIG and no setFailure() is recorded here; confirm that + // such a retry path actually exists. + case FAIL: + default: + LOGGER.warn( + ProcedureMessages.PID_DELETEREGION_EXECUTED_FAILED, + getProcId(), + regionId, + simplifiedLocation(targetDataNode), + result.getTaskStatus()); + return Flow.NO_MORE_STATE; + } + default: + throw new ProcedureException(ProcedureMessages.UNSUPPORTED_STATE + state.name()); + } + } catch (Exception e) { + LOGGER.error(ProcedureMessages.PID_DELETEREGION_STATE_FAILED, getProcId(), state, e); + return Flow.NO_MORE_STATE; + } + } + + @Override + protected void rollbackState(ConfigNodeProcedureEnv env, DeleteRegionState state) + throws IOException, InterruptedException, ProcedureException {} + + @Override + protected DeleteRegionState getState(int stateId) { + return DeleteRegionState.values()[stateId]; + } + + @Override + protected int getStateId(DeleteRegionState deleteRegionState) { + return deleteRegionState.ordinal(); + } + + @Override + protected DeleteRegionState getInitialState() { + return DeleteRegionState.DELETE_REGION; + } + + public TDataNodeLocation getTargetDataNode() { + return targetDataNode; + } + + @Override + public void serialize(DataOutputStream stream) throws IOException { + stream.writeShort(ProcedureType.DELETE_REGION_PROCEDURE.getTypeCode()); + super.serialize(stream); + ThriftCommonsSerDeUtils.serializeTConsensusGroupId(regionId, stream); + ThriftCommonsSerDeUtils.serializeTDataNodeLocation(targetDataNode, stream); + } + + @Override + public void deserialize(ByteBuffer byteBuffer) { + super.deserialize(byteBuffer); + try { + regionId = ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer); + targetDataNode = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer); + } catch (ThriftSerDeException e) { + LOGGER.error(ProcedureMessages.ERROR_IN_DESERIALIZE,
this.getClass(), e); + } + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof DeleteRegionProcedure)) { + return false; + } + DeleteRegionProcedure procedure = (DeleteRegionProcedure) obj; + return this.regionId.equals(procedure.regionId) + && this.targetDataNode.equals(procedure.targetDataNode); + } + + @Override + public int hashCode() { + return Objects.hash(regionId, targetDataNode); + } + + @Override + public String toString() { + return "DeleteRegionProcedure{" + + "regionId=" + + regionId + + ", targetDataNode=" + + simplifiedLocation(targetDataNode) + + '}'; + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java index 7b8de782173..493c0124eaf 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/schema/DeleteDatabaseProcedure.java @@ -19,29 +19,21 @@ package org.apache.iotdb.confignode.procedure.impl.schema; -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; -import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; -import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.utils.ThriftConfigNodeSerDeUtils; -import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType; -import org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager; -import org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext; import 
org.apache.iotdb.confignode.consensus.request.write.database.PreDeleteDatabasePlan; -import org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan; import org.apache.iotdb.confignode.i18n.ProcedureMessages; import org.apache.iotdb.confignode.manager.partition.PartitionMetrics; -import org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure; +import org.apache.iotdb.confignode.procedure.impl.region.DeleteRegionProcedure; import org.apache.iotdb.confignode.procedure.state.schema.DeleteDatabaseState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; -import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.thrift.TException; @@ -51,10 +43,7 @@ import org.slf4j.LoggerFactory; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Objects; public class DeleteDatabaseProcedure @@ -114,86 +103,29 @@ public class DeleteDatabaseProcedure "[DeleteDatabaseProcedure] Delete DatabaseSchema: {}", deleteDatabaseSchema.getName()); - // Submit RegionDeleteTasks - final OfferRegionMaintainTasksPlan dataRegionDeleteTaskOfferPlan = - new OfferRegionMaintainTasksPlan(); + // Delete every region replica (both schema and data regions) of this database via a + // DeleteRegionProcedure child. 
Unlike the old fire-and-forget RegionMaintainer queue, the + // DatabasePartitionTable (handled in the next state) is only removed once these children + // have finished, so a slow region deletion can no longer become a forgotten "ghost task". final List<TRegionReplicaSet> regionReplicaSets = env.getAllReplicaSets(deleteDatabaseSchema.getName()); - final List<TRegionReplicaSet> schemaRegionReplicaSets = new ArrayList<>(); regionReplicaSets.forEach( regionReplicaSet -> { // Clear heartbeat cache along the way env.getConfigManager() .getLoadManager() .removeRegionGroupRelatedCache(regionReplicaSet.getRegionId()); - - if (regionReplicaSet - .getRegionId() - .getType() - .equals(TConsensusGroupType.SchemaRegion)) { - schemaRegionReplicaSets.add(regionReplicaSet); - } else { - regionReplicaSet - .getDataNodeLocations() - .forEach( - targetDataNode -> - dataRegionDeleteTaskOfferPlan.appendRegionMaintainTask( - new RegionDeleteTask( - targetDataNode, regionReplicaSet.getRegionId()))); - } + regionReplicaSet + .getDataNodeLocations() + .forEach( + targetDataNode -> + addChildProcedure( + new DeleteRegionProcedure( + regionReplicaSet.getRegionId(), targetDataNode))); }); - - if (!dataRegionDeleteTaskOfferPlan.getRegionMaintainTaskList().isEmpty()) { - // submit async data region delete task - env.getConfigManager().getConsensusManager().write(dataRegionDeleteTaskOfferPlan); - } - - // try sync delete schemaengine region - final DataNodeAsyncRequestContext<TConsensusGroupId, TSStatus> asyncClientHandler = - new DataNodeAsyncRequestContext<>(CnToDnAsyncRequestType.DELETE_REGION); - final Map<Integer, RegionDeleteTask> schemaRegionDeleteTaskMap = new HashMap<>(); - int requestIndex = 0; - for (final TRegionReplicaSet schemaRegionReplicaSet : schemaRegionReplicaSets) { - for (final TDataNodeLocation dataNodeLocation : - schemaRegionReplicaSet.getDataNodeLocations()) { - asyncClientHandler.putRequest(requestIndex, schemaRegionReplicaSet.getRegionId()); - 
asyncClientHandler.putNodeLocation(requestIndex, dataNodeLocation); - schemaRegionDeleteTaskMap.put( - requestIndex, - new RegionDeleteTask(dataNodeLocation, schemaRegionReplicaSet.getRegionId())); - requestIndex++; - } - } - if (!schemaRegionDeleteTaskMap.isEmpty()) { - CnToDnInternalServiceAsyncRequestManager.getInstance() - .sendAsyncRequestWithRetry(asyncClientHandler); - for (final Map.Entry<Integer, TSStatus> entry : - asyncClientHandler.getResponseMap().entrySet()) { - if (entry.getValue().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - LOG.info( - "[DeleteDatabaseProcedure] Successfully delete SchemaRegion[{}] on {}", - asyncClientHandler.getRequest(entry.getKey()), - schemaRegionDeleteTaskMap.get(entry.getKey()).getTargetDataNode()); - schemaRegionDeleteTaskMap.remove(entry.getKey()); - } else { - LOG.warn( - "[DeleteDatabaseProcedure] Failed to delete SchemaRegion[{}] on {}. Submit to async deletion.", - asyncClientHandler.getRequest(entry.getKey()), - schemaRegionDeleteTaskMap.get(entry.getKey()).getTargetDataNode()); - } - } - - if (!schemaRegionDeleteTaskMap.isEmpty()) { - // submit async schemaengine region delete task for failed sync execution - final OfferRegionMaintainTasksPlan schemaRegionDeleteTaskOfferPlan = - new OfferRegionMaintainTasksPlan(); - schemaRegionDeleteTaskMap - .values() - .forEach(schemaRegionDeleteTaskOfferPlan::appendRegionMaintainTask); - env.getConfigManager().getConsensusManager().write(schemaRegionDeleteTaskOfferPlan); - } - } - + setNextState(DeleteDatabaseState.DELETE_DATABASE_CONFIG); + break; + case DELETE_DATABASE_CONFIG: env.getConfigManager() .getLoadManager() .clearDataPartitionPolicyTable(deleteDatabaseSchema.getName()); @@ -220,7 +152,7 @@ public class DeleteDatabaseProcedure ProcedureMessages.DELETEDATABASEPROCEDURE_DELETE_DATABASESCHEMA_FAILED)); } } - } catch (final ConsensusException | TException | IOException e) { + } catch (final TException | IOException e) { if (isRollbackSupported(state)) { 
setFailure( new ProcedureException( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteDatabaseState.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionState.java similarity index 83% copy from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteDatabaseState.java copy to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionState.java index cde6b2bdd81..798544ed794 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteDatabaseState.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/CreateRegionState.java @@ -17,10 +17,8 @@ * under the License. */ -package org.apache.iotdb.confignode.procedure.state.schema; +package org.apache.iotdb.confignode.procedure.state; -public enum DeleteDatabaseState { - PRE_DELETE_DATABASE, - INVALIDATE_CACHE, - DELETE_DATABASE_SCHEMA +public enum CreateRegionState { + CREATE_REGION, } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteDatabaseState.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DeleteRegionState.java similarity index 83% copy from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteDatabaseState.java copy to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DeleteRegionState.java index cde6b2bdd81..5cc51cf6c52 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteDatabaseState.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/DeleteRegionState.java @@ -17,10 +17,8 @@ * under the License. 
*/ -package org.apache.iotdb.confignode.procedure.state.schema; +package org.apache.iotdb.confignode.procedure.state; -public enum DeleteDatabaseState { - PRE_DELETE_DATABASE, - INVALIDATE_CACHE, - DELETE_DATABASE_SCHEMA +public enum DeleteRegionState { + DELETE_REGION, } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteDatabaseState.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteDatabaseState.java index cde6b2bdd81..070d0518a16 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteDatabaseState.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/schema/DeleteDatabaseState.java @@ -22,5 +22,8 @@ package org.apache.iotdb.confignode.procedure.state.schema; public enum DeleteDatabaseState { PRE_DELETE_DATABASE, INVALIDATE_CACHE, - DELETE_DATABASE_SCHEMA + DELETE_DATABASE_SCHEMA, + // Delete the DatabasePartitionTable and related config after all region replicas have been + // deleted by the DeleteRegionProcedure children spawned in DELETE_DATABASE_SCHEMA. 
+ DELETE_DATABASE_CONFIG } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java index 231b44ad2ee..40dd48a3def 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java @@ -40,6 +40,8 @@ import org.apache.iotdb.confignode.procedure.impl.pipe.task.StartPipeProcedureV2 import org.apache.iotdb.confignode.procedure.impl.pipe.task.StopPipeProcedureV2; import org.apache.iotdb.confignode.procedure.impl.region.AddRegionPeerProcedure; import org.apache.iotdb.confignode.procedure.impl.region.CreateRegionGroupsProcedure; +import org.apache.iotdb.confignode.procedure.impl.region.CreateRegionProcedure; +import org.apache.iotdb.confignode.procedure.impl.region.DeleteRegionProcedure; import org.apache.iotdb.confignode.procedure.impl.region.NotifyRegionMigrationProcedure; import org.apache.iotdb.confignode.procedure.impl.region.ReconstructRegionProcedure; import org.apache.iotdb.confignode.procedure.impl.region.RegionMigrateProcedure; @@ -145,6 +147,12 @@ public class ProcedureFactory implements IProcedureFactory { case NOTIFY_REGION_MIGRATION_PROCEDURE: procedure = new NotifyRegionMigrationProcedure(); break; + case DELETE_REGION_PROCEDURE: + procedure = new DeleteRegionProcedure(); + break; + case CREATE_REGION_PROCEDURE: + procedure = new CreateRegionProcedure(); + break; case ALTER_ENCODING_COMPRESSOR_PROCEDURE: procedure = new AlterEncodingCompressorProcedure(false); break; @@ -463,6 +471,10 @@ public class ProcedureFactory implements IProcedureFactory { return ProcedureType.RECONSTRUCT_REGION_PROCEDURE; } else if (procedure instanceof NotifyRegionMigrationProcedure) { return ProcedureType.NOTIFY_REGION_MIGRATION_PROCEDURE; + } else if (procedure 
instanceof DeleteRegionProcedure) { + return ProcedureType.DELETE_REGION_PROCEDURE; + } else if (procedure instanceof CreateRegionProcedure) { + return ProcedureType.CREATE_REGION_PROCEDURE; } else if (procedure instanceof CreateTriggerProcedure) { return ProcedureType.CREATE_TRIGGER_PROCEDURE; } else if (procedure instanceof DropTriggerProcedure) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java index 1cd6a46a4dc..c070663e2de 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java @@ -41,6 +41,8 @@ public enum ProcedureType { ADD_REGION_PEER_PROCEDURE((short) 204), REMOVE_REGION_PEER_PROCEDURE((short) 205), NOTIFY_REGION_MIGRATION_PROCEDURE((short) 206), + DELETE_REGION_PROCEDURE((short) 207), + CREATE_REGION_PROCEDURE((short) 208), @TestOnly CREATE_MANY_DATABASES_PROCEDURE((short) 250), diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java index afccb0c0eba..8e67bc6f5d8 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PartitionInfoTest.java @@ -143,6 +143,9 @@ public class PartitionInfoTest { testFlag.DataPartition.getFlag(), TConsensusGroupType.DataRegion)); partitionInfo.createDataPartition(createDataPartitionPlan); + // The RegionMaintainer queue has been replaced by Create/DeleteRegionProcedure; + // offerRegionMaintainTasks is now a no-op kept only for consensus-log replay. 
Calling it here + // verifies it does not corrupt the snapshot (the maintain-task block is written as empty). partitionInfo.offerRegionMaintainTasks(generateOfferRegionMaintainTasksPlan()); Assert.assertTrue(partitionInfo.processTakeSnapshot(snapshotDir)); @@ -152,6 +155,46 @@ public class PartitionInfoTest { Assert.assertEquals(partitionInfo, partitionInfo1); } + /** + * An old ConfigNode could take a snapshot while its RegionMaintainer queue was non-empty, writing + * a trailing block of serialized tasks (count {@literal >} 0). The new code no longer drains that + * queue, but it must still consume exactly those bytes so the snapshot stream stays aligned, and + * then discard the tasks. This test writes such a legacy snapshot by hand and verifies the new + * loader reads it without error and ends up with an empty maintain list. + */ + @Test + public void testLoadLegacySnapshotWithRegionMaintainTasksIsDrained() + throws TException, IOException { + partitionInfo.generateNextRegionGroupId(); + partitionInfo.createDatabase( + new DatabaseSchemaPlan( + ConfigPhysicalPlanType.CreateDatabase, new TDatabaseSchema("root.test"))); + + // Produce a baseline snapshot with the new (empty) maintain block, then rewrite its trailing + // block with two legacy tasks so it mimics a snapshot taken by an old node. + Assert.assertTrue(partitionInfo.processTakeSnapshot(snapshotDir)); + File snapshotFile = new File(snapshotDir, "partition_info.bin"); + byte[] base = java.nio.file.Files.readAllBytes(snapshotFile.toPath()); + // The new snapshot ends with an int "0" (the empty maintain-task count). Strip those last 4 + // bytes and append a legacy block: count=2 followed by two serialized RegionMaintainTasks. 
+ java.io.ByteArrayOutputStream legacy = new java.io.ByteArrayOutputStream(); + legacy.write(base, 0, base.length - Integer.BYTES); + org.apache.tsfile.utils.ReadWriteIOUtils.write(2, legacy); + org.apache.thrift.protocol.TProtocol protocol = + new org.apache.thrift.protocol.TBinaryProtocol( + new org.apache.thrift.transport.TIOStreamTransport(legacy)); + OfferRegionMaintainTasksPlan tasks = generateOfferRegionMaintainTasksPlan(); + tasks.getRegionMaintainTaskList().get(0).serialize(legacy, protocol); + tasks.getRegionMaintainTaskList().get(1).serialize(legacy, protocol); + protocol.getTransport().flush(); + java.nio.file.Files.write(snapshotFile.toPath(), legacy.toByteArray()); + + PartitionInfo loaded = new PartitionInfo(); + // Must not throw, and the legacy tasks must be drained (not retained). + loaded.processLoadSnapshot(snapshotDir); + Assert.assertEquals(partitionInfo, loaded); + } + @Test public void testGetRegionType() { diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionProcedureTest.java new file mode 100644 index 00000000000..be5689f9944 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/region/CreateRegionProcedureTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.impl.region; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.confignode.procedure.store.ProcedureFactory; + +import org.apache.tsfile.utils.PublicBAOS; +import org.junit.Assert; +import org.junit.Test; + +import java.io.DataOutputStream; +import java.nio.ByteBuffer; +import java.util.Collections; + +public class CreateRegionProcedureTest { + @Test + public void serDeTest() throws Exception { + TDataNodeLocation targetDataNode = + new TDataNodeLocation( + 1, + new TEndPoint("127.0.0.1", 0), + new TEndPoint("127.0.0.1", 1), + new TEndPoint("127.0.0.1", 2), + new TEndPoint("127.0.0.1", 3), + new TEndPoint("127.0.0.1", 4)); + TRegionReplicaSet regionReplicaSet = + new TRegionReplicaSet( + new TConsensusGroupId(TConsensusGroupType.DataRegion, 10), + Collections.singletonList(targetDataNode)); + CreateRegionProcedure procedure = + new CreateRegionProcedure("root.sg", regionReplicaSet, targetDataNode); + try (PublicBAOS byteArrayOutputStream = new PublicBAOS(); + DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + procedure.serialize(outputStream); + ByteBuffer buffer = + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + // Exercises 
ProcedureType.CREATE_REGION_PROCEDURE + ProcedureFactory registration as well as + // the procedure's own serialize/deserialize. + Assert.assertEquals(procedure, ProcedureFactory.getInstance().create(buffer)); + } + } +} diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/region/DeleteRegionProcedureTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/region/DeleteRegionProcedureTest.java new file mode 100644 index 00000000000..241f5bb7d3c --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/region/DeleteRegionProcedureTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.confignode.procedure.impl.region; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.confignode.procedure.store.ProcedureFactory; + +import org.apache.tsfile.utils.PublicBAOS; +import org.junit.Assert; +import org.junit.Test; + +import java.io.DataOutputStream; +import java.nio.ByteBuffer; + +public class DeleteRegionProcedureTest { + @Test + public void serDeTest() throws Exception { + DeleteRegionProcedure procedure = + new DeleteRegionProcedure( + new TConsensusGroupId(TConsensusGroupType.DataRegion, 10), + new TDataNodeLocation( + 1, + new TEndPoint("127.0.0.1", 0), + new TEndPoint("127.0.0.1", 1), + new TEndPoint("127.0.0.1", 2), + new TEndPoint("127.0.0.1", 3), + new TEndPoint("127.0.0.1", 4))); + try (PublicBAOS byteArrayOutputStream = new PublicBAOS(); + DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + procedure.serialize(outputStream); + ByteBuffer buffer = + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + // Exercises ProcedureType.DELETE_REGION_PROCEDURE + ProcedureFactory registration as well as + // the procedure's own serialize/deserialize. 
+ Assert.assertEquals(procedure, ProcedureFactory.getInstance().create(buffer)); + } + } +} diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java index 2f0a53db5c4..9d057490b24 100644 --- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java +++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java @@ -977,6 +977,8 @@ public final class DataNodeMiscMessages { "Submit removeRegionPeer task failed, region: "; public static final String SUBMIT_DELETE_OLD_REGION_PEER_TASK_FAILED_REGION = "Submit deleteOldRegionPeer task failed, region: "; + public static final String SUBMIT_DELETE_REGION_TASK_FAILED_REGION = + "Submit deleteRegion task failed, region: "; public static final String CREATE_NEW_REGION_PEER_SUCCEED_REGION_ID = "createNewRegionPeer succeed, regionId: "; public static final String DISABLE_DATANODE_SUCCEED = "disable datanode succeed"; diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java index c5625447212..d5dc2405f38 100644 --- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java +++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeMiscMessages.java @@ -975,6 +975,8 @@ public final class DataNodeMiscMessages { "提交 removeRegionPeer 任务失败,region: "; public static final String SUBMIT_DELETE_OLD_REGION_PEER_TASK_FAILED_REGION = "提交 deleteOldRegionPeer 任务失败,region: "; + public static final String SUBMIT_DELETE_REGION_TASK_FAILED_REGION = + "提交 deleteRegion 任务失败,region: "; public static final String CREATE_NEW_REGION_PEER_SUCCEED_REGION_ID = "createNewRegionPeer 成功,regionId: "; public static final String DISABLE_DATANODE_SUCCEED = "禁用 DataNode 成功"; diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index c588c94c7b5..d7dfe61410d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -2998,6 +2998,28 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface return status; } + @Override + public TSStatus deleteRegionAsync(TMaintainPeerReq req) { + TSStatus consensusStatus = waitForConsensusStarted(); + if (consensusStatus != null) { + return consensusStatus; + } + TConsensusGroupId regionId = req.getRegionId(); + String selectedDataNodeIP = req.getDestNode().getInternalEndPoint().getIp(); + boolean submitSucceed = RegionMigrateService.getInstance().submitDeleteRegionTask(req); + TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + if (submitSucceed) { + LOGGER.info( + "Successfully submit deleteRegion task for region: {}, target DataNode: {}", + regionId, + selectedDataNodeIP); + return status; + } + status.setCode(TSStatusCode.DELETE_REGION_ERROR.getStatusCode()); + status.setMessage(DataNodeMiscMessages.SUBMIT_DELETE_REGION_TASK_FAILED_REGION + regionId); + return status; + } + // TODO: return which DataNode fail @Override public TSStatus resetPeerList(TResetPeerListReq req) throws TException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java index 39c1a1678cf..f6fae2c4a8b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java @@ -236,6 +236,43 @@ public class RegionMigrateService implements IService { return submitSucceed; } + /** + * Submit a {@link DeleteRegionTask} that asynchronously deletes a region replica on this + * DataNode. + * + * <p>Used by the ConfigNode's {@code DeleteRegionProcedure} to replace the old synchronous {@code + * deleteRegion} RPC. The deletion can outlive the RPC timeout (it removes TsFiles and the + * consensus peer), so it is run in the background and the ConfigNode polls {@link + * #getRegionMaintainResult} for progress. The {@link #addToTaskResultMap} guard makes a retried + * submit (after an RPC timeout or a ConfigNode leader change) a no-op, so the task is executed + * exactly once. + * + * @param req TMaintainPeerReq, whose destNode is the DataNode to delete the replica from + * @return whether the submit succeeded + */ + public synchronized boolean submitDeleteRegionTask(TMaintainPeerReq req) { + boolean submitSucceed = true; + try { + if (!addToTaskResultMap(req.getTaskId())) { + LOGGER.warn( + "{} The DeleteRegionTask {} has already been submitted and will not be submitted again.", + REGION_MIGRATE_PROCESS, + req.getTaskId()); + return true; + } + regionMigratePool.submit( + new DeleteRegionTask(req.getTaskId(), req.getRegionId(), req.getDestNode())); + } catch (Exception e) { + LOGGER.error( + "{}, Submit DeleteRegionTask error for Region: {}", + REGION_MIGRATE_PROCESS, + req.getRegionId(), + e); + submitSucceed = false; + } + return submitSucceed; + } + public synchronized TSStatus resetPeerList(TResetPeerListReq req) { List<Peer> correctPeers = req.getCorrectLocations().stream() @@ -586,6 +623,131 @@ public class RegionMigrateService implements IService { } } + /** + * Asynchronously deletes a region replica (the consensus peer and all of its data) on this + * DataNode. + * + * <p>This is the background worker behind {@link #submitDeleteRegionTask}. 
It mirrors the logic + * of the synchronous {@code deleteRegion} RPC (delete the local consensus peer, then delete the + * region data), but records its terminal state in {@link #taskResultMap} so the ConfigNode can + * poll for completion instead of relying on a single RPC response. + * + * <p>Note: unlike {@link DeleteOldRegionPeerTask}, this task returns immediately after recording + * a failure via {@link #taskFail}; it never falls through to {@link #taskSucceed}. A failed + * deletion must never be reported as success, otherwise the ConfigNode would forget the task + * while the data is still present. + */ + private static class DeleteRegionTask implements Runnable { + + private static final Logger taskLogger = LoggerFactory.getLogger(DeleteRegionTask.class); + private final long taskId; + private final TConsensusGroupId tRegionId; + private final TDataNodeLocation targetDataNode; + + public DeleteRegionTask( + long taskId, TConsensusGroupId tRegionId, TDataNodeLocation targetDataNode) { + this.taskId = taskId; + this.tRegionId = tRegionId; + this.targetDataNode = targetDataNode; + } + + @Override + public void run() { + // deletePeer: remove the local consensus peer from the consensus group + TSStatus runResult = deletePeer(); + if (isFailed(runResult)) { + taskFail( + taskId, + tRegionId, + targetDataNode, + TRegionMigrateFailedType.RemoveConsensusGroupFailed, + runResult); + return; + } + + // deleteRegion: delete the region data (TsFiles, schema, etc.) 
+ runResult = deleteRegion(); + if (isFailed(runResult)) { + taskFail( + taskId, + tRegionId, + targetDataNode, + TRegionMigrateFailedType.DeleteRegionFailed, + runResult); + return; + } + + taskSucceed(taskId, tRegionId, "DeleteRegion"); + } + + private TSStatus deletePeer() { + taskLogger.info( + "{}, Start to delete the local peer of region {} on datanode {}", + REGION_MIGRATE_PROCESS, + tRegionId, + targetDataNode); + ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId); + TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + try { + if (regionId instanceof DataRegionId) { + DataRegionConsensusImpl.getInstance().deleteLocalPeer(regionId); + } else { + SchemaRegionConsensusImpl.getInstance().deleteLocalPeer(regionId); + } + } catch (ConsensusGroupNotExistException e) { + // The peer is already absent. This is expected when the task is retried after a previous + // attempt already removed the peer, so treat it as success and continue to delete the data. + taskLogger.info( + "{}, The local peer of region {} does not exist, skip deleting it", + REGION_MIGRATE_PROCESS, + regionId); + } catch (ConsensusException e) { + String errorMsg = + String.format( + "delete local peer error, regionId: %s, errorMessage: %s", + regionId, e.getMessage()); + taskLogger.error(errorMsg); + status.setCode(TSStatusCode.DELETE_REGION_ERROR.getStatusCode()); + status.setMessage(errorMsg); + return status; + } catch (Exception e) { + taskLogger.error( + "{}, delete local peer error, regionId: {}", REGION_MIGRATE_PROCESS, regionId, e); + status.setCode(TSStatusCode.DELETE_REGION_ERROR.getStatusCode()); + status.setMessage( + "delete local peer for region: " + regionId + " error. 
exception: " + e.getMessage()); + return status; + } + return status; + } + + private TSStatus deleteRegion() { + taskLogger.info( + "{}, Start to delete the data of region {} on datanode {}", + REGION_MIGRATE_PROCESS, + tRegionId, + targetDataNode); + TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tRegionId); + try { + if (regionId instanceof DataRegionId) { + DataNodeRegionManager.getInstance().deleteDataRegion((DataRegionId) regionId); + } else { + DataNodeRegionManager.getInstance().deleteSchemaRegion((SchemaRegionId) regionId); + } + } catch (Exception e) { + taskLogger.error("{}, delete region {} error", REGION_MIGRATE_PROCESS, regionId, e); + status.setCode(TSStatusCode.DELETE_REGION_ERROR.getStatusCode()); + status.setMessage( + String.format(DataNodeMiscMessages.DELETE_REGION_ERROR, regionId, e.getMessage())); + return status; + } + status.setMessage(String.format(DataNodeMiscMessages.DELETE_REGION_SUCCEED, regionId)); + taskLogger.info("{}, Succeed to delete region {}", REGION_MIGRATE_PROCESS, regionId); + return status; + } + } + private static class Holder { private static final RegionMigrateService INSTANCE = new RegionMigrateService(); diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index 11de2e4cba2..27fc51ee74a 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -944,6 +944,20 @@ service IDataNodeRPCService { */ common.TSStatus deleteRegion(common.TConsensusGroupId consensusGroupId) + /** + * Config node asks the DataNode to asynchronously delete a data/schema region replica. + * + * <p>Unlike the synchronous {@link #deleteRegion}, the DataNode submits the deletion as a + * background task keyed by {@code taskId} and returns immediately. 
The ConfigNode then polls + * {@link #getRegionMaintainResult} until the task reaches a terminal state. This avoids the + * "ghost task" problem where a slow deletion outlives the RPC timeout and the ConfigNode's + * retry is wrongly reported as success. + * + * @param TMaintainPeerReq which contains the RegionId, the DataNodeLocation to delete the + * replica from (destNode) and the taskId (procedureId) + */ + common.TSStatus deleteRegionAsync(TMaintainPeerReq req) + /** * Change the leader of specified RegionGroup to another DataNode *
