This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ef54f795902 [Region Migration] Wait for all region-related resources 
to be released before deleting (#13148)
ef54f795902 is described below

commit ef54f795902bc27bc068c4410c950c0446141ff8
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Wed Aug 14 15:28:50 2024 +0800

    [Region Migration] Wait for all region-related resources to be released 
before deleting (#13148)
    
    * add waitUserPipeAllowRemovePeer
    
    * rollback conf
    
    * change name
    
    * fix comment
    
    * refactor
    
    * refactor
---
 .../org/apache/iotdb/consensus/IStateMachine.java  |  9 +++++
 .../apache/iotdb/consensus/iot/IoTConsensus.java   |  2 ++
 .../consensus/iot/IoTConsensusServerImpl.java      | 39 ++++++++++++++++++++++
 .../service/IoTConsensusRPCServiceProcessor.java   | 21 ++++++++++--
 .../dataregion/DataRegionStateMachine.java         | 12 +++++++
 .../src/main/thrift/iotconsensus.thrift            |  9 +++++
 6 files changed, 90 insertions(+), 2 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index d9db3a836b2..bad1c4d9b7e 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -67,6 +67,15 @@ public interface IStateMachine {
    */
   DataSet read(IConsensusRequest request);
 
+  /**
+   * Check whether all region related resources are released. Currently only 
covers pipe resources.
+   *
+   * @return true if all region related resources have been released
+   */
+  default boolean hasReleaseAllRegionRelatedResource() {
+    return true;
+  }
+
   /**
    * Take a snapshot of current statemachine. All files are required to be 
stored under snapshotDir,
    * which is a subdirectory of the StorageDir in Consensus
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index b659db9e3db..89894a4c01a 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -368,6 +368,8 @@ public class IoTConsensus implements IConsensus {
       
KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.AFTER_INACTIVE_PEER);
       // wait its SyncLog to complete
       impl.waitTargetPeerUntilSyncLogCompleted(peer);
+      // wait for its region related resources to be released
+      impl.waitReleaseAllRegionRelatedResource(peer);
     } catch (ConsensusGroupModifyPeerException e) {
       throw new ConsensusException(e.getMessage());
     }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 6ef3f276585..c077e978ea6 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -59,6 +59,8 @@ import 
org.apache.iotdb.consensus.iot.thrift.TSendSnapshotFragmentReq;
 import org.apache.iotdb.consensus.iot.thrift.TSendSnapshotFragmentRes;
 import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadReq;
 import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadRes;
+import 
org.apache.iotdb.consensus.iot.thrift.TWaitReleaseAllRegionRelatedResourceReq;
+import 
org.apache.iotdb.consensus.iot.thrift.TWaitReleaseAllRegionRelatedResourceRes;
 import org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteReq;
 import org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteRes;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -619,6 +621,43 @@ public class IoTConsensusServerImpl {
     }
   }
 
+  public boolean hasReleaseAllRegionRelatedResource() {
+    return stateMachine.hasReleaseAllRegionRelatedResource();
+  }
+
+  public void waitReleaseAllRegionRelatedResource(Peer targetPeer)
+      throws ConsensusGroupModifyPeerException {
+    long checkIntervalInMs = 10_000L;
+    try (SyncIoTConsensusServiceClient client =
+        syncClientManager.borrowClient(targetPeer.getEndpoint())) {
+      while (true) {
+        TWaitReleaseAllRegionRelatedResourceRes res =
+            client.waitReleaseAllRegionRelatedResource(
+                new TWaitReleaseAllRegionRelatedResourceReq(
+                    targetPeer.getGroupId().convertToTConsensusGroupId()));
+        if (res.releaseAllResource) {
+          logger.info("[WAIT RELEASE] {} has released all region related 
resource", targetPeer);
+          return;
+        }
+        logger.info("[WAIT RELEASE] {} is still releasing all region related 
resource", targetPeer);
+        Thread.sleep(checkIntervalInMs);
+      }
+    } catch (ClientManagerException | TException e) {
+      throw new ConsensusGroupModifyPeerException(
+          String.format(
+              "error when waiting %s to release all region related resource. 
%s",
+              targetPeer, e.getMessage()),
+          e);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new ConsensusGroupModifyPeerException(
+          String.format(
+              "thread interrupted when waiting %s to release all region 
related resource. %s",
+              targetPeer, e.getMessage()),
+          e);
+    }
+  }
+
   private boolean isSuccess(TSStatus status) {
     return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
   }
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index 6201ec06ef6..ffc60568c7b 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -50,6 +50,8 @@ import 
org.apache.iotdb.consensus.iot.thrift.TSyncLogEntriesReq;
 import org.apache.iotdb.consensus.iot.thrift.TSyncLogEntriesRes;
 import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadReq;
 import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadRes;
+import 
org.apache.iotdb.consensus.iot.thrift.TWaitReleaseAllRegionRelatedResourceReq;
+import 
org.apache.iotdb.consensus.iot.thrift.TWaitReleaseAllRegionRelatedResourceRes;
 import org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteReq;
 import org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteRes;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -227,8 +229,6 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Ifa
       String message =
           String.format("unexpected consensusGroupId %s for 
waitSyncLogComplete request", groupId);
       LOGGER.error(message);
-      TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
-      status.setMessage(message);
       return new TWaitSyncLogCompleteRes(true, 0, 0);
     }
     long searchIndex = impl.getSearchIndex();
@@ -236,6 +236,23 @@ public class IoTConsensusRPCServiceProcessor implements 
IoTConsensusIService.Ifa
     return new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex, 
safeIndex);
   }
 
+  @Override
+  public TWaitReleaseAllRegionRelatedResourceRes 
waitReleaseAllRegionRelatedResource(
+      TWaitReleaseAllRegionRelatedResourceReq req) throws TException {
+    ConsensusGroupId groupId =
+        
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+    IoTConsensusServerImpl impl = consensus.getImpl(groupId);
+    if (impl == null) {
+      String message =
+          String.format(
+              "unexpected consensusGroupId %s for 
TWaitReleaseAllRegionRelatedResourceRes request",
+              groupId);
+      LOGGER.error(message);
+      return new TWaitReleaseAllRegionRelatedResourceRes(true);
+    }
+    return new 
TWaitReleaseAllRegionRelatedResourceRes(impl.hasReleaseAllRegionRelatedResource());
+  }
+
   @Override
   public TSendSnapshotFragmentRes 
sendSnapshotFragment(TSendSnapshotFragmentReq req)
       throws TException {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
index 240e77580b2..7a778ceb7f9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
@@ -311,6 +311,18 @@ public class DataRegionStateMachine extends 
BaseStateMachine {
     }
   }
 
+  public boolean hasPipeReleaseRegionRelatedResource() {
+    // TODO: check whether the user pipe has released all resources related to 
this region
+    return true;
+  }
+
+  @Override
+  public boolean hasReleaseAllRegionRelatedResource() {
+    boolean releaseAllResource = true;
+    releaseAllResource &= hasPipeReleaseRegionRelatedResource();
+    return releaseAllResource;
+  }
+
   @Override
   public File getSnapshotRoot() {
     String snapshotDir = "";
diff --git 
a/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift 
b/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
index c1957eaf114..36dbd2dc35e 100644
--- a/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
+++ b/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
@@ -94,6 +94,14 @@ struct TWaitSyncLogCompleteRes {
   3: required i64 safeIndex
 }
 
+struct TWaitReleaseAllRegionRelatedResourceReq {
+  1: required common.TConsensusGroupId consensusGroupId
+}
+
+struct TWaitReleaseAllRegionRelatedResourceRes {
+  1: required bool releaseAllResource
+}
+
 struct TSendSnapshotFragmentRes {
   1: required common.TSStatus status
 }
@@ -123,6 +131,7 @@ service IoTConsensusIService {
   TBuildSyncLogChannelRes buildSyncLogChannel(TBuildSyncLogChannelReq req)
   TRemoveSyncLogChannelRes removeSyncLogChannel(TRemoveSyncLogChannelReq req)
   TWaitSyncLogCompleteRes waitSyncLogComplete(TWaitSyncLogCompleteReq req)
+  TWaitReleaseAllRegionRelatedResourceRes 
waitReleaseAllRegionRelatedResource(TWaitReleaseAllRegionRelatedResourceReq req)
   TSendSnapshotFragmentRes sendSnapshotFragment(TSendSnapshotFragmentReq req)
   TTriggerSnapshotLoadRes triggerSnapshotLoad(TTriggerSnapshotLoadReq req)
   TCleanupTransferredSnapshotRes 
cleanupTransferredSnapshot(TCleanupTransferredSnapshotReq req)

Reply via email to