This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 9af414b2d70 Add retry for inactivePeer of region migration (#15294) (#15303)
9af414b2d70 is described below
commit 9af414b2d70817049a1f17d6071baa248ab03912
Author: Li Yu Heng <[email protected]>
AuthorDate: Wed Apr 9 14:15:16 2025 +0800
Add retry for inactivePeer of region migration (#15294) (#15303)
* done
* fix
(cherry picked from commit b1a0dee36e7d87a436010faf5af4add3e298b963)
---
.../apache/iotdb/consensus/iot/IoTConsensus.java | 4 +-
.../consensus/iot/IoTConsensusServerImpl.java | 44 ++++++++++++++--------
2 files changed, 30 insertions(+), 18 deletions(-)
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index e3a356bdb1b..d15d6e365a7 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -331,7 +331,7 @@ public class IoTConsensus implements IConsensus {
try {
// step 1: inactive new Peer to prepare for following steps
logger.info("[IoTConsensus] inactivate new peer: {}", peer);
- impl.inactivePeer(peer, false);
+ impl.inactivatePeer(peer, false);
// step 2: notify all the other Peers to build the sync connection to
newPeer
logger.info("[IoTConsensus] notify current peers to build sync
log...");
@@ -397,7 +397,7 @@ public class IoTConsensus implements IConsensus {
try {
// let target peer reject new write
- impl.inactivePeer(peer, true);
+ impl.inactivatePeer(peer, true);
KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.AFTER_INACTIVE_PEER);
// wait its SyncLog to complete
impl.waitTargetPeerUntilSyncLogCompleted(peer);
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index a13a19dde8d..dab767bc9da 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -413,26 +413,38 @@ public class IoTConsensusServerImpl {
R apply(T t) throws Exception;
}
- public void inactivePeer(Peer peer, boolean forDeletionPurpose)
+ public void inactivatePeer(Peer peer, boolean forDeletionPurpose)
throws ConsensusGroupModifyPeerException {
- try (SyncIoTConsensusServiceClient client =
- syncClientManager.borrowClient(peer.getEndpoint())) {
- try {
- TInactivatePeerRes res =
- client.inactivatePeer(
- new
TInactivatePeerReq(peer.getGroupId().convertToTConsensusGroupId())
- .setForDeletionPurpose(forDeletionPurpose));
- if (!isSuccess(res.status)) {
- throw new ConsensusGroupModifyPeerException(
- String.format("error when inactivating %s. %s", peer,
res.getStatus()));
+ ConsensusGroupModifyPeerException lastException = null;
+ // In region migration, if the target node restarts before the
“addRegionPeer” phase within 1
+ // minutes,
+ // the client in the ClientManager will become invalid.
+ // This PR adds 1 retry at this point to ensure that region migration can
still proceed
+ // correctly in such cases.
+ for (int i = 0; i < 2; i++) {
+ try (SyncIoTConsensusServiceClient client =
+ syncClientManager.borrowClient(peer.getEndpoint())) {
+ try {
+ TInactivatePeerRes res =
+ client.inactivatePeer(
+ new
TInactivatePeerReq(peer.getGroupId().convertToTConsensusGroupId())
+ .setForDeletionPurpose(forDeletionPurpose));
+ if (isSuccess(res.status)) {
+ return;
+ }
+ lastException =
+ new ConsensusGroupModifyPeerException(
+ String.format("error when inactivating %s. %s", peer,
res.getStatus()));
+ } catch (Exception e) {
+ lastException =
+ new ConsensusGroupModifyPeerException(
+ String.format("error when inactivating %s", peer), e);
}
- } catch (Exception e) {
- throw new ConsensusGroupModifyPeerException(
- String.format("error when inactivating %s", peer), e);
+ } catch (ClientManagerException e) {
+ lastException = new ConsensusGroupModifyPeerException(e);
}
- } catch (ClientManagerException e) {
- throw new ConsensusGroupModifyPeerException(e);
}
+ throw lastException;
}
public void triggerSnapshotLoad(Peer peer) throws
ConsensusGroupModifyPeerException {