This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b642dff869 Repair the ClassCastException caused by the CN frequently
resending RPC requests after one of the DNs in the cluster goes down and comes
back up (#17461)
5b642dff869 is described below

commit 5b642dff869f18d9f595995384e65de74ce92619
Author: libo <[email protected]>
AuthorDate: Tue Apr 14 14:34:41 2026 +0800

    Repair the ClassCastException caused by the CN frequently resending RPC
requests after one of the DNs in the cluster goes down and comes back up (#17461)
---
 .../DataPartitionTableIntegrityCheckProcedure.java | 102 ++++++++++++++++-----
 1 file changed, 79 insertions(+), 23 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
index 0970c3ac057..c95c1ec9072 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.enums.DataPartitionTableGeneratorState;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.DatabaseScopedDataPartitionTable;
@@ -33,6 +34,7 @@ import 
org.apache.iotdb.confignode.client.sync.CnToDnSyncRequestType;
 import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
 import 
org.apache.iotdb.confignode.consensus.request.read.partition.GetDataPartitionPlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
@@ -95,6 +97,7 @@ public class DataPartitionTableIntegrityCheckProcedure
   private static final long ROLL_BACK_NEXT_STATE_INTERVAL = 60000;
 
   NodeManager dataNodeManager;
+  LoadManager loadManager;
   private List<TDataNodeConfiguration> allDataNodes = new ArrayList<>();
 
   // ============Need serialize BEGIN=============/
@@ -135,6 +138,7 @@ public class DataPartitionTableIntegrityCheckProcedure
     try {
       // Ensure to get the real-time DataNodes in the current cluster at every 
step
       dataNodeManager = env.getConfigManager().getNodeManager();
+      loadManager = env.getConfigManager().getLoadManager();
       allDataNodes = dataNodeManager.getRegisteredDataNodes();
 
       switch (state) {
@@ -214,6 +218,10 @@ public class DataPartitionTableIntegrityCheckProcedure
     return 
DataPartitionTableIntegrityCheckProcedureState.COLLECT_EARLIEST_TIMESLOTS;
   }
 
+  /**
+   * Collect earliest timeslot information from all DataNodes. Each DataNode 
returns a Map<String,
+   * Long> where key is database name and value is the earliest timeslot id.
+   */
   /**
    * Collect earliest timeslot information from all DataNodes. Each DataNode 
returns a Map<String,
    * Long> where key is database name and value is the earliest timeslot id.
@@ -236,15 +244,31 @@ public class DataPartitionTableIntegrityCheckProcedure
     // Collect earliest timeslots from all DataNodes
     allDataNodes.removeAll(skipDataNodes);
     for (TDataNodeConfiguration dataNode : allDataNodes) {
+      // Check if DataNode is alive before sending request
+      NodeStatus nodeStatus = 
loadManager.getNodeStatus(dataNode.getLocation().getDataNodeId());
+      if (!NodeStatus.Running.equals(nodeStatus)) {
+        failedDataNodes.add(dataNode);
+        continue;
+      }
+
       try {
-        TGetEarliestTimeslotsResp resp =
-            (TGetEarliestTimeslotsResp)
-                SyncDataNodeClientPool.getInstance()
-                    .sendSyncRequestToDataNodeWithGivenRetry(
-                        dataNode.getLocation().getInternalEndPoint(),
-                        null,
-                        CnToDnSyncRequestType.COLLECT_EARLIEST_TIMESLOTS,
-                        MAX_RETRY_COUNT);
+        Object response =
+            SyncDataNodeClientPool.getInstance()
+                .sendSyncRequestToDataNodeWithGivenRetry(
+                    dataNode.getLocation().getInternalEndPoint(),
+                    null,
+                    CnToDnSyncRequestType.COLLECT_EARLIEST_TIMESLOTS,
+                    MAX_RETRY_COUNT);
+
+        if (response instanceof TSStatus) {
+          failedDataNodes.add(dataNode);
+          LOG.error(
+              "[DataPartitionIntegrity] Failed to collected earliest timeslots 
from the DataNode[id={}], already out of max retry time",
+              dataNode.getLocation().getDataNodeId());
+          continue;
+        }
+
+        TGetEarliestTimeslotsResp resp = (TGetEarliestTimeslotsResp) response;
         if (resp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           failedDataNodes.add(dataNode);
           LOG.error(
@@ -423,18 +447,34 @@ public class DataPartitionTableIntegrityCheckProcedure
     allDataNodes.removeAll(failedDataNodes);
     for (TDataNodeConfiguration dataNode : allDataNodes) {
       int dataNodeId = dataNode.getLocation().getDataNodeId();
+      // Check if DataNode is alive before sending request
+      NodeStatus nodeStatus = loadManager.getNodeStatus(dataNodeId);
+      if (!NodeStatus.Running.equals(nodeStatus)) {
+        failedDataNodes.add(dataNode);
+        continue;
+      }
+
       if (!dataPartitionTables.containsKey(dataNodeId)) {
         try {
           TGenerateDataPartitionTableReq req = new 
TGenerateDataPartitionTableReq();
           req.setDatabases(databasesWithLostDataPartition);
-          TGenerateDataPartitionTableResp resp =
-              (TGenerateDataPartitionTableResp)
-                  SyncDataNodeClientPool.getInstance()
-                      .sendSyncRequestToDataNodeWithGivenRetry(
-                          dataNode.getLocation().getInternalEndPoint(),
-                          req,
-                          CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE,
-                          MAX_RETRY_COUNT);
+          Object response =
+              SyncDataNodeClientPool.getInstance()
+                  .sendSyncRequestToDataNodeWithGivenRetry(
+                      dataNode.getLocation().getInternalEndPoint(),
+                      req,
+                      CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE,
+                      MAX_RETRY_COUNT);
+
+          if (response instanceof TSStatus) {
+            failedDataNodes.add(dataNode);
+            LOG.error(
+                "[DataPartitionIntegrity] Failed to request DataPartitionTable 
generation from the DataNode[id={}], already out of max retry time",
+                dataNode.getLocation().getDataNodeId());
+            continue;
+          }
+
+          TGenerateDataPartitionTableResp resp = 
(TGenerateDataPartitionTableResp) response;
           if (resp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
             failedDataNodes.add(dataNode);
             LOG.error(
@@ -472,17 +512,33 @@ public class DataPartitionTableIntegrityCheckProcedure
     int completeCount = 0;
     for (TDataNodeConfiguration dataNode : allDataNodes) {
       int dataNodeId = dataNode.getLocation().getDataNodeId();
+      // Check if DataNode is alive before sending request
+      NodeStatus nodeStatus = loadManager.getNodeStatus(dataNodeId);
+      if (!NodeStatus.Running.equals(nodeStatus)) {
+        failedDataNodes.add(dataNode);
+        continue;
+      }
 
       if (!dataPartitionTables.containsKey(dataNodeId)) {
         try {
+          Object response =
+              SyncDataNodeClientPool.getInstance()
+                  .sendSyncRequestToDataNodeWithGivenRetry(
+                      dataNode.getLocation().getInternalEndPoint(),
+                      null,
+                      
CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE_HEART_BEAT,
+                      MAX_RETRY_COUNT);
+
+          if (response instanceof TSStatus) {
+            failedDataNodes.add(dataNode);
+            LOG.error(
+                "[DataPartitionIntegrity] Failed to request DataPartitionTable 
generation heart beat from the DataNode[id={}], already out of max retry time",
+                dataNode.getLocation().getDataNodeId());
+            continue;
+          }
+
           TGenerateDataPartitionTableHeartbeatResp resp =
-              (TGenerateDataPartitionTableHeartbeatResp)
-                  SyncDataNodeClientPool.getInstance()
-                      .sendSyncRequestToDataNodeWithGivenRetry(
-                          dataNode.getLocation().getInternalEndPoint(),
-                          null,
-                          
CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE_HEART_BEAT,
-                          MAX_RETRY_COUNT);
+              (TGenerateDataPartitionTableHeartbeatResp) response;
           DataPartitionTableGeneratorState state =
               
DataPartitionTableGeneratorState.getStateByCode(resp.getErrorCode());
 

Reply via email to