This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch region_migration in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e7ea7bd6f4dafef9fe7368ad5520d5faf7d11d3c Author: liyuheng <[email protected]> AuthorDate: Fri Mar 1 09:46:39 2024 +0800 split RegionMigrateProcedure into AddRegionPeerProcedure and RemoveRegionPeerProcedure, include below commits: split RegionMigrateProcedure: delete useless code split procedure: RemoveRegionPeer successfully split, IT pass split procedure: AddRegionPeer successfully split, IT pass split procedure: ready split procedure: ready split procedure: have some problem --- .../it/IoTDBRegionMigrateReliabilityIT.java | 12 ++ .../client/sync/SyncDataNodeClientPool.java | 2 +- .../iotdb/confignode/conf/ConfigNodeConstant.java | 1 + .../consensus/request/ConfigPhysicalPlan.java | 4 - .../consensus/request/ConfigPhysicalPlanType.java | 4 +- ...ocationPlan.java => AddRegionLocationPlan.java} | 40 +--- ...tionPlan.java => RemoveRegionLocationPlan.java} | 46 ++--- .../write/partition/UpdateRegionLocationPlan.java | 1 + .../iotdb/confignode/manager/ConfigManager.java | 1 + .../iotdb/confignode/manager/ProcedureManager.java | 26 ++- .../manager/partition/PartitionManager.java | 22 +- .../persistence/executor/ConfigPlanExecutor.java | 9 +- .../partition/DatabasePartitionTable.java | 7 +- .../persistence/partition/PartitionInfo.java | 35 ++-- .../procedure/env/DataNodeRemoveHandler.java | 53 +++-- .../impl/statemachine/AddRegionPeerProcedure.java | 223 ++++++++++++++++++++ .../impl/statemachine/RegionMigrateProcedure.java | 121 ++--------- .../statemachine/RemoveRegionPeerProcedure.java | 226 +++++++++++++++++++++ ...ransitionState.java => AddRegionPeerState.java} | 10 +- .../procedure/state/RegionTransitionState.java | 4 +- ...sitionState.java => RemoveRegionPeerState.java} | 8 +- .../procedure/store/ProcedureFactory.java | 12 ++ .../confignode/procedure/store/ProcedureType.java | 2 + 23 files changed, 630 insertions(+), 239 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBRegionMigrateReliabilityIT.java 
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBRegionMigrateReliabilityIT.java index ffdda9a088a..18ddac96ac5 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBRegionMigrateReliabilityIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBRegionMigrateReliabilityIT.java @@ -106,6 +106,18 @@ public class IoTDBRegionMigrateReliabilityIT { final int originalDataNode = selectOriginalDataNode(regionMap, selectedRegion); final int destDataNode = selectDestDataNode(dataNodeSet, regionMap, selectedRegion); + // set breakpoint + HashMap<String, Runnable> keywordAction = new HashMap<>(); + Arrays.stream(RegionTransitionState.values()) + .forEach( + state -> + keywordAction.put( + String.valueOf(state), () -> LOGGER.info(String.valueOf(state)))); + ExecutorService service = IoTDBThreadPoolFactory.newCachedThreadPool("regionMigrateIT"); + LOGGER.info("breakpoint setting..."); + service.submit(() -> logBreakpointMonitor(0, keywordAction)); + LOGGER.info("breakpoint set"); + statement.execute(regionMigrateCommand(selectedRegion, originalDataNode, destDataNode)); awaitUntilSuccess(statement, selectedRegion, originalDataNode, destDataNode); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java index 7b1549d44cf..3fd37a98c4f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java @@ -150,7 +150,7 @@ public class SyncDataNodeClientPool { TimeUnit.MILLISECONDS.sleep(3200L); } } catch (InterruptedException e) { - LOGGER.error("Retry wait failed.", e); + LOGGER.warn("Retry wait failed.", e); Thread.currentThread().interrupt(); } } diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java index 214db2c5956..f8972a58389 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConstant.java @@ -46,6 +46,7 @@ public class ConfigNodeConstant { public static final String REMOVE_DATANODE_PROCESS = "[REMOVE_DATANODE_PROCESS]"; public static final String REGION_MIGRATE_PROCESS = "[REGION_MIGRATE_PROCESS]"; + public static final String ADD_REGION_PEER_PROGRESS = "[ADD_REGION_PEER_PROGRESS]"; public static final String IOTDB_FOREGROUND = "iotdb-foreground"; public static final String IOTDB_PIDFILE = "iotdb-pidfile"; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java index 9200b8fc9ca..f08c7b1129b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java @@ -73,7 +73,6 @@ import org.apache.iotdb.confignode.consensus.request.write.function.CreateFuncti import org.apache.iotdb.confignode.consensus.request.write.function.DropFunctionPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan; -import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan; import org.apache.iotdb.confignode.consensus.request.write.pipe.PipeEnrichedPlan; import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan; import 
org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan; @@ -376,9 +375,6 @@ public abstract class ConfigPhysicalPlan implements IConsensusRequest { case GetRegionInfoList: plan = new GetRegionInfoListPlan(); break; - case UpdateRegionLocation: - plan = new UpdateRegionLocationPlan(); - break; case CreatePipeSinkV1: plan = new CreatePipeSinkPlanV1(); break; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java index 24ecb66dfe7..8f1c0bb9244 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java @@ -53,6 +53,7 @@ public enum ConfigPhysicalPlanType { CreateRegionGroups((short) 300), DeleteRegionGroups((short) 301), GetRegionInfoList((short) 302), + @Deprecated UpdateRegionLocation((short) 303), OfferRegionMaintainTasks((short) 304), PollRegionMaintainTask((short) 305), @@ -60,8 +61,9 @@ public enum ConfigPhysicalPlanType { GetSeriesSlotList((short) 307), GetTimeSlotList((short) 308), PollSpecificRegionMaintainTask((short) 309), - CountTimeSlotList((short) 310), + AddRegionLocation((short) 311), + RemoveRegionLocation((short) 312), /** Partition. 
*/ GetSchemaPartition((short) 400), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/partition/UpdateRegionLocationPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/partition/AddRegionLocationPlan.java similarity index 62% copy from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/partition/UpdateRegionLocationPlan.java copy to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/partition/AddRegionLocationPlan.java index 06c94b07394..7f056bc37e7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/partition/UpdateRegionLocationPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/partition/AddRegionLocationPlan.java @@ -29,59 +29,39 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -public class UpdateRegionLocationPlan extends ConfigPhysicalPlan { - /*which region*/ +public class AddRegionLocationPlan extends ConfigPhysicalPlan { TConsensusGroupId regionId; - /*remove it from the region's location*/ - TDataNodeLocation oldNode; + TDataNodeLocation newLocation; - /*add it to the region's location*/ - TDataNodeLocation newNode; - - public UpdateRegionLocationPlan() { - super(ConfigPhysicalPlanType.UpdateRegionLocation); + public AddRegionLocationPlan() { + super(ConfigPhysicalPlanType.AddRegionLocation); } - /** - * Constructor. 
- * - * @param regionId update the region location - * @param oldNode remove the old location - * @param newNode add the new location - */ - public UpdateRegionLocationPlan( - TConsensusGroupId regionId, TDataNodeLocation oldNode, TDataNodeLocation newNode) { + public AddRegionLocationPlan(TConsensusGroupId regionId, TDataNodeLocation newLocation) { this(); this.regionId = regionId; - this.oldNode = oldNode; - this.newNode = newNode; + this.newLocation = newLocation; } @Override protected void serializeImpl(DataOutputStream stream) throws IOException { stream.writeShort(getType().getPlanType()); ThriftCommonsSerDeUtils.serializeTConsensusGroupId(regionId, stream); - ThriftCommonsSerDeUtils.serializeTDataNodeLocation(oldNode, stream); - ThriftCommonsSerDeUtils.serializeTDataNodeLocation(newNode, stream); + ThriftCommonsSerDeUtils.serializeTDataNodeLocation(newLocation, stream); } @Override protected void deserializeImpl(ByteBuffer buffer) throws IOException { regionId = ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(buffer); - oldNode = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer); - newNode = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer); + newLocation = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer); } public TConsensusGroupId getRegionId() { return regionId; } - public TDataNodeLocation getOldNode() { - return oldNode; - } - - public TDataNodeLocation getNewNode() { - return newNode; + public TDataNodeLocation getNewLocation() { + return newLocation; } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/partition/UpdateRegionLocationPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/partition/RemoveRegionLocationPlan.java similarity index 63% copy from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/partition/UpdateRegionLocationPlan.java copy to 
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/partition/RemoveRegionLocationPlan.java index 06c94b07394..9dc6a007052 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/partition/UpdateRegionLocationPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/partition/RemoveRegionLocationPlan.java @@ -29,59 +29,45 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -public class UpdateRegionLocationPlan extends ConfigPhysicalPlan { - /*which region*/ +public class RemoveRegionLocationPlan extends ConfigPhysicalPlan { TConsensusGroupId regionId; - /*remove it from the region's location*/ - TDataNodeLocation oldNode; + TDataNodeLocation deprecatedLocation; - /*add it to the region's location*/ - TDataNodeLocation newNode; - - public UpdateRegionLocationPlan() { - super(ConfigPhysicalPlanType.UpdateRegionLocation); + public RemoveRegionLocationPlan() { + super(ConfigPhysicalPlanType.RemoveRegionLocation); } - /** - * Constructor. 
- * - * @param regionId update the region location - * @param oldNode remove the old location - * @param newNode add the new location - */ - public UpdateRegionLocationPlan( - TConsensusGroupId regionId, TDataNodeLocation oldNode, TDataNodeLocation newNode) { + public RemoveRegionLocationPlan( + TConsensusGroupId regionId, TDataNodeLocation deprecatedLocation) { this(); this.regionId = regionId; - this.oldNode = oldNode; - this.newNode = newNode; + this.deprecatedLocation = deprecatedLocation; } @Override protected void serializeImpl(DataOutputStream stream) throws IOException { stream.writeShort(getType().getPlanType()); ThriftCommonsSerDeUtils.serializeTConsensusGroupId(regionId, stream); - ThriftCommonsSerDeUtils.serializeTDataNodeLocation(oldNode, stream); - ThriftCommonsSerDeUtils.serializeTDataNodeLocation(newNode, stream); + ThriftCommonsSerDeUtils.serializeTDataNodeLocation(deprecatedLocation, stream); } @Override protected void deserializeImpl(ByteBuffer buffer) throws IOException { regionId = ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(buffer); - oldNode = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer); - newNode = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer); + deprecatedLocation = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(buffer); } - public TConsensusGroupId getRegionId() { - return regionId; + @Override + public ConfigPhysicalPlanType getType() { + return super.getType(); } - public TDataNodeLocation getOldNode() { - return oldNode; + public TConsensusGroupId getRegionId() { + return regionId; } - public TDataNodeLocation getNewNode() { - return newNode; + public TDataNodeLocation getDeprecatedLocation() { + return deprecatedLocation; } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/partition/UpdateRegionLocationPlan.java 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/partition/UpdateRegionLocationPlan.java index 06c94b07394..91b47d06200 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/partition/UpdateRegionLocationPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/partition/UpdateRegionLocationPlan.java @@ -29,6 +29,7 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +@Deprecated public class UpdateRegionLocationPlan extends ConfigPhysicalPlan { /*which region*/ TConsensusGroupId regionId; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java index 760de526241..4f45ea5587a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java @@ -415,6 +415,7 @@ public class ConfigManager implements IManager { public TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req) { TSStatus status = confirmLeader(); if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + // TODO: the report mechanism needs to be changed here so that it reports to AddRegionPeerProcedure procedureManager.reportRegionMigrateResult(req); } return status; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 3db3ece81f2..5d10d8f7871 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -74,8 +74,10 @@ import 
org.apache.iotdb.confignode.procedure.impl.schema.DeleteLogicalViewProced import org.apache.iotdb.confignode.procedure.impl.schema.DeleteTimeSeriesProcedure; import org.apache.iotdb.confignode.procedure.impl.schema.SetTemplateProcedure; import org.apache.iotdb.confignode.procedure.impl.schema.UnsetTemplateProcedure; +import org.apache.iotdb.confignode.procedure.impl.statemachine.AddRegionPeerProcedure; import org.apache.iotdb.confignode.procedure.impl.statemachine.CreateRegionGroupsProcedure; import org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure; +import org.apache.iotdb.confignode.procedure.impl.statemachine.RemoveRegionPeerProcedure; import org.apache.iotdb.confignode.procedure.impl.sync.AuthOperationProcedure; import org.apache.iotdb.confignode.procedure.impl.trigger.CreateTriggerProcedure; import org.apache.iotdb.confignode.procedure.impl.trigger.DropTriggerProcedure; @@ -1076,16 +1078,30 @@ public class ProcedureManager { } public void reportRegionMigrateResult(TRegionMigrateResultReportReq req) { + // TODO: ugly, will fix soon + this.executor + .getProcedures() + .values() + .forEach( + procedure1 -> { + if (procedure1 instanceof AddRegionPeerProcedure) { + AddRegionPeerProcedure procedure = (AddRegionPeerProcedure) procedure1; + if (procedure.getConsensusGroupId().equals(req.getRegionId())) { + procedure.notifyAddPeerFinished(req); + } + } + }); + // TODO: ugly, will fix soon this.executor .getProcedures() .values() .forEach( - procedure -> { - if (procedure instanceof RegionMigrateProcedure) { - RegionMigrateProcedure regionMigrateProcedure = (RegionMigrateProcedure) procedure; - if (regionMigrateProcedure.getConsensusGroupId().equals(req.getRegionId())) { - regionMigrateProcedure.notifyTheRegionMigrateFinished(req); + procedure1 -> { + if (procedure1 instanceof RemoveRegionPeerProcedure) { + RemoveRegionPeerProcedure procedure = (RemoveRegionPeerProcedure) procedure1; + if 
(procedure.getConsensusGroupId().equals(req.getRegionId())) { + procedure.notifyRemovePeerFinished(req); } } }); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java index c61f13043a1..1d3205e0f1c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java @@ -51,9 +51,10 @@ import org.apache.iotdb.confignode.consensus.request.read.partition.GetTimeSlotL import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionIdPlan; import org.apache.iotdb.confignode.consensus.request.read.region.GetRegionInfoListPlan; import org.apache.iotdb.confignode.consensus.request.write.database.PreDeleteDatabasePlan; +import org.apache.iotdb.confignode.consensus.request.write.partition.AddRegionLocationPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan; -import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan; +import org.apache.iotdb.confignode.consensus.request.write.partition.RemoveRegionLocationPlan; import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan; import org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan; import org.apache.iotdb.confignode.consensus.response.partition.CountTimeSlotListResp; @@ -1000,13 +1001,18 @@ public class PartitionManager { return partitionInfo.isRegionGroupExisted(regionGroupId); } - /** - * Update region location. 
- * - * @param req UpdateRegionLocationReq - * @return TSStatus - */ - public TSStatus updateRegionLocation(UpdateRegionLocationPlan req) { + public TSStatus addRegionLocation(AddRegionLocationPlan req) { + try { + return getConsensusManager().write(req); + } catch (ConsensusException e) { + LOGGER.warn(CONSENSUS_WRITE_ERROR, e); + TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + res.setMessage(e.getMessage()); + return res; + } + } + + public TSStatus removeRegionLocation(RemoveRegionLocationPlan req) { try { return getConsensusManager().write(req); } catch (ConsensusException e) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java index 2c977b85165..61d398c6c55 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java @@ -68,9 +68,10 @@ import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNo import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan; import org.apache.iotdb.confignode.consensus.request.write.function.CreateFunctionPlan; import org.apache.iotdb.confignode.consensus.request.write.function.DropFunctionPlan; +import org.apache.iotdb.confignode.consensus.request.write.partition.AddRegionLocationPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan; -import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan; +import org.apache.iotdb.confignode.consensus.request.write.partition.RemoveRegionLocationPlan; import 
org.apache.iotdb.confignode.consensus.request.write.pipe.PipeEnrichedPlan; import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.CreatePipePluginPlan; import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan; @@ -394,8 +395,10 @@ public class ConfigPlanExecutor { return triggerInfo.updateTriggerLocation((UpdateTriggerLocationPlan) physicalPlan); case CreateSchemaTemplate: return clusterSchemaInfo.createSchemaTemplate((CreateSchemaTemplatePlan) physicalPlan); - case UpdateRegionLocation: - return partitionInfo.updateRegionLocation((UpdateRegionLocationPlan) physicalPlan); + case AddRegionLocation: + return partitionInfo.addRegionLocation((AddRegionLocationPlan) physicalPlan); + case RemoveRegionLocation: + return partitionInfo.removeRegionLocation((RemoveRegionLocationPlan) physicalPlan); case SetSchemaTemplate: return clusterSchemaInfo.setSchemaTemplate((SetSchemaTemplatePlan) physicalPlan); case PreSetSchemaTemplate: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java index 5a4971b9699..253acaddd4c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java @@ -526,13 +526,14 @@ public class DatabasePartitionTable { * @param oldNode old location, will remove it * @param newNode new location, will add it */ + @Deprecated public void updateRegionLocation( TConsensusGroupId regionId, TDataNodeLocation oldNode, TDataNodeLocation newNode) { addRegionNewLocation(regionId, newNode); - removeRegionOldLocation(regionId, oldNode); + removeRegionLocation(regionId, oldNode); } - private void addRegionNewLocation(TConsensusGroupId regionId, 
TDataNodeLocation node) { + void addRegionNewLocation(TConsensusGroupId regionId, TDataNodeLocation node) { RegionGroup regionGroup = regionGroupMap.get(regionId); if (regionGroup == null) { LOGGER.warn( @@ -553,7 +554,7 @@ public class DatabasePartitionTable { regionGroup.addRegionLocation(node); } - private void removeRegionOldLocation(TConsensusGroupId regionId, TDataNodeLocation node) { + void removeRegionLocation(TConsensusGroupId regionId, TDataNodeLocation node) { RegionGroup regionGroup = regionGroupMap.get(regionId); if (regionGroup == null) { LOGGER.warn( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java index 16317152dee..72c314c7721 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/PartitionInfo.java @@ -40,9 +40,10 @@ import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSche import org.apache.iotdb.confignode.consensus.request.write.database.DeleteDatabasePlan; import org.apache.iotdb.confignode.consensus.request.write.database.PreDeleteDatabasePlan; import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan; +import org.apache.iotdb.confignode.consensus.request.write.partition.AddRegionLocationPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan; -import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan; +import org.apache.iotdb.confignode.consensus.request.write.partition.RemoveRegionLocationPlan; import org.apache.iotdb.confignode.consensus.request.write.region.CreateRegionGroupsPlan; import 
org.apache.iotdb.confignode.consensus.request.write.region.OfferRegionMaintainTasksPlan; import org.apache.iotdb.confignode.consensus.request.write.region.PollSpecificRegionMaintainTaskPlan; @@ -559,24 +560,28 @@ public class PartitionInfo implements SnapshotProcessor { databasePartitionTable -> databasePartitionTable.containRegionGroup(regionGroupId)); } - /** - * Update the location info of given regionId. - * - * @param req UpdateRegionLocationReq - * @return {@link TSStatus} - */ - public TSStatus updateRegionLocation(UpdateRegionLocationPlan req) { - TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); - TConsensusGroupId regionId = req.getRegionId(); - TDataNodeLocation oldNode = req.getOldNode(); - TDataNodeLocation newNode = req.getNewNode(); + /** The region has expanded to a new DataNode, now update the databasePartitionTable. */ + public TSStatus addRegionLocation(AddRegionLocationPlan req) { databasePartitionTables.values().stream() - .filter(databasePartitionTable -> databasePartitionTable.containRegionGroup(regionId)) + .filter( + databasePartitionTable -> databasePartitionTable.containRegionGroup(req.getRegionId())) .forEach( databasePartitionTable -> - databasePartitionTable.updateRegionLocation(regionId, oldNode, newNode)); + databasePartitionTable.addRegionNewLocation( + req.getRegionId(), req.getNewLocation())); + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + } - return status; + /** The region is no longer located on a DataNode, now update the databasePartitionTable. 
*/ + public TSStatus removeRegionLocation(RemoveRegionLocationPlan req) { + databasePartitionTables.values().stream() + .filter( + databasePartitionTable -> databasePartitionTable.containRegionGroup(req.getRegionId())) + .forEach( + databasePartitionTable -> + databasePartitionTable.removeRegionLocation( + req.getRegionId(), req.getDeprecatedLocation())); + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java index 78218dea7db..95ccf3de64e 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java @@ -32,7 +32,8 @@ import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool; import org.apache.iotdb.confignode.conf.ConfigNodeConfig; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan; -import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan; +import org.apache.iotdb.confignode.consensus.request.write.partition.AddRegionLocationPlan; +import org.apache.iotdb.confignode.consensus.request.write.partition.RemoveRegionLocationPlan; import org.apache.iotdb.confignode.consensus.response.datanode.DataNodeToStatusResp; import org.apache.iotdb.confignode.manager.ConfigManager; import org.apache.iotdb.confignode.manager.partition.PartitionMetrics; @@ -324,31 +325,39 @@ public class DataNodeRemoveHandler { return status; } - /** - * Update region location cache - * - * @param regionId region id - * @param originalDataNode old location data node - * @param destDataNode dest data node - */ - public void updateRegionLocationCache( - 
TConsensusGroupId regionId, - TDataNodeLocation originalDataNode, - TDataNodeLocation destDataNode) { + public void addRegionLocation(TConsensusGroupId regionId, TDataNodeLocation newLocation) { LOGGER.info( - "Start to updateRegionLocationCache {} location from {} to {} when it migrate succeed", + "AddRegionLocation started, add region {} to {}", regionId, - getIdWithRpcEndpoint(originalDataNode), - getIdWithRpcEndpoint(destDataNode)); - UpdateRegionLocationPlan req = - new UpdateRegionLocationPlan(regionId, originalDataNode, destDataNode); - TSStatus status = configManager.getPartitionManager().updateRegionLocation(req); + getIdWithRpcEndpoint(newLocation)); + AddRegionLocationPlan req = new AddRegionLocationPlan(regionId, newLocation); + TSStatus status = configManager.getPartitionManager().addRegionLocation(req); LOGGER.info( - "UpdateRegionLocationCache finished, region:{}, result:{}, old:{}, new:{}", + "AddRegionLocation finished, add region {} to {}, result is {}", regionId, - status, - getIdWithRpcEndpoint(originalDataNode), - getIdWithRpcEndpoint(destDataNode)); + getIdWithRpcEndpoint(newLocation), + status); + + // Remove the RegionGroupCache of the regionId + configManager.getLoadManager().removeRegionGroupCache(regionId); + + // Broadcast the latest RegionRouteMap when Region migration finished + configManager.getLoadManager().broadcastLatestRegionRouteMap(); + } + + public void removeRegionLocation( + TConsensusGroupId regionId, TDataNodeLocation deprecatedLocation) { + LOGGER.info( + "RemoveRegionLocation started, add region {} to {}", + regionId, + getIdWithRpcEndpoint(deprecatedLocation)); + RemoveRegionLocationPlan req = new RemoveRegionLocationPlan(regionId, deprecatedLocation); + TSStatus status = configManager.getPartitionManager().removeRegionLocation(req); + LOGGER.info( + "AddRegionLocation finished, add region {} to {}, result is {}", + regionId, + getIdWithRpcEndpoint(deprecatedLocation), + status); // Remove the RegionGroupCache of the 
regionId configManager.getLoadManager().removeRegionGroupCache(regionId); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/AddRegionPeerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/AddRegionPeerProcedure.java new file mode 100644 index 00000000000..b5b9448f771 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/AddRegionPeerProcedure.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.confignode.procedure.impl.statemachine; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException; +import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler; +import org.apache.iotdb.confignode.procedure.exception.ProcedureException; +import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException; +import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException; +import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState; +import org.apache.iotdb.confignode.procedure.store.ProcedureType; +import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +import static org.apache.iotdb.commons.utils.FileUtils.logBreakpoint; +import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.ADD_REGION_PEER_PROGRESS; +import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE_PROCESS; +import static org.apache.iotdb.confignode.procedure.state.AddRegionPeerState.UPDATE_REGION_LOCATION_CACHE; +import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS; + +public class AddRegionPeerProcedure + extends StateMachineProcedure<ConfigNodeProcedureEnv, AddRegionPeerState> { + private static final Logger LOGGER = LoggerFactory.getLogger(AddRegionPeerProcedure.class); + private TConsensusGroupId consensusGroupId; + + private TDataNodeLocation coordinator; + + private TDataNodeLocation destDataNode; + + private boolean 
addRegionPeerSuccess = true; + private String addRegionPeerResult; + + private final Object addRegionPeerLock = new Object(); + + public AddRegionPeerProcedure() { + super(); + } + + public AddRegionPeerProcedure( + TConsensusGroupId consensusGroupId, + TDataNodeLocation coordinator, + TDataNodeLocation destDataNode) { + super(); + this.consensusGroupId = consensusGroupId; + this.coordinator = coordinator; + this.destDataNode = destDataNode; + } + + @Override + protected Flow executeFromState(ConfigNodeProcedureEnv env, AddRegionPeerState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + if (consensusGroupId == null) { + return Flow.NO_MORE_STATE; + } + DataNodeRemoveHandler handler = env.getDataNodeRemoveHandler(); + try { + switch (state) { + case CREATE_NEW_REGION_PEER: + handler.createNewRegionPeer(consensusGroupId, destDataNode); + logBreakpoint(state.name()); + setNextState(AddRegionPeerState.DO_ADD_REGION_PEER); + break; + case DO_ADD_REGION_PEER: + TSStatus tsStatus = handler.addRegionPeer(destDataNode, consensusGroupId, coordinator); + if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) { + waitForOneMigrationStepFinished(consensusGroupId, state); + } else { + throw new ProcedureException("ADD_REGION_PEER executed failed in DataNode"); + } + logBreakpoint(state.name()); + setNextState(UPDATE_REGION_LOCATION_CACHE); + break; + case UPDATE_REGION_LOCATION_CACHE: + handler.addRegionLocation(consensusGroupId, destDataNode); + return Flow.NO_MORE_STATE; + default: + throw new ProcedureException("Unsupported state: " + state.name()); + } + } catch (Exception e) { + return Flow.NO_MORE_STATE; + } + return Flow.HAS_MORE_STATE; + } + + // TODO: Clear all remaining information related to 'migrate' and 'migration' + public TSStatus waitForOneMigrationStepFinished( + TConsensusGroupId consensusGroupId, AddRegionPeerState state) throws Exception { + LOGGER.info( + "{}, Wait for state {} finished, regionId: {}", + 
REGION_MIGRATE_PROCESS, + state, + consensusGroupId); + + TSStatus status = new TSStatus(SUCCESS_STATUS.getStatusCode()); + synchronized (addRegionPeerLock) { + try { + addRegionPeerLock.wait(); + + if (!addRegionPeerSuccess) { + throw new ProcedureException( + String.format("Region migration failed, regionId: %s", consensusGroupId)); + } + } catch (InterruptedException e) { + LOGGER.error( + "{}, region migration {} interrupt", REGION_MIGRATE_PROCESS, consensusGroupId, e); + Thread.currentThread().interrupt(); + status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()); + status.setMessage("Waiting for region migration interruption," + e.getMessage()); + } + } + return status; + } + + public void notifyAddPeerFinished(TRegionMigrateResultReportReq req) { + + LOGGER.info( + "{}, ConfigNode received region migration result reported by DataNode: {}", + ADD_REGION_PEER_PROGRESS, + req); + + // TODO the req is used in roll back + synchronized (addRegionPeerLock) { + TSStatus migrateStatus = req.getMigrateResult(); + // Migration failed + if (migrateStatus.getCode() != SUCCESS_STATUS.getStatusCode()) { + LOGGER.info( + "{}, Region migration failed in DataNode, migrateStatus: {}", + ADD_REGION_PEER_PROGRESS, + migrateStatus); + addRegionPeerSuccess = false; + addRegionPeerResult = migrateStatus.toString(); + } + addRegionPeerLock.notifyAll(); + } + } + + @Override + protected boolean isRollbackSupported(AddRegionPeerState state) { + return false; + } + + @Override + protected void rollbackState( + ConfigNodeProcedureEnv configNodeProcedureEnv, AddRegionPeerState addRegionPeerState) + throws IOException, InterruptedException, ProcedureException {} + + @Override + protected AddRegionPeerState getState(int stateId) { + return AddRegionPeerState.values()[stateId]; + } + + @Override + protected int getStateId(AddRegionPeerState addRegionPeerState) { + return addRegionPeerState.ordinal(); + } + + @Override + protected AddRegionPeerState getInitialState() { + return 
AddRegionPeerState.CREATE_NEW_REGION_PEER; + } + + @Override + public void serialize(DataOutputStream stream) throws IOException { + stream.writeShort(ProcedureType.ADD_REGION_PEER_PROCEDURE.getTypeCode()); + super.serialize(stream); + ThriftCommonsSerDeUtils.serializeTConsensusGroupId(consensusGroupId, stream); + ThriftCommonsSerDeUtils.serializeTDataNodeLocation(destDataNode, stream); + ThriftCommonsSerDeUtils.serializeTDataNodeLocation(coordinator, stream); + } + + @Override + public void deserialize(ByteBuffer byteBuffer) { + super.deserialize(byteBuffer); + try { + consensusGroupId = ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer); + destDataNode = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer); + coordinator = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer); + } catch (ThriftSerDeException e) { + LOGGER.error("Error in deserialize {}", this.getClass(), e); + } + } + + public TConsensusGroupId getConsensusGroupId() { + return consensusGroupId; + } + + public TDataNodeLocation getCoordinator() { + return coordinator; + } + + public TDataNodeLocation getDestDataNode() { + return destDataNode; + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java index 30555f33fd5..35a51008ea6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java @@ -21,7 +21,6 @@ package org.apache.iotdb.confignode.procedure.impl.statemachine; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; -import org.apache.iotdb.common.rpc.thrift.TSStatus; import 
org.apache.iotdb.commons.exception.runtime.ThriftSerDeException; import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; @@ -30,8 +29,6 @@ import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.state.ProcedureLockState; import org.apache.iotdb.confignode.procedure.state.RegionTransitionState; import org.apache.iotdb.confignode.procedure.store.ProcedureType; -import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq; -import org.apache.iotdb.rpc.TSStatusCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +41,6 @@ import java.util.Objects; import static org.apache.iotdb.commons.utils.FileUtils.logBreakpoint; import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE_PROCESS; import static org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler.getIdWithRpcEndpoint; -import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS; /** Region migrate procedure */ public class RegionMigrateProcedure @@ -52,7 +48,7 @@ public class RegionMigrateProcedure // TODO: Reach an agreement on RegionMigrateProcedure - private static final Logger LOG = LoggerFactory.getLogger(RegionMigrateProcedure.class); + private static final Logger LOGGER = LoggerFactory.getLogger(RegionMigrateProcedure.class); private static final int RETRY_THRESHOLD = 5; /** Wait region migrate finished */ @@ -94,27 +90,16 @@ public class RegionMigrateProcedure if (consensusGroupId == null) { return Flow.NO_MORE_STATE; } - TSStatus tsStatus; DataNodeRemoveHandler handler = env.getDataNodeRemoveHandler(); try { switch (state) { case REGION_MIGRATE_PREPARE: - logBreakpoint(state.name()); - setNextState(RegionTransitionState.CREATE_NEW_REGION_PEER); - break; - case CREATE_NEW_REGION_PEER: - handler.createNewRegionPeer(consensusGroupId, destDataNode); logBreakpoint(state.name()); 
setNextState(RegionTransitionState.ADD_REGION_PEER); break; case ADD_REGION_PEER: - tsStatus = handler.addRegionPeer(destDataNode, consensusGroupId, coordinatorForAddPeer); - if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) { - waitForOneMigrationStepFinished(consensusGroupId, state); - } else { - throw new ProcedureException("ADD_REGION_PEER executed failed in DataNode"); - } - logBreakpoint(state.name()); + addChildProcedure( + new AddRegionPeerProcedure(consensusGroupId, coordinatorForAddPeer, destDataNode)); setNextState(RegionTransitionState.CHANGE_REGION_LEADER); break; case CHANGE_REGION_LEADER: @@ -123,34 +108,17 @@ public class RegionMigrateProcedure setNextState(RegionTransitionState.REMOVE_REGION_PEER); break; case REMOVE_REGION_PEER: - tsStatus = - handler.removeRegionPeer( - originalDataNode, consensusGroupId, coordinatorForRemovePeer); - if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) { - waitForOneMigrationStepFinished(consensusGroupId, state); - } else { - throw new ProcedureException("REMOVE_REGION_PEER executed failed in DataNode"); - } - logBreakpoint(state.name()); - setNextState(RegionTransitionState.DELETE_OLD_REGION_PEER); - break; - case DELETE_OLD_REGION_PEER: - tsStatus = handler.deleteOldRegionPeer(originalDataNode, consensusGroupId); - if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) { - waitForOneMigrationStepFinished(consensusGroupId, state); - } - logBreakpoint(state.name()); - // Remove consensus group after a node stop, which will be failed, but we will - // continuously execute. 
- setNextState(RegionTransitionState.UPDATE_REGION_LOCATION_CACHE); - break; - case UPDATE_REGION_LOCATION_CACHE: - handler.updateRegionLocationCache(consensusGroupId, originalDataNode, destDataNode); - logBreakpoint(state.name()); + addChildProcedure( + new RemoveRegionPeerProcedure( + consensusGroupId, coordinatorForRemovePeer, originalDataNode)); + setNextState(RegionTransitionState.PROCEDURE_FINISH); + case PROCEDURE_FINISH: return Flow.NO_MORE_STATE; + default: + throw new ProcedureException("Unsupported state: " + state.name()); } } catch (Exception e) { - LOG.error( + LOGGER.error( "{}, Meets error in region migrate state, " + "please do the rollback operation yourself manually according to the error message!!! " + "error state: {}, migrateResult: {}", @@ -160,7 +128,7 @@ public class RegionMigrateProcedure if (isRollbackSupported(state)) { setFailure(new ProcedureException("Region migrate failed at state: " + state)); } else { - LOG.error( + LOGGER.error( "{}, Failed state [{}] is not support rollback, originalDataNode: {}", REGION_MIGRATE_PROCESS, state, @@ -192,12 +160,12 @@ public class RegionMigrateProcedure configNodeProcedureEnv.getSchedulerLock().lock(); try { if (configNodeProcedureEnv.getRegionMigrateLock().tryLock(this)) { - LOG.info("procedureId {} acquire lock.", getProcId()); + LOGGER.info("procedureId {} acquire lock.", getProcId()); return ProcedureLockState.LOCK_ACQUIRED; } configNodeProcedureEnv.getRegionMigrateLock().waitProcedure(this); - LOG.info("procedureId {} wait for lock.", getProcId()); + LOGGER.info("procedureId {} wait for lock.", getProcId()); return ProcedureLockState.LOCK_EVENT_WAIT; } finally { configNodeProcedureEnv.getSchedulerLock().unlock(); @@ -208,7 +176,7 @@ public class RegionMigrateProcedure protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) { configNodeProcedureEnv.getSchedulerLock().lock(); try { - LOG.info("procedureId {} release lock.", getProcId()); + LOGGER.info("procedureId {} release 
lock.", getProcId()); if (configNodeProcedureEnv.getRegionMigrateLock().releaseLock(this)) { configNodeProcedureEnv .getRegionMigrateLock() @@ -241,6 +209,8 @@ public class RegionMigrateProcedure ThriftCommonsSerDeUtils.serializeTDataNodeLocation(originalDataNode, stream); ThriftCommonsSerDeUtils.serializeTDataNodeLocation(destDataNode, stream); ThriftCommonsSerDeUtils.serializeTConsensusGroupId(consensusGroupId, stream); + ThriftCommonsSerDeUtils.serializeTDataNodeLocation(coordinatorForAddPeer, stream); + ThriftCommonsSerDeUtils.serializeTDataNodeLocation(coordinatorForRemovePeer, stream); } @Override @@ -250,8 +220,10 @@ public class RegionMigrateProcedure originalDataNode = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer); destDataNode = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer); consensusGroupId = ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer); + coordinatorForAddPeer = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer); + coordinatorForRemovePeer = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer); } catch (ThriftSerDeException e) { - LOG.error("Error in deserialize RemoveConfigNodeProcedure", e); + LOGGER.error("Error in deserialize {}", this.getClass(), e); } } @@ -273,59 +245,6 @@ public class RegionMigrateProcedure return Objects.hash(this.originalDataNode, this.destDataNode, this.consensusGroupId); } - public TSStatus waitForOneMigrationStepFinished( - TConsensusGroupId consensusGroupId, RegionTransitionState state) throws Exception { - - LOG.info( - "{}, Wait for state {} finished, regionId: {}", - REGION_MIGRATE_PROCESS, - state, - consensusGroupId); - - TSStatus status = new TSStatus(SUCCESS_STATUS.getStatusCode()); - synchronized (regionMigrateLock) { - try { - // TODO set timeOut? 
- regionMigrateLock.wait(); - - if (!migrateSuccess) { - throw new ProcedureException( - String.format("Region migration failed, regionId: %s", consensusGroupId)); - } - } catch (InterruptedException e) { - LOG.error("{}, region migration {} interrupt", REGION_MIGRATE_PROCESS, consensusGroupId, e); - Thread.currentThread().interrupt(); - status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()); - status.setMessage("Waiting for region migration interruption," + e.getMessage()); - } - } - return status; - } - - /** DataNode report region migrate result to ConfigNode, and continue */ - public void notifyTheRegionMigrateFinished(TRegionMigrateResultReportReq req) { - - LOG.info( - "{}, ConfigNode received region migration result reported by DataNode: {}", - REGION_MIGRATE_PROCESS, - req); - - // TODO the req is used in roll back - synchronized (regionMigrateLock) { - TSStatus migrateStatus = req.getMigrateResult(); - // Migration failed - if (migrateStatus.getCode() != SUCCESS_STATUS.getStatusCode()) { - LOG.info( - "{}, Region migration failed in DataNode, migrateStatus: {}", - REGION_MIGRATE_PROCESS, - migrateStatus); - migrateSuccess = false; - migrateResult = migrateStatus.toString(); - } - regionMigrateLock.notifyAll(); - } - } - public TConsensusGroupId getConsensusGroupId() { return consensusGroupId; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RemoveRegionPeerProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RemoveRegionPeerProcedure.java new file mode 100644 index 00000000000..f656653a113 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RemoveRegionPeerProcedure.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.impl.statemachine; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException; +import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; +import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler; +import org.apache.iotdb.confignode.procedure.exception.ProcedureException; +import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException; +import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException; +import org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState; +import org.apache.iotdb.confignode.procedure.store.ProcedureType; +import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +import static org.apache.iotdb.commons.utils.FileUtils.logBreakpoint; +import static 
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE_PROCESS; +import static org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState.DELETE_OLD_REGION_PEER; +import static org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState.REMOVE_REGION_LOCATION_CACHE; +import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS; + +public class RemoveRegionPeerProcedure + extends StateMachineProcedure<ConfigNodeProcedureEnv, RemoveRegionPeerState> { + private static final Logger LOGGER = LoggerFactory.getLogger(RemoveRegionPeerProcedure.class); + private TConsensusGroupId consensusGroupId; + private TDataNodeLocation coordinator; + private TDataNodeLocation targetDataNode; + + private boolean removeRegionPeerSuccess = true; + private String removeRegionPeerResult; + private final Object removeRegionPeerLock = new Object(); + + public RemoveRegionPeerProcedure() { + super(); + } + + public RemoveRegionPeerProcedure( + TConsensusGroupId consensusGroupId, + TDataNodeLocation coordinator, + TDataNodeLocation targetDataNode) { + super(); + this.consensusGroupId = consensusGroupId; + this.coordinator = coordinator; + this.targetDataNode = targetDataNode; + } + + @Override + protected Flow executeFromState(ConfigNodeProcedureEnv env, RemoveRegionPeerState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + if (consensusGroupId == null) { + return Flow.NO_MORE_STATE; + } + TSStatus tsStatus; + DataNodeRemoveHandler handler = env.getDataNodeRemoveHandler(); + try { + switch (state) { + case REMOVE_REGION_PEER: + tsStatus = handler.removeRegionPeer(targetDataNode, consensusGroupId, coordinator); + if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) { + waitForOneMigrationStepFinished(consensusGroupId, state); + } else { + throw new ProcedureException("REMOVE_REGION_PEER executed failed in DataNode"); + } + logBreakpoint(state.name()); + setNextState(DELETE_OLD_REGION_PEER); + break; + case 
DELETE_OLD_REGION_PEER: + tsStatus = handler.deleteOldRegionPeer(targetDataNode, consensusGroupId); + if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) { + waitForOneMigrationStepFinished(consensusGroupId, state); + } + logBreakpoint(state.name()); + // Remove consensus group after a node stop, which will be failed, but we will + // continuously execute. + setNextState(REMOVE_REGION_LOCATION_CACHE); + break; + case REMOVE_REGION_LOCATION_CACHE: + handler.removeRegionLocation(consensusGroupId, targetDataNode); + logBreakpoint(state.name()); + return Flow.NO_MORE_STATE; + default: + throw new ProcedureException("Unsupported state: " + state.name()); + } + } catch (Exception e) { + return Flow.NO_MORE_STATE; + } + return Flow.HAS_MORE_STATE; + } + + public TSStatus waitForOneMigrationStepFinished( + TConsensusGroupId consensusGroupId, RemoveRegionPeerState state) throws Exception { + + LOGGER.info( + "{}, Wait for state {} finished, regionId: {}", + REGION_MIGRATE_PROCESS, + state, + consensusGroupId); + + TSStatus status = new TSStatus(SUCCESS_STATUS.getStatusCode()); + synchronized (removeRegionPeerLock) { + try { + // TODO set timeOut? 
+ removeRegionPeerLock.wait(); + + if (!removeRegionPeerSuccess) { + throw new ProcedureException( + String.format("Region migration failed, regionId: %s", consensusGroupId)); + } + } catch (InterruptedException e) { + LOGGER.error( + "{}, region migration {} interrupt", REGION_MIGRATE_PROCESS, consensusGroupId, e); + Thread.currentThread().interrupt(); + status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode()); + status.setMessage("Waiting for region migration interruption," + e.getMessage()); + } + } + return status; + } + + public void notifyRemovePeerFinished(TRegionMigrateResultReportReq req) { + LOGGER.info( + "{}, ConfigNode received region migration result reported by DataNode: {}", + REGION_MIGRATE_PROCESS, + req); + + // TODO the req is used in roll back + synchronized (removeRegionPeerLock) { + TSStatus migrateStatus = req.getMigrateResult(); + // Migration failed + if (migrateStatus.getCode() != SUCCESS_STATUS.getStatusCode()) { + LOGGER.info( + "{}, Region migration failed in DataNode, migrateStatus: {}", + REGION_MIGRATE_PROCESS, + migrateStatus); + removeRegionPeerSuccess = false; + removeRegionPeerResult = migrateStatus.toString(); + } + removeRegionPeerLock.notifyAll(); + } + } + + @Override + protected void rollbackState(ConfigNodeProcedureEnv env, RemoveRegionPeerState state) + throws IOException, InterruptedException, ProcedureException {} + + @Override + protected boolean isRollbackSupported(RemoveRegionPeerState state) { + return false; + } + + @Override + protected RemoveRegionPeerState getState(int stateId) { + return RemoveRegionPeerState.values()[stateId]; + } + + @Override + protected int getStateId(RemoveRegionPeerState RemoveRegionPeerState) { + return RemoveRegionPeerState.ordinal(); + } + + @Override + protected RemoveRegionPeerState getInitialState() { + return RemoveRegionPeerState.REMOVE_REGION_PEER; + } + + @Override + public void serialize(DataOutputStream stream) throws IOException { + 
stream.writeShort(ProcedureType.REMOVE_REGION_PEER_PROCEDURE.getTypeCode()); + super.serialize(stream); + ThriftCommonsSerDeUtils.serializeTConsensusGroupId(consensusGroupId, stream); + ThriftCommonsSerDeUtils.serializeTDataNodeLocation(targetDataNode, stream); + ThriftCommonsSerDeUtils.serializeTDataNodeLocation(coordinator, stream); + } + + @Override + public void deserialize(ByteBuffer byteBuffer) { + super.deserialize(byteBuffer); + try { + consensusGroupId = ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer); + targetDataNode = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer); + coordinator = ThriftCommonsSerDeUtils.deserializeTDataNodeLocation(byteBuffer); + } catch (ThriftSerDeException e) { + LOGGER.error("Error in deserialize {}", this.getClass(), e); + } + } + + public TConsensusGroupId getConsensusGroupId() { + return consensusGroupId; + } + + public TDataNodeLocation getCoordinator() { + return coordinator; + } + + public TDataNodeLocation getTargetDataNode() { + return targetDataNode; + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddRegionPeerState.java similarity index 82% copy from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java copy to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddRegionPeerState.java index a21a6af941b..0e3477626b6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/AddRegionPeerState.java @@ -19,12 +19,8 @@ package org.apache.iotdb.confignode.procedure.state; -public enum RegionTransitionState { - REGION_MIGRATE_PREPARE, +public enum AddRegionPeerState { CREATE_NEW_REGION_PEER, - ADD_REGION_PEER, 
- CHANGE_REGION_LEADER, - REMOVE_REGION_PEER, - DELETE_OLD_REGION_PEER, - UPDATE_REGION_LOCATION_CACHE + DO_ADD_REGION_PEER, + UPDATE_REGION_LOCATION_CACHE, } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java index a21a6af941b..01096319c05 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java @@ -21,10 +21,8 @@ package org.apache.iotdb.confignode.procedure.state; public enum RegionTransitionState { REGION_MIGRATE_PREPARE, - CREATE_NEW_REGION_PEER, ADD_REGION_PEER, CHANGE_REGION_LEADER, REMOVE_REGION_PEER, - DELETE_OLD_REGION_PEER, - UPDATE_REGION_LOCATION_CACHE + PROCEDURE_FINISH, } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveRegionPeerState.java similarity index 84% copy from iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java copy to iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveRegionPeerState.java index a21a6af941b..f4b9002193a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RegionTransitionState.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/RemoveRegionPeerState.java @@ -19,12 +19,8 @@ package org.apache.iotdb.confignode.procedure.state; -public enum RegionTransitionState { - REGION_MIGRATE_PREPARE, - CREATE_NEW_REGION_PEER, - ADD_REGION_PEER, - CHANGE_REGION_LEADER, +public enum RemoveRegionPeerState { REMOVE_REGION_PEER, DELETE_OLD_REGION_PEER, - 
UPDATE_REGION_LOCATION_CACHE + REMOVE_REGION_LOCATION_CACHE, } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java index 0ad156e8253..3033b6046ca 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java @@ -42,8 +42,10 @@ import org.apache.iotdb.confignode.procedure.impl.schema.DeleteLogicalViewProced import org.apache.iotdb.confignode.procedure.impl.schema.DeleteTimeSeriesProcedure; import org.apache.iotdb.confignode.procedure.impl.schema.SetTemplateProcedure; import org.apache.iotdb.confignode.procedure.impl.schema.UnsetTemplateProcedure; +import org.apache.iotdb.confignode.procedure.impl.statemachine.AddRegionPeerProcedure; import org.apache.iotdb.confignode.procedure.impl.statemachine.CreateRegionGroupsProcedure; import org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure; +import org.apache.iotdb.confignode.procedure.impl.statemachine.RemoveRegionPeerProcedure; import org.apache.iotdb.confignode.procedure.impl.sync.AuthOperationProcedure; import org.apache.iotdb.confignode.procedure.impl.sync.CreatePipeProcedure; import org.apache.iotdb.confignode.procedure.impl.sync.DropPipeProcedure; @@ -89,6 +91,12 @@ public class ProcedureFactory implements IProcedureFactory { case REGION_MIGRATE_PROCEDURE: procedure = new RegionMigrateProcedure(); break; + case ADD_REGION_PEER_PROCEDURE: + procedure = new AddRegionPeerProcedure(); + break; + case REMOVE_REGION_PEER_PROCEDURE: + procedure = new RemoveRegionPeerProcedure(); + break; case CREATE_REGION_GROUPS: procedure = new CreateRegionGroupsProcedure(); break; @@ -188,6 +196,10 @@ public class ProcedureFactory implements IProcedureFactory { return 
ProcedureType.REMOVE_DATA_NODE_PROCEDURE; } else if (procedure instanceof RegionMigrateProcedure) { return ProcedureType.REGION_MIGRATE_PROCEDURE; + } else if (procedure instanceof AddRegionPeerProcedure) { + return ProcedureType.ADD_REGION_PEER_PROCEDURE; + } else if (procedure instanceof RemoveRegionPeerProcedure) { + return ProcedureType.REMOVE_REGION_PEER_PROCEDURE; } else if (procedure instanceof CreateRegionGroupsProcedure) { return ProcedureType.CREATE_REGION_GROUPS; } else if (procedure instanceof DeleteTimeSeriesProcedure) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java index f7e7c68a584..b7a53e81ba3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java @@ -35,6 +35,8 @@ public enum ProcedureType { REGION_MIGRATE_PROCEDURE((short) 201), CREATE_REGION_GROUPS((short) 202), CREATE_MANY_DATABASES_PROCEDURE((short) 203), + ADD_REGION_PEER_PROCEDURE((short) 204), + REMOVE_REGION_PEER_PROCEDURE((short) 205), /** Timeseries */ DELETE_TIMESERIES_PROCEDURE((short) 300),
