This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/master2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/beyyes/master2 by this push:
new 1e106c97fe fix
1e106c97fe is described below
commit 1e106c97fe6ecbf9157e858b43843b755602d4a0
Author: Beyyes <[email protected]>
AuthorDate: Wed Nov 9 15:09:41 2022 +0800
fix
---
.../client/sync/SyncDataNodeClientPool.java | 24 ++++++-------
.../manager/load/balancer/RouteBalancer.java | 5 +++
.../procedure/env/DataNodeRemoveHandler.java | 40 +++++++++++++++++++---
.../impl/statemachine/RegionMigrateProcedure.java | 19 +++++-----
4 files changed, 61 insertions(+), 27 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
index f70201db65..caa8528b4c 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
@@ -84,7 +84,7 @@ public class SyncDataNodeClientPool {
}
public TSStatus sendSyncRequestToDataNodeWithGivenRetry(
- TEndPoint endPoint, Object req, DataNodeRequestType requestType, int
retryNum) {
+ TEndPoint endPoint, Object req, DataNodeRequestType requestType, int
retryNum) {
Throwable lastException = new TException();
for (int retry = 0; retry < retryNum; retry++) {
try (SyncDataNodeInternalServiceClient client =
clientManager.borrowClient(endPoint)) {
@@ -92,22 +92,22 @@ public class SyncDataNodeClientPool {
} catch (TException | IOException e) {
lastException = e;
LOGGER.warn(
- "{} failed on DataNode {}, because {}, retrying {}...",
- requestType,
- endPoint,
- e.getMessage(),
- retry);
+ "{} failed on DataNode {}, because {}, retrying {}...",
+ requestType,
+ endPoint,
+ e.getMessage(),
+ retry);
doRetryWait(retry);
}
}
LOGGER.error("{} failed on DataNode {}", requestType, endPoint,
lastException);
return new TSStatus(TSStatusCode.ALL_RETRY_FAILED.getStatusCode())
- .setMessage("All retry failed due to: " +
lastException.getMessage());
+ .setMessage("All retry failed due to: " + lastException.getMessage());
}
- private TSStatus executeSyncRequest(DataNodeRequestType requestType,
- SyncDataNodeInternalServiceClient client,
- Object req) throws TException {
+ private TSStatus executeSyncRequest(
+ DataNodeRequestType requestType, SyncDataNodeInternalServiceClient
client, Object req)
+ throws TException {
switch (requestType) {
case INVALIDATE_PARTITION_CACHE:
return client.invalidatePartitionCache((TInvalidateCacheReq) req);
@@ -139,7 +139,7 @@ public class SyncDataNodeClientPool {
return client.deleteOldRegionPeer((TMaintainPeerReq) req);
default:
return RpcUtils.getStatus(
- TSStatusCode.EXECUTE_STATEMENT_ERROR, "Unknown request type: "
+ requestType);
+ TSStatusCode.EXECUTE_STATEMENT_ERROR, "Unknown request type: " +
requestType);
}
}
@@ -162,7 +162,7 @@ public class SyncDataNodeClientPool {
*/
public TSStatus changeRegionLeader(
TConsensusGroupId regionId, TEndPoint dataNode, TDataNodeLocation
newLeaderNode) {
- LOGGER.info("send RPC to data node: {} for changing regions leader on it",
dataNode);
+ LOGGER.info("Send RPC to data node: {} for changing regions leader on it",
dataNode);
TSStatus status;
try (SyncDataNodeInternalServiceClient client =
clientManager.borrowClient(dataNode)) {
TRegionLeaderChangeReq req = new TRegionLeaderChangeReq(regionId,
newLeaderNode);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
index e9d21891a8..76b37dff45 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -313,6 +313,11 @@ public class RouteBalancer {
}
}
+ public void changeLeaderForMultiLeaderConsensus(
+ TConsensusGroupId regionGroupId, int newLeaderId) {
+ regionRouteMap.setLeader(regionGroupId, newLeaderId);
+ }
+
private void changeRegionLeader(
String consensusProtocolClass,
AtomicInteger requestId,
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
index 525d3fb783..394ed19a59 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
@@ -38,6 +38,7 @@ import
org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
@@ -49,10 +50,13 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import static
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANODE_PROCESS;
+import static
org.apache.iotdb.consensus.ConsensusFactory.MULTI_LEADER_CONSENSUS;
+import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
public class DataNodeRemoveHandler {
private static final Logger LOGGER =
LoggerFactory.getLogger(DataNodeRemoveHandler.class);
@@ -544,15 +548,43 @@ public class DataNodeRemoveHandler {
configManager.getConsensusManager().write(new
RemoveDataNodePlan(removeDataNodes));
}
- public void changeRegionLeader(TConsensusGroupId regionId, TDataNodeLocation
originalDataNode) {
+ /**
+ * Change the leader of given Region
+ *
+ * @param regionId The region to be migrated
+ * @param originalDataNode The DataNode where the region locates
+ * @param migrateDestDataNode The DataNode where the region is to be migrated
+ */
+ public void changeRegionLeader(TConsensusGroupId regionId,
+ TDataNodeLocation originalDataNode,
+ TDataNodeLocation migrateDestDataNode) {
Optional<TDataNodeLocation> newLeaderNode =
filterDataNodeWithOtherRegionReplica(regionId, originalDataNode);
+
+ if (TConsensusGroupType.DataRegion.equals(regionId.getType()) &&
+
MULTI_LEADER_CONSENSUS.equals(CONF.getDataRegionConsensusProtocolClass())) {
+ if (CONF.getDataReplicationFactor() == 1) {
+ configManager.getLoadManager().getRouteBalancer().
+ changeLeaderForMultiLeaderConsensus(regionId,
migrateDestDataNode.getDataNodeId());
+ } else if (newLeaderNode.isPresent()) {
+ configManager.getLoadManager().getRouteBalancer().
+ changeLeaderForMultiLeaderConsensus(regionId,
newLeaderNode.get().getDataNodeId());
+ }
+ LOGGER.info(
+ "{}, Change region leader finished for MULTI_LEADER_CONSENSUS,
regionId: {}, newLeaderNode: {}",
+ REMOVE_DATANODE_PROCESS,
+ regionId,
+ newLeaderNode);
+
+ return;
+ }
+
if (newLeaderNode.isPresent()) {
SyncDataNodeClientPool.getInstance()
.changeRegionLeader(
regionId, originalDataNode.getInternalEndPoint(),
newLeaderNode.get());
LOGGER.info(
- "{}, Change region leader finished, regionId: {}, newLeaderNode: {}",
+ "{}, Change region leader finished for RATIS_CONSENSUS, regionId:
{}, newLeaderNode: {}",
REMOVE_DATANODE_PROCESS,
regionId,
newLeaderNode);
@@ -610,8 +642,8 @@ public class DataNodeRemoveHandler {
*/
private TSStatus checkClusterProtocol() {
TSStatus status = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
- if
(CONF.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.SIMPLE_CONSENSUS)
- ||
CONF.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.SIMPLE_CONSENSUS))
{
+ if (CONF.getDataRegionConsensusProtocolClass().equals(SIMPLE_CONSENSUS)
+ ||
CONF.getSchemaRegionConsensusProtocolClass().equals(SIMPLE_CONSENSUS)) {
status.setCode(TSStatusCode.REMOVE_DATANODE_FAILED.getStatusCode());
status.setMessage("SimpleConsensus protocol is not supported to remove
data node");
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
index 27e0d120b0..f15ed67489 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
import org.apache.iotdb.confignode.procedure.state.RegionTransitionState;
@@ -82,17 +83,18 @@ public class RegionMigrateProcedure
return Flow.NO_MORE_STATE;
}
TSStatus tsStatus;
+ DataNodeRemoveHandler handler = env.getDataNodeRemoveHandler();
try {
switch (state) {
case REGION_MIGRATE_PREPARE:
setNextState(RegionTransitionState.CREATE_NEW_REGION_PEER);
break;
case CREATE_NEW_REGION_PEER:
- env.getDataNodeRemoveHandler().createNewRegionPeer(consensusGroupId,
destDataNode);
+ handler.createNewRegionPeer(consensusGroupId, destDataNode);
setNextState(RegionTransitionState.ADD_REGION_PEER);
break;
case ADD_REGION_PEER:
- tsStatus =
env.getDataNodeRemoveHandler().addRegionPeer(destDataNode, consensusGroupId);
+ tsStatus = handler.addRegionPeer(destDataNode, consensusGroupId);
if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
waitForOneMigrationStepFinished(consensusGroupId, state);
} else {
@@ -101,13 +103,11 @@ public class RegionMigrateProcedure
setNextState(RegionTransitionState.CHANGE_REGION_LEADER);
break;
case CHANGE_REGION_LEADER:
- env.getDataNodeRemoveHandler().changeRegionLeader(consensusGroupId,
originalDataNode);
+ handler.changeRegionLeader(consensusGroupId, originalDataNode,
destDataNode);
setNextState(RegionTransitionState.REMOVE_REGION_PEER);
break;
case REMOVE_REGION_PEER:
- tsStatus =
- env.getDataNodeRemoveHandler()
- .removeRegionPeer(originalDataNode, destDataNode,
consensusGroupId);
+ tsStatus = handler.removeRegionPeer(originalDataNode, destDataNode,
consensusGroupId);
if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
waitForOneMigrationStepFinished(consensusGroupId, state);
} else {
@@ -116,9 +116,7 @@ public class RegionMigrateProcedure
setNextState(RegionTransitionState.DELETE_OLD_REGION_PEER);
break;
case DELETE_OLD_REGION_PEER:
- tsStatus =
- env.getDataNodeRemoveHandler()
- .deleteOldRegionPeer(originalDataNode, consensusGroupId);
+ tsStatus = handler.deleteOldRegionPeer(originalDataNode,
consensusGroupId);
if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
waitForOneMigrationStepFinished(consensusGroupId, state);
}
@@ -127,8 +125,7 @@ public class RegionMigrateProcedure
setNextState(RegionTransitionState.UPDATE_REGION_LOCATION_CACHE);
break;
case UPDATE_REGION_LOCATION_CACHE:
- env.getDataNodeRemoveHandler()
- .updateRegionLocationCache(consensusGroupId, originalDataNode,
destDataNode);
+ handler.updateRegionLocationCache(consensusGroupId,
originalDataNode, destDataNode);
return Flow.NO_MORE_STATE;
}
} catch (Exception e) {