This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch 
check_consensus_before_answering_region_request
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 6338581e6be087620ef54a1c14cf0fa17c8fb5a6
Author: Tian Jiang <[email protected]>
AuthorDate: Thu Apr 23 18:21:25 2026 +0800

    Wait for consensus start before answering region requests
---
 .../impl/DataNodeInternalRPCServiceImpl.java       | 62 ++++++++++++++++++++++
 .../DataNodeInternalRPCServiceImplTest.java        |  6 +--
 2 files changed, 65 insertions(+), 3 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 28422bbd837..bd420fab06a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -49,6 +49,8 @@ import org.apache.iotdb.commons.audit.UserEntity;
 import org.apache.iotdb.commons.auth.entity.PrivilegeType;
 import org.apache.iotdb.commons.client.request.AsyncRequestContext;
 import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.concurrent.Await;
+import org.apache.iotdb.commons.concurrent.AwaitTimeoutException;
 import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import 
org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
@@ -440,6 +442,24 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     this.dataNodeContext = dataNodeContext;
   }
 
+  private TSStatus waitForConsensusStarted() {
+    if (dataNodeContext.isAllConsensusStarted()) {
+      return null;
+    }
+    try {
+      Await.await()
+          .atMost(30, TimeUnit.SECONDS)
+          .pollInterval(100, TimeUnit.MILLISECONDS)
+          .until(dataNodeContext::isAllConsensusStarted);
+      return null;
+    } catch (AwaitTimeoutException e) {
+      LOGGER.warn("Consensus has not been started after 30 seconds, rejecting 
region request");
+      return RpcUtils.getStatus(
+          TSStatusCode.CONSENSUS_NOT_INITIALIZED,
+          "Consensus has not been started after 30 seconds");
+    }
+  }
+
   @Override
   public TSendFragmentInstanceResp sendFragmentInstance(final 
TSendFragmentInstanceReq req) {
     LOGGER.debug("receive FragmentInstance to group[{}]", 
req.getConsensusGroupId());
@@ -628,11 +648,19 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
 
   @Override
   public TSStatus createSchemaRegion(final TCreateSchemaRegionReq req) {
+    TSStatus consensusStatus = waitForConsensusStarted();
+    if (consensusStatus != null) {
+      return consensusStatus;
+    }
     return regionManager.createSchemaRegion(req.getRegionReplicaSet(), 
req.getStorageGroup());
   }
 
   @Override
   public TSStatus createDataRegion(TCreateDataRegionReq req) {
+    TSStatus consensusStatus = waitForConsensusStarted();
+    if (consensusStatus != null) {
+      return consensusStatus;
+    }
     return regionManager.createDataRegion(req.getRegionReplicaSet(), 
req.getStorageGroup());
   }
 
@@ -2620,6 +2648,10 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
 
   @Override
   public TSStatus deleteRegion(TConsensusGroupId tconsensusGroupId) {
+    TSStatus consensusStatus = waitForConsensusStarted();
+    if (consensusStatus != null) {
+      return consensusStatus;
+    }
     ConsensusGroupId consensusGroupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(tconsensusGroupId);
     if (consensusGroupId instanceof DataRegionId) {
@@ -2648,6 +2680,12 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
     LOGGER.info("[ChangeRegionLeader] {}", req);
     TRegionLeaderChangeResp resp = new TRegionLeaderChangeResp();
 
+    TSStatus consensusStatus = waitForConsensusStarted();
+    if (consensusStatus != null) {
+      resp.setStatus(consensusStatus);
+      return resp;
+    }
+
     TSStatus successStatus = new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
     TConsensusGroupId tgId = req.getRegionId();
     ConsensusGroupId regionId = 
ConsensusGroupId.Factory.createFromTConsensusGroupId(tgId);
@@ -2717,6 +2755,10 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
 
   @Override
   public TSStatus createNewRegionPeer(TCreatePeerReq req) {
+    TSStatus consensusStatus = waitForConsensusStarted();
+    if (consensusStatus != null) {
+      return consensusStatus;
+    }
     ConsensusGroupId regionId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getRegionId());
     List<Peer> peers =
@@ -2737,6 +2779,10 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
 
   @Override
   public TSStatus addRegionPeer(TMaintainPeerReq req) {
+    TSStatus consensusStatus = waitForConsensusStarted();
+    if (consensusStatus != null) {
+      return consensusStatus;
+    }
     TConsensusGroupId regionId = req.getRegionId();
     String selectedDataNodeIP = 
req.getDestNode().getInternalEndPoint().getIp();
     boolean submitSucceed = 
RegionMigrateService.getInstance().submitAddRegionPeerTask(req);
@@ -2755,6 +2801,10 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
 
   @Override
   public TSStatus removeRegionPeer(TMaintainPeerReq req) {
+    TSStatus consensusStatus = waitForConsensusStarted();
+    if (consensusStatus != null) {
+      return consensusStatus;
+    }
     TConsensusGroupId regionId = req.getRegionId();
     String selectedDataNodeIP = 
req.getDestNode().getInternalEndPoint().getIp();
     boolean submitSucceed = 
RegionMigrateService.getInstance().submitRemoveRegionPeerTask(req);
@@ -2773,6 +2823,10 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
 
   @Override
   public TSStatus deleteOldRegionPeer(TMaintainPeerReq req) {
+    TSStatus consensusStatus = waitForConsensusStarted();
+    if (consensusStatus != null) {
+      return consensusStatus;
+    }
     TConsensusGroupId regionId = req.getRegionId();
     String selectedDataNodeIP = 
req.getDestNode().getInternalEndPoint().getIp();
     boolean submitSucceed = 
RegionMigrateService.getInstance().submitDeleteOldRegionPeerTask(req);
@@ -2792,6 +2846,10 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
   // TODO: return which DataNode fail
   @Override
   public TSStatus resetPeerList(TResetPeerListReq req) throws TException {
+    TSStatus consensusStatus = waitForConsensusStarted();
+    if (consensusStatus != null) {
+      return consensusStatus;
+    }
     return RegionMigrateService.getInstance().resetPeerList(req);
   }
 
@@ -2802,6 +2860,10 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
 
   @Override
   public TSStatus notifyRegionMigration(TNotifyRegionMigrationReq req) throws 
TException {
+    TSStatus consensusStatus = waitForConsensusStarted();
+    if (consensusStatus != null) {
+      return consensusStatus;
+    }
     RegionMigrateService.getInstance().notifyRegionMigration(req);
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
index f601c9bd1fc..63dda4537f9 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
@@ -19,8 +19,6 @@
 
 package org.apache.iotdb.db.service;
 
-import static org.mockito.Mockito.when;
-
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
@@ -71,6 +69,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.File;
 import java.io.IOException;
@@ -82,7 +81,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import org.mockito.Mockito;
+
+import static org.mockito.Mockito.when;
 
 public class DataNodeInternalRPCServiceImplTest {
 

Reply via email to