This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5b642dff869 Repair the ClassCastException caused by the CN (ConfigNode)
frequently resending RPC requests after one of the DNs (DataNodes) in the
cluster goes down (#17461)
5b642dff869 is described below
commit 5b642dff869f18d9f595995384e65de74ce92619
Author: libo <[email protected]>
AuthorDate: Tue Apr 14 14:34:41 2026 +0800
Repair the ClassCastException caused by the CN (ConfigNode) frequently resending
RPC requests after one of the DNs (DataNodes) in the cluster goes down (#17461)
---
.../DataPartitionTableIntegrityCheckProcedure.java | 102 ++++++++++++++++-----
1 file changed, 79 insertions(+), 23 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
index 0970c3ac057..c95c1ec9072 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.enums.DataPartitionTableGeneratorState;
import org.apache.iotdb.commons.partition.DataPartitionTable;
import org.apache.iotdb.commons.partition.DatabaseScopedDataPartitionTable;
@@ -33,6 +34,7 @@ import
org.apache.iotdb.confignode.client.sync.CnToDnSyncRequestType;
import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
import
org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan;
import
org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
@@ -95,6 +97,7 @@ public class DataPartitionTableIntegrityCheckProcedure
private static final long ROLL_BACK_NEXT_STATE_INTERVAL = 60000;
NodeManager dataNodeManager;
+ LoadManager loadManager;
private List<TDataNodeConfiguration> allDataNodes = new ArrayList<>();
// ============Need serialize BEGIN=============/
@@ -135,6 +138,7 @@ public class DataPartitionTableIntegrityCheckProcedure
try {
// Ensure to get the real-time DataNodes in the current cluster at every
step
dataNodeManager = env.getConfigManager().getNodeManager();
+ loadManager = env.getConfigManager().getLoadManager();
allDataNodes = dataNodeManager.getRegisteredDataNodes();
switch (state) {
@@ -214,6 +218,10 @@ public class DataPartitionTableIntegrityCheckProcedure
return
DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS;
}
+ /**
+ * Collect earliest timeslot information from all DataNodes. Each DataNode
returns a Map<String,
+ * Long> where key is database name and value is the earliest timeslot id.
+ */
/**
* Collect earliest timeslot information from all DataNodes. Each DataNode
returns a Map<String,
* Long> where key is database name and value is the earliest timeslot id.
@@ -236,15 +244,31 @@ public class DataPartitionTableIntegrityCheckProcedure
// Collect earliest timeslots from all DataNodes
allDataNodes.removeAll(skipDataNodes);
for (TDataNodeConfiguration dataNode : allDataNodes) {
+ // Check if DataNode is alive before sending request
+ NodeStatus nodeStatus =
loadManager.getNodeStatus(dataNode.getLocation().getDataNodeId());
+ if (!NodeStatus.Running.equals(nodeStatus)) {
+ failedDataNodes.add(dataNode);
+ continue;
+ }
+
try {
- TGetEarliestTimeslotsResp resp =
- (TGetEarliestTimeslotsResp)
- SyncDataNodeClientPool.getInstance()
- .sendSyncRequestToDataNodeWithGivenRetry(
- dataNode.getLocation().getInternalEndPoint(),
- null,
- CnToDnSyncRequestType.COLLECT_EARLIEST_TIMESLOTS,
- MAX_RETRY_COUNT);
+ Object response =
+ SyncDataNodeClientPool.getInstance()
+ .sendSyncRequestToDataNodeWithGivenRetry(
+ dataNode.getLocation().getInternalEndPoint(),
+ null,
+ CnToDnSyncRequestType.COLLECT_EARLIEST_TIMESLOTS,
+ MAX_RETRY_COUNT);
+
+ if (response instanceof TSStatus) {
+ failedDataNodes.add(dataNode);
+ LOG.error(
+ "[DataPartitionIntegrity] Failed to collected earliest timeslots
from the DataNode[id={}], already out of max retry time",
+ dataNode.getLocation().getDataNodeId());
+ continue;
+ }
+
+ TGetEarliestTimeslotsResp resp = (TGetEarliestTimeslotsResp) response;
if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
failedDataNodes.add(dataNode);
LOG.error(
@@ -423,18 +447,34 @@ public class DataPartitionTableIntegrityCheckProcedure
allDataNodes.removeAll(failedDataNodes);
for (TDataNodeConfiguration dataNode : allDataNodes) {
int dataNodeId = dataNode.getLocation().getDataNodeId();
+ // Check if DataNode is alive before sending request
+ NodeStatus nodeStatus = loadManager.getNodeStatus(dataNodeId);
+ if (!NodeStatus.Running.equals(nodeStatus)) {
+ failedDataNodes.add(dataNode);
+ continue;
+ }
+
if (!dataPartitionTables.containsKey(dataNodeId)) {
try {
TGenerateDataPartitionTableReq req = new
TGenerateDataPartitionTableReq();
req.setDatabases(databasesWithLostDataPartition);
- TGenerateDataPartitionTableResp resp =
- (TGenerateDataPartitionTableResp)
- SyncDataNodeClientPool.getInstance()
- .sendSyncRequestToDataNodeWithGivenRetry(
- dataNode.getLocation().getInternalEndPoint(),
- req,
- CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE,
- MAX_RETRY_COUNT);
+ Object response =
+ SyncDataNodeClientPool.getInstance()
+ .sendSyncRequestToDataNodeWithGivenRetry(
+ dataNode.getLocation().getInternalEndPoint(),
+ req,
+ CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE,
+ MAX_RETRY_COUNT);
+
+ if (response instanceof TSStatus) {
+ failedDataNodes.add(dataNode);
+ LOG.error(
+ "[DataPartitionIntegrity] Failed to request DataPartitionTable
generation from the DataNode[id={}], already out of max retry time",
+ dataNode.getLocation().getDataNodeId());
+ continue;
+ }
+
+ TGenerateDataPartitionTableResp resp =
(TGenerateDataPartitionTableResp) response;
if (resp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
failedDataNodes.add(dataNode);
LOG.error(
@@ -472,17 +512,33 @@ public class DataPartitionTableIntegrityCheckProcedure
int completeCount = 0;
for (TDataNodeConfiguration dataNode : allDataNodes) {
int dataNodeId = dataNode.getLocation().getDataNodeId();
+ // Check if DataNode is alive before sending request
+ NodeStatus nodeStatus = loadManager.getNodeStatus(dataNodeId);
+ if (!NodeStatus.Running.equals(nodeStatus)) {
+ failedDataNodes.add(dataNode);
+ continue;
+ }
if (!dataPartitionTables.containsKey(dataNodeId)) {
try {
+ Object response =
+ SyncDataNodeClientPool.getInstance()
+ .sendSyncRequestToDataNodeWithGivenRetry(
+ dataNode.getLocation().getInternalEndPoint(),
+ null,
+
CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE_HEART_BEAT,
+ MAX_RETRY_COUNT);
+
+ if (response instanceof TSStatus) {
+ failedDataNodes.add(dataNode);
+ LOG.error(
+ "[DataPartitionIntegrity] Failed to request DataPartitionTable
generation heart beat from the DataNode[id={}], already out of max retry time",
+ dataNode.getLocation().getDataNodeId());
+ continue;
+ }
+
TGenerateDataPartitionTableHeartbeatResp resp =
- (TGenerateDataPartitionTableHeartbeatResp)
- SyncDataNodeClientPool.getInstance()
- .sendSyncRequestToDataNodeWithGivenRetry(
- dataNode.getLocation().getInternalEndPoint(),
- null,
-
CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE_HEART_BEAT,
- MAX_RETRY_COUNT);
+ (TGenerateDataPartitionTableHeartbeatResp) response;
DataPartitionTableGeneratorState state =
DataPartitionTableGeneratorState.getStateByCode(resp.getErrorCode());