This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ef54f795902 [Region Migration] Wait for all region-related resources
to be released before deleting (#13148)
ef54f795902 is described below
commit ef54f795902bc27bc068c4410c950c0446141ff8
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Wed Aug 14 15:28:50 2024 +0800
[Region Migration] Wait for all region-related resources to be released
before deleting (#13148)
* add waitUserPipeAllowRemovePeer
* rollback conf
* change name
* fix comment
* refactor
* refactor
---
.../org/apache/iotdb/consensus/IStateMachine.java | 9 +++++
.../apache/iotdb/consensus/iot/IoTConsensus.java | 2 ++
.../consensus/iot/IoTConsensusServerImpl.java | 39 ++++++++++++++++++++++
.../service/IoTConsensusRPCServiceProcessor.java | 21 ++++++++++--
.../dataregion/DataRegionStateMachine.java | 12 +++++++
.../src/main/thrift/iotconsensus.thrift | 9 +++++
6 files changed, 90 insertions(+), 2 deletions(-)
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index d9db3a836b2..bad1c4d9b7e 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -67,6 +67,15 @@ public interface IStateMachine {
*/
DataSet read(IConsensusRequest request);
+  /**
+   * Check whether all resources related to this region have been released. Consensus layers may
+   * poll this on a peer before removing it (IoTConsensus does so while removing a peer); currently
+   * only pipe-related resources are expected to be checked by implementations.
+   *
+   * <p>This is a query only: it does not release anything itself. The default implementation holds
+   * no such resources and therefore always reports {@code true}.
+   *
+   * @return true if every region-related resource has been released and the region may be removed
+   */
+  default boolean hasReleaseAllRegionRelatedResource() {
+    return true;
+  }
+
/**
* Take a snapshot of current statemachine. All files are required to be
stored under snapshotDir,
* which is a subdirectory of the StorageDir in Consensus
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index b659db9e3db..89894a4c01a 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -368,6 +368,8 @@ public class IoTConsensus implements IConsensus {
KillPoint.setKillPoint(IoTConsensusRemovePeerCoordinatorKillPoints.AFTER_INACTIVE_PEER);
// wait its SyncLog to complete
impl.waitTargetPeerUntilSyncLogCompleted(peer);
+ // wait its region related resource to release
+ impl.waitReleaseAllRegionRelatedResource(peer);
} catch (ConsensusGroupModifyPeerException e) {
throw new ConsensusException(e.getMessage());
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 6ef3f276585..c077e978ea6 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -59,6 +59,8 @@ import
org.apache.iotdb.consensus.iot.thrift.TSendSnapshotFragmentReq;
import org.apache.iotdb.consensus.iot.thrift.TSendSnapshotFragmentRes;
import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadReq;
import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadRes;
+import
org.apache.iotdb.consensus.iot.thrift.TWaitReleaseAllRegionRelatedResourceReq;
+import
org.apache.iotdb.consensus.iot.thrift.TWaitReleaseAllRegionRelatedResourceRes;
import org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteReq;
import org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteRes;
import org.apache.iotdb.rpc.RpcUtils;
@@ -619,6 +621,43 @@ public class IoTConsensusServerImpl {
}
}
+  /**
+   * Report whether the local state machine has released every resource tied to this region.
+   *
+   * @return the answer of {@code stateMachine.hasReleaseAllRegionRelatedResource()}
+   */
+  public boolean hasReleaseAllRegionRelatedResource() {
+    final boolean released = stateMachine.hasReleaseAllRegionRelatedResource();
+    return released;
+  }
+
+  /** Pause between two consecutive release checks when the caller does not choose one. */
+  private static final long DEFAULT_WAIT_RELEASE_CHECK_INTERVAL_IN_MS = 10_000L;
+
+  /**
+   * Block until {@code targetPeer} reports that it has released all resources related to this
+   * consensus group, re-checking every 10 seconds.
+   *
+   * @param targetPeer the peer whose region-related resources must be released
+   * @throws ConsensusGroupModifyPeerException if no client can be borrowed, the RPC fails, or the
+   *     waiting thread is interrupted
+   */
+  public void waitReleaseAllRegionRelatedResource(Peer targetPeer)
+      throws ConsensusGroupModifyPeerException {
+    waitReleaseAllRegionRelatedResource(targetPeer, DEFAULT_WAIT_RELEASE_CHECK_INTERVAL_IN_MS);
+  }
+
+  /**
+   * Block until {@code targetPeer} reports that it has released all resources related to this
+   * consensus group, re-checking every {@code checkIntervalInMs} milliseconds.
+   *
+   * <p>A single client is borrowed for the whole wait. NOTE(review): there is no overall deadline,
+   * so this only returns once the peer reports success, an RPC fails, or the thread is interrupted.
+   *
+   * @param targetPeer the peer whose region-related resources must be released
+   * @param checkIntervalInMs pause between two consecutive checks; must be positive
+   * @throws IllegalArgumentException if {@code checkIntervalInMs} is not positive
+   * @throws ConsensusGroupModifyPeerException if no client can be borrowed, the RPC fails, or the
+   *     waiting thread is interrupted (the interrupt flag is restored first)
+   */
+  public void waitReleaseAllRegionRelatedResource(Peer targetPeer, long checkIntervalInMs)
+      throws ConsensusGroupModifyPeerException {
+    // Thread.sleep rejects negative values, and 0 would flood the peer with back-to-back RPCs.
+    if (checkIntervalInMs <= 0) {
+      throw new IllegalArgumentException(
+          "checkIntervalInMs must be positive, but was " + checkIntervalInMs);
+    }
+    // The request never changes between polls, so build it once instead of on every iteration.
+    TWaitReleaseAllRegionRelatedResourceReq req =
+        new TWaitReleaseAllRegionRelatedResourceReq(
+            targetPeer.getGroupId().convertToTConsensusGroupId());
+    try (SyncIoTConsensusServiceClient client =
+        syncClientManager.borrowClient(targetPeer.getEndpoint())) {
+      while (true) {
+        TWaitReleaseAllRegionRelatedResourceRes res =
+            client.waitReleaseAllRegionRelatedResource(req);
+        if (res.releaseAllResource) {
+          logger.info("[WAIT RELEASE] {} has released all region related resource", targetPeer);
+          return;
+        }
+        logger.info(
+            "[WAIT RELEASE] {} is still releasing all region related resource", targetPeer);
+        Thread.sleep(checkIntervalInMs);
+      }
+    } catch (ClientManagerException | TException e) {
+      throw new ConsensusGroupModifyPeerException(
+          String.format(
+              "error when waiting %s to release all region related resource. %s",
+              targetPeer, e.getMessage()),
+          e);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so callers further up can still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new ConsensusGroupModifyPeerException(
+          String.format(
+              "thread interrupted when waiting %s to release all region related resource. %s",
+              targetPeer, e.getMessage()),
+          e);
+    }
+  }
+
private boolean isSuccess(TSStatus status) {
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index 6201ec06ef6..ffc60568c7b 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -50,6 +50,8 @@ import
org.apache.iotdb.consensus.iot.thrift.TSyncLogEntriesReq;
import org.apache.iotdb.consensus.iot.thrift.TSyncLogEntriesRes;
import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadReq;
import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadRes;
+import
org.apache.iotdb.consensus.iot.thrift.TWaitReleaseAllRegionRelatedResourceReq;
+import
org.apache.iotdb.consensus.iot.thrift.TWaitReleaseAllRegionRelatedResourceRes;
import org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteReq;
import org.apache.iotdb.consensus.iot.thrift.TWaitSyncLogCompleteRes;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -227,8 +229,6 @@ public class IoTConsensusRPCServiceProcessor implements
IoTConsensusIService.Ifa
String message =
String.format("unexpected consensusGroupId %s for
waitSyncLogComplete request", groupId);
LOGGER.error(message);
- TSStatus status = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
- status.setMessage(message);
return new TWaitSyncLogCompleteRes(true, 0, 0);
}
long searchIndex = impl.getSearchIndex();
@@ -236,6 +236,23 @@ public class IoTConsensusRPCServiceProcessor implements
IoTConsensusIService.Ifa
return new TWaitSyncLogCompleteRes(searchIndex == safeIndex, searchIndex,
safeIndex);
}
+ @Override
+ public TWaitReleaseAllRegionRelatedResourceRes
waitReleaseAllRegionRelatedResource(
+ TWaitReleaseAllRegionRelatedResourceReq req) throws TException {
+ ConsensusGroupId groupId =
+
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+ IoTConsensusServerImpl impl = consensus.getImpl(groupId);
+ if (impl == null) {
+ String message =
+ String.format(
+ "unexpected consensusGroupId %s for
TWaitReleaseAllRegionRelatedResourceRes request",
+ groupId);
+ LOGGER.error(message);
+ return new TWaitReleaseAllRegionRelatedResourceRes(true);
+ }
+ return new
TWaitReleaseAllRegionRelatedResourceRes(impl.hasReleaseAllRegionRelatedResource());
+ }
+
@Override
public TSendSnapshotFragmentRes
sendSnapshotFragment(TSendSnapshotFragmentReq req)
throws TException {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
index 240e77580b2..7a778ceb7f9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
@@ -311,6 +311,18 @@ public class DataRegionStateMachine extends
BaseStateMachine {
}
}
+  /**
+   * Check whether user pipes have released all resources related to this data region.
+   *
+   * <p>NOTE(review): not implemented yet. This placeholder always reports {@code true}, so pipes
+   * never hold back the region-release check.
+   *
+   * @return true if no user pipe still holds resources of this region
+   */
+  public boolean hasPipeReleaseRegionRelatedResource() {
+    // TODO: check whether user pipes have released all resources related to this region
+    return true;
+  }
+
+  /**
+   * Aggregate every per-subsystem release check for this data region. Only the pipe check exists
+   * today; any additional check should be AND-ed into the returned expression.
+   *
+   * @return true if all region-related resources have been released
+   */
+  @Override
+  public boolean hasReleaseAllRegionRelatedResource() {
+    return hasPipeReleaseRegionRelatedResource();
+  }
+
@Override
public File getSnapshotRoot() {
String snapshotDir = "";
diff --git
a/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
b/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
index c1957eaf114..36dbd2dc35e 100644
--- a/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
+++ b/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
@@ -94,6 +94,14 @@ struct TWaitSyncLogCompleteRes {
3: required i64 safeIndex
}
+// Asks a peer whether it has released all resources related to the given consensus group
+// (sent to the peer that is being removed, before it is deleted).
+struct TWaitReleaseAllRegionRelatedResourceReq {
+ 1: required common.TConsensusGroupId consensusGroupId
+}
+
+// Reply to TWaitReleaseAllRegionRelatedResourceReq.
+struct TWaitReleaseAllRegionRelatedResourceRes {
+ // true once all region-related resources are released; the Java server also answers true
+ // for a consensus group it does not know.
+ 1: required bool releaseAllResource
+}
+
struct TSendSnapshotFragmentRes {
1: required common.TSStatus status
}
@@ -123,6 +131,7 @@ service IoTConsensusIService {
TBuildSyncLogChannelRes buildSyncLogChannel(TBuildSyncLogChannelReq req)
TRemoveSyncLogChannelRes removeSyncLogChannel(TRemoveSyncLogChannelReq req)
TWaitSyncLogCompleteRes waitSyncLogComplete(TWaitSyncLogCompleteReq req)
+ TWaitReleaseAllRegionRelatedResourceRes
waitReleaseAllRegionRelatedResource(TWaitReleaseAllRegionRelatedResourceReq req)
TSendSnapshotFragmentRes sendSnapshotFragment(TSendSnapshotFragmentReq req)
TTriggerSnapshotLoadRes triggerSnapshotLoad(TTriggerSnapshotLoadReq req)
TCleanupTransferredSnapshotRes
cleanupTransferredSnapshot(TCleanupTransferredSnapshotReq req)