This is an automated email from the ASF dual-hosted git repository.
CRZbulabula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 859c719b9a4 Wait for consensus start before answering region requests (#17546)
859c719b9a4 is described below
commit 859c719b9a4a728b8785eb70ab5e176b445dbbf7
Author: Jiang Tian <[email protected]>
AuthorDate: Sun Apr 26 15:55:28 2026 +0800
Wait for consensus start before answering region requests (#17546)
---
.../impl/DataNodeInternalRPCServiceImpl.java | 76 ++++++++-
.../java/org/apache/iotdb/db/service/DataNode.java | 16 +-
.../db/service/DataNodeInternalRPCService.java | 12 +-
.../db/protocol/thrift/impl/ConsensusWaitTest.java | 131 ++++++++++++++
.../DataNodeInternalRPCServiceImplTest.java | 8 +-
.../org/apache/iotdb/commons/concurrent/Await.java | 53 ++++++
.../commons/concurrent/AwaitTimeoutException.java | 31 ++++
.../iotdb/commons/concurrent/ConditionAwaiter.java | 154 +++++++++++++++++
.../java/org/apache/iotdb/commons/AwaitTest.java | 190 +++++++++++++++++++++
9 files changed, 663 insertions(+), 8 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index b047463cf59..84479c9dcd2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -50,6 +50,8 @@ import org.apache.iotdb.commons.audit.UserEntity;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.commons.client.request.AsyncRequestContext;
import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.concurrent.Await;
+import org.apache.iotdb.commons.concurrent.AwaitTimeoutException;
import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import
org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
@@ -193,6 +195,7 @@ import
org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
import org.apache.iotdb.db.schemaengine.template.TemplateInternalRPCUpdateType;
import org.apache.iotdb.db.schemaengine.template.TemplateInternalRPCUtil;
import org.apache.iotdb.db.service.DataNode;
+import org.apache.iotdb.db.service.DataNode.DataNodeContext;
import org.apache.iotdb.db.service.RegionMigrateService;
import
org.apache.iotdb.db.service.externalservice.ExternalServiceManagementService;
import org.apache.iotdb.db.service.metrics.FileMetrics;
@@ -416,6 +419,8 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
private final CommonConfig commonConfig =
CommonDescriptor.getInstance().getConfig();
+ private final DataNodeContext dataNodeContext;
+
private final ExecutorService schemaExecutor =
new WrappedThreadPoolExecutor(
0,
@@ -430,10 +435,33 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
private static final String SYSTEM = "system";
- public DataNodeInternalRPCServiceImpl() {
+ public DataNodeInternalRPCServiceImpl(DataNodeContext dataNodeContext) {
super();
partitionFetcher = ClusterPartitionFetcher.getInstance();
schemaFetcher = ClusterSchemaFetcher.getInstance();
+ this.dataNodeContext = dataNodeContext;
+ }
+
+ private long consensusWaitTimeoutSeconds = 30;
+
+ private TSStatus waitForConsensusStarted() {
+ if (dataNodeContext.isAllConsensusStarted()) {
+ return null;
+ }
+ try {
+ Await.await()
+ .atMost(consensusWaitTimeoutSeconds, TimeUnit.SECONDS)
+ .pollInterval(100, TimeUnit.MILLISECONDS)
+ .until(dataNodeContext::isAllConsensusStarted);
+ return null;
+ } catch (AwaitTimeoutException e) {
+ LOGGER.warn(
+ "Consensus has not been started after {} seconds, rejecting region
request",
+ consensusWaitTimeoutSeconds);
+ return RpcUtils.getStatus(
+ TSStatusCode.CONSENSUS_NOT_INITIALIZED,
+ "Consensus has not been started after " +
consensusWaitTimeoutSeconds + " seconds");
+ }
}
@Override
@@ -624,11 +652,19 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TSStatus createSchemaRegion(final TCreateSchemaRegionReq req) {
+ TSStatus consensusStatus = waitForConsensusStarted();
+ if (consensusStatus != null) {
+ return consensusStatus;
+ }
return regionManager.createSchemaRegion(req.getRegionReplicaSet(),
req.getStorageGroup());
}
@Override
public TSStatus createDataRegion(TCreateDataRegionReq req) {
+ TSStatus consensusStatus = waitForConsensusStarted();
+ if (consensusStatus != null) {
+ return consensusStatus;
+ }
return regionManager.createDataRegion(req.getRegionReplicaSet(),
req.getStorageGroup());
}
@@ -2616,6 +2652,10 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TSStatus deleteRegion(TConsensusGroupId tconsensusGroupId) {
+ TSStatus consensusStatus = waitForConsensusStarted();
+ if (consensusStatus != null) {
+ return consensusStatus;
+ }
ConsensusGroupId consensusGroupId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(tconsensusGroupId);
if (consensusGroupId instanceof DataRegionId) {
@@ -2644,6 +2684,12 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
LOGGER.info("[ChangeRegionLeader] {}", req);
TRegionLeaderChangeResp resp = new TRegionLeaderChangeResp();
+ TSStatus consensusStatus = waitForConsensusStarted();
+ if (consensusStatus != null) {
+ resp.setStatus(consensusStatus);
+ return resp;
+ }
+
TSStatus successStatus = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
TConsensusGroupId tgId = req.getRegionId();
ConsensusGroupId regionId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(tgId);
@@ -2713,6 +2759,10 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TSStatus createNewRegionPeer(TCreatePeerReq req) {
+ TSStatus consensusStatus = waitForConsensusStarted();
+ if (consensusStatus != null) {
+ return consensusStatus;
+ }
ConsensusGroupId regionId =
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getRegionId());
List<Peer> peers =
@@ -2733,6 +2783,10 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TSStatus addRegionPeer(TMaintainPeerReq req) {
+ TSStatus consensusStatus = waitForConsensusStarted();
+ if (consensusStatus != null) {
+ return consensusStatus;
+ }
TConsensusGroupId regionId = req.getRegionId();
String selectedDataNodeIP =
req.getDestNode().getInternalEndPoint().getIp();
boolean submitSucceed =
RegionMigrateService.getInstance().submitAddRegionPeerTask(req);
@@ -2751,6 +2805,10 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TSStatus removeRegionPeer(TMaintainPeerReq req) {
+ TSStatus consensusStatus = waitForConsensusStarted();
+ if (consensusStatus != null) {
+ return consensusStatus;
+ }
TConsensusGroupId regionId = req.getRegionId();
String selectedDataNodeIP =
req.getDestNode().getInternalEndPoint().getIp();
boolean submitSucceed =
RegionMigrateService.getInstance().submitRemoveRegionPeerTask(req);
@@ -2769,6 +2827,10 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TSStatus deleteOldRegionPeer(TMaintainPeerReq req) {
+ TSStatus consensusStatus = waitForConsensusStarted();
+ if (consensusStatus != null) {
+ return consensusStatus;
+ }
TConsensusGroupId regionId = req.getRegionId();
String selectedDataNodeIP =
req.getDestNode().getInternalEndPoint().getIp();
boolean submitSucceed =
RegionMigrateService.getInstance().submitDeleteOldRegionPeerTask(req);
@@ -2788,6 +2850,10 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
// TODO: return which DataNode fail
@Override
public TSStatus resetPeerList(TResetPeerListReq req) throws TException {
+ TSStatus consensusStatus = waitForConsensusStarted();
+ if (consensusStatus != null) {
+ return consensusStatus;
+ }
return RegionMigrateService.getInstance().resetPeerList(req);
}
@@ -2798,6 +2864,10 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TSStatus notifyRegionMigration(TNotifyRegionMigrationReq req) throws
TException {
+ TSStatus consensusStatus = waitForConsensusStarted();
+ if (consensusStatus != null) {
+ return consensusStatus;
+ }
RegionMigrateService.getInstance().notifyRegionMigration(req);
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@@ -3439,4 +3509,8 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
return result;
}
+
+ public void setConsensusWaitTimeoutSeconds(long consensusWaitTimeoutSeconds)
{
+ this.consensusWaitTimeoutSeconds = consensusWaitTimeoutSeconds;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 5db478c6a09..4d7e2260d68 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -184,14 +184,16 @@ public class DataNode extends ServerCommandLine
implements DataNodeMBean {
private static final String REGISTER_INTERRUPTION =
"Unexpected interruption when waiting to register to the cluster";
- private boolean schemaRegionConsensusStarted = false;
- private boolean dataRegionConsensusStarted = false;
+ private volatile boolean schemaRegionConsensusStarted = false;
+ private volatile boolean dataRegionConsensusStarted = false;
private static Thread watcherThread;
+ private DataNodeContext context;
public DataNode() {
super("DataNode");
// We do not init anything here, so that we can re-initialize the instance in IT.
DataNodeHolder.INSTANCE = this;
+ context = new DataNodeContext();
}
public static void reinitializeStatics() {
@@ -935,7 +937,9 @@ public class DataNode extends ServerCommandLine implements
DataNodeMBean {
protected void registerInternalRPCService() throws StartupException {
// Start InternalRPCService to indicate that the current DataNode can
accept cluster scheduling
- registerManager.register(DataNodeInternalRPCService.getInstance());
+ DataNodeInternalRPCService instance =
DataNodeInternalRPCService.getInstance();
+ instance.setDataNodeContext(context);
+ registerManager.register(instance);
}
// make it easier for users to extend ClientRPCServiceImpl to export more
rpc services
@@ -1374,4 +1378,10 @@ public class DataNode extends ServerCommandLine
implements DataNodeMBean {
// Empty constructor
}
}
+
+ public class DataNodeContext {
+ public boolean isAllConsensusStarted() {
+ return dataRegionConsensusStarted && schemaRegionConsensusStarted;
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java
index 5de1041a9a0..f3bf8e507c2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeInternalRPCService.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.protocol.thrift.handler.InternalServiceThriftHandler;
import org.apache.iotdb.db.protocol.thrift.impl.DataNodeInternalRPCServiceImpl;
+import org.apache.iotdb.db.service.DataNode.DataNodeContext;
import org.apache.iotdb.db.service.metrics.DataNodeInternalRPCServiceMetrics;
import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService.Processor;
import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
@@ -44,6 +45,7 @@ public class DataNodeInternalRPCService extends ThriftService
private static final CommonConfig commonConfig =
CommonDescriptor.getInstance().getConfig();
private final AtomicReference<DataNodeInternalRPCServiceImpl> impl = new
AtomicReference<>();
+ private DataNodeContext dataNodeContext;
private DataNodeInternalRPCService() {}
@@ -54,9 +56,9 @@ public class DataNodeInternalRPCService extends ThriftService
@Override
public void initTProcessor() {
- impl.compareAndSet(null, new DataNodeInternalRPCServiceImpl());
+ DataNodeInternalRPCServiceImpl service = getImpl();
initSyncedServiceImpl(null);
- processor = new Processor<>(impl.get());
+ processor = new Processor<>(service);
}
@Override
@@ -109,7 +111,7 @@ public class DataNodeInternalRPCService extends
ThriftService
}
public DataNodeInternalRPCServiceImpl getImpl() {
- impl.compareAndSet(null, new DataNodeInternalRPCServiceImpl());
+ impl.compareAndSet(null, new
DataNodeInternalRPCServiceImpl(dataNodeContext));
return impl.get();
}
@@ -122,4 +124,8 @@ public class DataNodeInternalRPCService extends
ThriftService
public static DataNodeInternalRPCService getInstance() {
return DataNodeInternalRPCServiceHolder.INSTANCE;
}
+
+ public void setDataNodeContext(DataNodeContext dataNodeContext) {
+ this.dataNodeContext = dataNodeContext;
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/thrift/impl/ConsensusWaitTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/thrift/impl/ConsensusWaitTest.java
new file mode 100644
index 00000000000..65fd2b47858
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/thrift/impl/ConsensusWaitTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.protocol.thrift.impl;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.service.DataNode.DataNodeContext;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TCreateSchemaRegionReq;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeReq;
+import org.apache.iotdb.mpp.rpc.thrift.TRegionLeaderChangeResp;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+
+import static org.mockito.Mockito.when;
+
+public class ConsensusWaitTest {
+
+ @BeforeClass
+ public static void setUp() {
+ IoTDBDescriptor.getInstance().getConfig().setDataNodeId(0);
+ }
+
+ private DataNodeInternalRPCServiceImpl
createServiceWithConsensusState(boolean started) {
+ DataNodeContext context = Mockito.mock(DataNodeContext.class);
+ when(context.isAllConsensusStarted()).thenReturn(started);
+ DataNodeInternalRPCServiceImpl service = new
DataNodeInternalRPCServiceImpl(context);
+ service.setConsensusWaitTimeoutSeconds(1);
+ return service;
+ }
+
+ private TCreateSchemaRegionReq createSchemaRegionReq() {
+ TCreateSchemaRegionReq req = new TCreateSchemaRegionReq();
+ req.setStorageGroup("root.test");
+ TRegionReplicaSet replicaSet = new TRegionReplicaSet();
+ replicaSet.setRegionId(new
TConsensusGroupId(TConsensusGroupType.SchemaRegion, 0));
+ TDataNodeLocation location = new TDataNodeLocation();
+ location.setDataNodeId(0);
+ location.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667));
+ location.setInternalEndPoint(new TEndPoint("0.0.0.0", 10730));
+ location.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 10740));
+ location.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10760));
+ location.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10750));
+ replicaSet.setDataNodeLocations(Collections.singletonList(location));
+ req.setRegionReplicaSet(replicaSet);
+ return req;
+ }
+
+ private TCreateDataRegionReq createDataRegionReq() {
+ TCreateDataRegionReq req = new TCreateDataRegionReq();
+ req.setStorageGroup("root.test");
+ TRegionReplicaSet replicaSet = new TRegionReplicaSet();
+ replicaSet.setRegionId(new
TConsensusGroupId(TConsensusGroupType.DataRegion, 0));
+ TDataNodeLocation location = new TDataNodeLocation();
+ location.setDataNodeId(0);
+ location.setClientRpcEndPoint(new TEndPoint("0.0.0.0", 6667));
+ location.setInternalEndPoint(new TEndPoint("0.0.0.0", 10730));
+ location.setMPPDataExchangeEndPoint(new TEndPoint("0.0.0.0", 10740));
+ location.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10760));
+ location.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10750));
+ replicaSet.setDataNodeLocations(Collections.singletonList(location));
+ req.setRegionReplicaSet(replicaSet);
+ return req;
+ }
+
+ @Test
+ public void testCreateSchemaRegionRejectsWhenConsensusNotStarted() {
+ DataNodeInternalRPCServiceImpl service =
createServiceWithConsensusState(false);
+ TSStatus status = service.createSchemaRegion(createSchemaRegionReq());
+
Assert.assertEquals(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode(),
status.getCode());
+ }
+
+ @Test
+ public void testCreateDataRegionRejectsWhenConsensusNotStarted() {
+ DataNodeInternalRPCServiceImpl service =
createServiceWithConsensusState(false);
+ TSStatus status = service.createDataRegion(createDataRegionReq());
+
Assert.assertEquals(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode(),
status.getCode());
+ }
+
+ @Test
+ public void testDeleteRegionRejectsWhenConsensusNotStarted() {
+ DataNodeInternalRPCServiceImpl service =
createServiceWithConsensusState(false);
+ TConsensusGroupId groupId = new
TConsensusGroupId(TConsensusGroupType.DataRegion, 0);
+ TSStatus status = service.deleteRegion(groupId);
+
Assert.assertEquals(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode(),
status.getCode());
+ }
+
+ @Test
+ public void testChangeRegionLeaderRejectsWhenConsensusNotStarted() {
+ DataNodeInternalRPCServiceImpl service =
createServiceWithConsensusState(false);
+ TRegionLeaderChangeReq req = new TRegionLeaderChangeReq();
+ req.setRegionId(new TConsensusGroupId(TConsensusGroupType.DataRegion, 0));
+ TDataNodeLocation newLeader = new TDataNodeLocation();
+ newLeader.setDataNodeId(0);
+ newLeader.setInternalEndPoint(new TEndPoint("0.0.0.0", 10730));
+ newLeader.setDataRegionConsensusEndPoint(new TEndPoint("0.0.0.0", 10760));
+ newLeader.setSchemaRegionConsensusEndPoint(new TEndPoint("0.0.0.0",
10750));
+ req.setNewLeaderNode(newLeader);
+ TRegionLeaderChangeResp resp = service.changeRegionLeader(req);
+ Assert.assertEquals(
+ TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode(),
resp.getStatus().getCode());
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
index 8d100f4be14..15484df7e47 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/service/DataNodeInternalRPCServiceImplTest.java
@@ -50,6 +50,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.Cre
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateMultiTimeSeriesNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.schemaengine.SchemaEngine;
+import org.apache.iotdb.db.service.DataNode.DataNodeContext;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -68,6 +69,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
@@ -80,6 +82,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import static org.mockito.Mockito.when;
+
public class DataNodeInternalRPCServiceImplTest {
private static final IoTDBConfig conf =
IoTDBDescriptor.getInstance().getConfig();
@@ -134,7 +138,9 @@ public class DataNodeInternalRPCServiceImplTest {
.createLocalPeer(
ConsensusGroupId.Factory.createFromTConsensusGroupId(regionReplicaSet.getRegionId()),
genSchemaRegionPeerList(regionReplicaSet));
- dataNodeInternalRPCServiceImpl = new DataNodeInternalRPCServiceImpl();
+ DataNodeContext context = Mockito.mock(DataNodeContext.class);
+ when(context.isAllConsensusStarted()).thenReturn(true);
+ dataNodeInternalRPCServiceImpl = new
DataNodeInternalRPCServiceImpl(context);
}
@After
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/Await.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/Await.java
new file mode 100644
index 00000000000..f08ccd17495
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/Await.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.concurrent;
+
+/**
+ * Lightweight polling utility for production code. Provides a fluent API similar to Awaitility for
+ * waiting until a condition becomes true.
+ *
+ * <pre>{@code
+ * // Wait with timeout
+ * Await.await()
+ * .atMost(5, TimeUnit.SECONDS)
+ * .pollInterval(100, TimeUnit.MILLISECONDS)
+ * .until(() -> isReady());
+ *
+ * // Wait forever (use with caution)
+ * Await.await()
+ * .forever()
+ * .pollInterval(1, TimeUnit.SECONDS)
+ * .until(() -> isReady());
+ *
+ * // Ignore exceptions during polling
+ * Await.await()
+ * .atMost(30, TimeUnit.SECONDS)
+ * .ignoreExceptions()
+ * .until(() -> tryConnect());
+ * }</pre>
+ */
+public final class Await {
+
+ private Await() {}
+
+ public static ConditionAwaiter await() {
+ return new ConditionAwaiter();
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/AwaitTimeoutException.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/AwaitTimeoutException.java
new file mode 100644
index 00000000000..b0d5c98bfe7
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/AwaitTimeoutException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.concurrent;
+
+public class AwaitTimeoutException extends RuntimeException {
+
+ public AwaitTimeoutException(String message) {
+ super(message);
+ }
+
+ public AwaitTimeoutException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ConditionAwaiter.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ConditionAwaiter.java
new file mode 100644
index 00000000000..f88db57f612
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ConditionAwaiter.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.concurrent;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+public class ConditionAwaiter {
+
+ private static final long DEFAULT_POLL_INTERVAL_MS = 100;
+ private static final long DEFAULT_TIMEOUT_MS = 10_000;
+
+ private long timeoutMs = DEFAULT_TIMEOUT_MS;
+ private long pollIntervalMs = DEFAULT_POLL_INTERVAL_MS;
+ private long pollDelayMs = 0;
+ private boolean ignoreAllExceptions = false;
+ private boolean forever = false;
+ private final List<Class<? extends Exception>> ignoredExceptions = new
ArrayList<>();
+
+ ConditionAwaiter() {}
+
+ public ConditionAwaiter atMost(long time, TimeUnit unit) {
+ this.timeoutMs = unit.toMillis(time);
+ return this;
+ }
+
+ public ConditionAwaiter pollInterval(long time, TimeUnit unit) {
+ this.pollIntervalMs = unit.toMillis(time);
+ return this;
+ }
+
+ public ConditionAwaiter pollDelay(long time, TimeUnit unit) {
+ this.pollDelayMs = unit.toMillis(time);
+ return this;
+ }
+
+ public ConditionAwaiter ignoreExceptions() {
+ this.ignoreAllExceptions = true;
+ return this;
+ }
+
+ public ConditionAwaiter ignoreException(Class<? extends Exception>
exceptionType) {
+ this.ignoredExceptions.add(exceptionType);
+ return this;
+ }
+
+ public ConditionAwaiter forever() {
+ this.forever = true;
+ return this;
+ }
+
+ public void until(Callable<Boolean> conditionEvaluator) {
+ long startTime = System.currentTimeMillis();
+
+ if (pollDelayMs > 0) {
+ sleep(pollDelayMs);
+ }
+
+ Exception lastException = null;
+ while (true) {
+ try {
+ Boolean result = conditionEvaluator.call();
+ if (Boolean.TRUE.equals(result)) {
+ return;
+ }
+ lastException = null;
+ } catch (Exception e) {
+ if (shouldIgnore(e)) {
+ lastException = e;
+ } else if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ throw new AwaitTimeoutException("Interrupted while awaiting
condition", e);
+ } else {
+ throw new AwaitTimeoutException("Exception while evaluating
condition", e);
+ }
+ }
+
+ if (!forever && System.currentTimeMillis() - startTime >= timeoutMs) {
+ String message = String.format("Condition was not met within %d ms",
timeoutMs);
+ if (lastException != null) {
+ throw new AwaitTimeoutException(message, lastException);
+ }
+ throw new AwaitTimeoutException(message);
+ }
+
+ sleep(pollIntervalMs);
+ }
+ }
+
+ public void untilAsserted(Runnable assertion) {
+ final AssertionErrorHolder holder = new AssertionErrorHolder();
+ try {
+ until(
+ () -> {
+ try {
+ assertion.run();
+ return true;
+ } catch (AssertionError e) {
+ holder.error = e;
+ return false;
+ }
+ });
+ } catch (AwaitTimeoutException e) {
+ if (holder.error != null) {
+ throw new AwaitTimeoutException(e.getMessage(), holder.error);
+ }
+ throw e;
+ }
+ }
+
+ private static final class AssertionErrorHolder {
+ AssertionError error;
+ }
+
+ private boolean shouldIgnore(Exception e) {
+ if (ignoreAllExceptions) {
+ return true;
+ }
+ for (Class<? extends Exception> ignoredType : ignoredExceptions) {
+ if (ignoredType.isInstance(e)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private static void sleep(long millis) {
+ try {
+ Thread.sleep(millis);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new AwaitTimeoutException("Interrupted while awaiting condition",
e);
+ }
+ }
+}
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/AwaitTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/AwaitTest.java
new file mode 100644
index 00000000000..ae094c1a806
--- /dev/null
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/AwaitTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons;
+
+import org.apache.iotdb.commons.concurrent.Await;
+import org.apache.iotdb.commons.concurrent.AwaitTimeoutException;
+
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class AwaitTest {
+
+ @Test
+ public void testConditionAlreadyTrue() {
+ Await.await().atMost(1, TimeUnit.SECONDS).until(() -> true);
+ }
+
+ @Test
+ public void testConditionBecomesTrue() {
+ AtomicBoolean flag = new AtomicBoolean(false);
+ new Thread(
+ () -> {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ flag.set(true);
+ })
+ .start();
+
+ Await.await()
+ .atMost(5, TimeUnit.SECONDS)
+ .pollInterval(50, TimeUnit.MILLISECONDS)
+ .until(flag::get);
+
+ assertTrue(flag.get());
+ }
+
+ @Test(expected = AwaitTimeoutException.class)
+ public void testTimeout() {
+ Await.await()
+ .atMost(300, TimeUnit.MILLISECONDS)
+ .pollInterval(50, TimeUnit.MILLISECONDS)
+ .until(() -> false);
+ }
+
+ @Test
+ public void testPollDelay() {
+ long start = System.currentTimeMillis();
+
+ Await.await()
+ .atMost(5, TimeUnit.SECONDS)
+ .pollDelay(200, TimeUnit.MILLISECONDS)
+ .until(() -> true);
+
+ long elapsed = System.currentTimeMillis() - start;
+ assertTrue("Expected at least 200ms delay, got " + elapsed, elapsed >=
180);
+ }
+
+ @Test
+ public void testIgnoreAllExceptions() {
+ AtomicInteger counter = new AtomicInteger(0);
+
+ Await.await()
+ .atMost(5, TimeUnit.SECONDS)
+ .pollInterval(50, TimeUnit.MILLISECONDS)
+ .ignoreExceptions()
+ .until(
+ () -> {
+ int val = counter.incrementAndGet();
+ if (val < 3) {
+ throw new RuntimeException("not ready yet");
+ }
+ return true;
+ });
+
+ assertTrue(counter.get() >= 3);
+ }
+
+ @Test
+ public void testIgnoreSpecificException() {
+ AtomicInteger counter = new AtomicInteger(0);
+
+ Await.await()
+ .atMost(5, TimeUnit.SECONDS)
+ .pollInterval(50, TimeUnit.MILLISECONDS)
+ .ignoreException(IllegalStateException.class)
+ .until(
+ () -> {
+ int val = counter.incrementAndGet();
+ if (val < 3) {
+ throw new IllegalStateException("not ready");
+ }
+ return true;
+ });
+
+ assertTrue(counter.get() >= 3);
+ }
+
+ @Test
+ public void testNonIgnoredExceptionPropagates() {
+ try {
+ Await.await()
+ .atMost(5, TimeUnit.SECONDS)
+ .ignoreException(IllegalStateException.class)
+ .until(
+ () -> {
+ throw new IllegalArgumentException("unexpected");
+ });
+ fail("Should have thrown");
+ } catch (AwaitTimeoutException e) {
+ assertTrue(e.getCause() instanceof IllegalArgumentException);
+ }
+ }
+
+ @Test
+ public void testUntilAsserted() {
+ AtomicInteger value = new AtomicInteger(0);
+ new Thread(
+ () -> {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ value.set(42);
+ })
+ .start();
+
+ Await.await()
+ .atMost(5, TimeUnit.SECONDS)
+ .pollInterval(50, TimeUnit.MILLISECONDS)
+ .untilAsserted(() -> assertEquals(42, value.get()));
+ }
+
+ @Test
+ public void testForever() {
+ AtomicInteger counter = new AtomicInteger(0);
+
+ Await.await()
+ .forever()
+ .pollInterval(10, TimeUnit.MILLISECONDS)
+ .until(() -> counter.incrementAndGet() >= 5);
+
+ assertTrue(counter.get() >= 5);
+ }
+
+ @Test
+ public void testTimeoutMessageIncludesLastException() {
+ try {
+ Await.await()
+ .atMost(200, TimeUnit.MILLISECONDS)
+ .pollInterval(50, TimeUnit.MILLISECONDS)
+ .ignoreExceptions()
+ .until(
+ () -> {
+ throw new RuntimeException("still failing");
+ });
+ fail("Should have thrown");
+ } catch (AwaitTimeoutException e) {
+ assertTrue(e.getCause() instanceof RuntimeException);
+ assertEquals("still failing", e.getCause().getMessage());
+ }
+ }
+}