This is an automated email from the ASF dual-hosted git repository. jt2594838 pushed a commit to branch check_consensus_before_answering_region_request in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6338581e6be087620ef54a1c14cf0fa17c8fb5a6 Author: Tian Jiang <[email protected]> AuthorDate: Thu Apr 23 18:21:25 2026 +0800 Wait for consensus start before answering region requests --- .../impl/DataNodeInternalRPCServiceImpl.java | 62 ++++++++++++++++++++++ .../DataNodeInternalRPCServiceImplTest.java | 6 +-- 2 files changed, 65 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 28422bbd837..bd420fab06a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -49,6 +49,8 @@ import org.apache.iotdb.commons.audit.UserEntity; import org.apache.iotdb.commons.auth.entity.PrivilegeType; import org.apache.iotdb.commons.client.request.AsyncRequestContext; import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.concurrent.Await; +import org.apache.iotdb.commons.concurrent.AwaitTimeoutException; import org.apache.iotdb.commons.concurrent.IoTThreadFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor; @@ -440,6 +442,24 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface this.dataNodeContext = dataNodeContext; } + private TSStatus waitForConsensusStarted() { + if (dataNodeContext.isAllConsensusStarted()) { + return null; + } + try { + Await.await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(dataNodeContext::isAllConsensusStarted); + return null; + } catch (AwaitTimeoutException e) { + LOGGER.warn("Consensus has not been started after 30 seconds, rejecting region request"); + return RpcUtils.getStatus( + TSStatusCode.CONSENSUS_NOT_INITIALIZED, + "Consensus has not been started after 30 seconds"); + } + } + @Override public TSendFragmentInstanceResp sendFragmentInstance(final TSendFragmentInstanceReq req) { LOGGER.debug("receive FragmentInstance to group[{}]", req.getConsensusGroupId()); @@ -628,11 +648,19 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface @Override public TSStatus createSchemaRegion(final TCreateSchemaRegionReq req) { + TSStatus consensusStatus = waitForConsensusStarted(); + if (consensusStatus != null) { + return consensusStatus; + } return regionManager.createSchemaRegion(req.getRegionReplicaSet(), req.getStorageGroup()); } @Override public TSStatus createDataRegion(TCreateDataRegionReq req) { + TSStatus consensusStatus = waitForConsensusStarted(); + if (consensusStatus != null) { + return consensusStatus; + } return regionManager.createDataRegion(req.getRegionReplicaSet(), req.getStorageGroup()); } @@ -2620,6 +2648,10 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface @Override public TSStatus deleteRegion(TConsensusGroupId tconsensusGroupId) { + TSStatus consensusStatus = waitForConsensusStarted(); + if (consensusStatus != null) { + return consensusStatus; + } ConsensusGroupId consensusGroupId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tconsensusGroupId); if (consensusGroupId instanceof DataRegionId) { @@ -2648,6 +2680,12 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface LOGGER.info("[ChangeRegionLeader] {}", req); TRegionLeaderChangeResp resp = new TRegionLeaderChangeResp(); + TSStatus consensusStatus = waitForConsensusStarted(); + if (consensusStatus != null) { + resp.setStatus(consensusStatus); + return resp; + } + TSStatus successStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); TConsensusGroupId tgId = req.getRegionId(); ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(tgId); @@ -2717,6 +2755,10 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface @Override public TSStatus createNewRegionPeer(TCreatePeerReq req) { + TSStatus consensusStatus = waitForConsensusStarted(); + if (consensusStatus != null) { + return consensusStatus; + } ConsensusGroupId regionId = ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getRegionId()); List<Peer> peers = @@ -2737,6 +2779,10 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface @Override public TSStatus addRegionPeer(TMaintainPeerReq req) { + TSStatus consensusStatus = waitForConsensusStarted(); + if (consensusStatus != null) { + return consensusStatus; + } TConsensusGroupId regionId = req.getRegionId(); String selectedDataNodeIP = req.getDestNode().getInternalEndPoint().getIp(); boolean submitSucceed = RegionMigrateService.getInstance().submitAddRegionPeerTask(req); @@ -2755,6 +2801,10 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface @Override public TSStatus removeRegionPeer(TMaintainPeerReq req) { + TSStatus consensusStatus = waitForConsensusStarted(); + if (consensusStatus != null) { + return consensusStatus; + } TConsensusGroupId regionId = req.getRegionId(); String selectedDataNodeIP = req.getDestNode().getInternalEndPoint().getIp(); boolean submitSucceed = RegionMigrateService.getInstance().submitRemoveRegionPeerTask(req); @@ -2773,6 +2823,10 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface @Override public TSStatus deleteOldRegionPeer(TMaintainPeerReq req) { + TSStatus consensusStatus = waitForConsensusStarted(); + if (consensusStatus != null) { + return consensusStatus; + } TConsensusGroupId regionId = req.getRegionId(); String selectedDataNodeIP = req.getDestNode().getInternalEndPoint().getIp(); boolean submitSucceed = RegionMigrateService.getInstance().submitDeleteOldRegionPeerTask(req); @@ -2792,6 +2846,10 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface // TODO: return which DataNode fail @Override public TSStatus resetPeerList(TResetPeerListReq req) throws TException { + TSStatus consensusStatus = waitForConsensusStarted(); + if (consensusStatus != null) { + return consensusStatus; + } return RegionMigrateService.getInstance().resetPeerList(req); } @@ -2802,6 +2860,10 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface @Override public TSStatus notifyRegionMigration(TNotifyRegionMigrationReq req) throws TException { + TSStatus consensusStatus = waitForConsensusStarted(); + if (consensusStatus != null) { + return consensusStatus; + } RegionMigrateService.getInstance().notifyRegionMigration(req); return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java index f601c9bd1fc..63dda4537f9 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java @@ -19,8 +19,6 @@ package org.apache.iotdb.db.service; -import static org.mockito.Mockito.when; - import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; @@ -71,6 +69,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; import java.io.File; import java.io.IOException; @@ -82,7 +81,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; -import org.mockito.Mockito; + +import static org.mockito.Mockito.when; public class DataNodeInternalRPCServiceImplTest {
