This is an automated email from the ASF dual-hosted git repository.

yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b4ce8dab040 Fix can not use currentGeneratorFuture and currentGenerator to get current progress if DN is restarted before the data partition generation complete (#17491)
b4ce8dab040 is described below

commit b4ce8dab040c39d68f877348669ac1bf78b506ff
Author: libo <[email protected]>
AuthorDate: Thu Apr 16 14:15:38 2026 +0800

    Fix can not use currentGeneratorFuture and currentGenerator to get current progress if DN is restarted before the data partition generation complete (#17491)
---
 .../client/sync/SyncDataNodeClientPool.java        |  3 +-
 .../iotdb/confignode/manager/ConfigManager.java    |  6 +-
 .../iotdb/confignode/manager/ProcedureManager.java | 20 +++++++
 .../manager/consensus/ConsensusManager.java        | 19 +++++++
 .../DataPartitionTableIntegrityCheckProcedure.java |  4 +-
 .../iotdb/confignode/service/ConfigNode.java       | 66 +++++++++++++---------
 .../impl/DataNodeInternalRPCServiceImpl.java       | 14 +++--
 .../src/main/thrift/datanode.thrift                |  2 +-
 8 files changed, 95 insertions(+), 39 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
index 9f5729ef06d..b9cf775459c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
@@ -148,7 +148,8 @@ public class SyncDataNodeClientPool {
         (req, client) -> client.generateDataPartitionTable((TGenerateDataPartitionTableReq) req));
     actionMapBuilder.put(
         CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE_HEART_BEAT,
-        (req, client) -> client.generateDataPartitionTableHeartbeat());
+        (req, client) ->
+            client.generateDataPartitionTableHeartbeat((TGenerateDataPartitionTableReq) req));
     actionMap = actionMapBuilder.build();
   }
 
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index f455edb26b8..182dc2f9fb2 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -443,15 +443,15 @@ public class ConfigManager implements IManager {
   }
 
   public void close() throws IOException {
-    if (consensusManager.get() != null) {
-      consensusManager.get().close();
-    }
     if (partitionManager != null) {
       partitionManager.getRegionMaintainer().shutdown();
     }
     if (procedureManager != null) {
       procedureManager.stopExecutor();
     }
+    if (consensusManager.get() != null) {
+      consensusManager.get().close();
+    }
   }
 
   @Override
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 4d420cf4f3c..2c5a77303d9 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.confignode.procedure.ProcedureMetrics;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
 import org.apache.iotdb.confignode.procedure.env.RemoveDataNodeHandler;
+import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
 import org.apache.iotdb.confignode.procedure.impl.cq.CreateCQProcedure;
 import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
 import org.apache.iotdb.confignode.procedure.impl.node.RemoveAINodeProcedure;
@@ -2329,6 +2330,25 @@ public class ProcedureManager {
     return new Pair<>(-1L, false);
   }
 
+  public boolean isExistUnfinishedProcedure(
+      Class<? extends StateMachineProcedure<?, ?>> procedureClass) {
+    if (procedureClass == null) {
+      return false;
+    }
+
+    for (Procedure<ConfigNodeProcedureEnv> procedure : getExecutor().getProcedures().values()) {
+      if (!procedure.isFinished() && procedureClass.isInstance(procedure)) {
+        LOGGER.info(
+            "[{}] procedure details are {}",
+            procedureClass.getSimpleName(),
+            procedure.toStringDetails());
+        return true;
+      }
+    }
+
+    return false;
+  }
+
   // ======================================================
   /*
      GET-SET Region
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
index a4436a94fa2..51773d83923 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
@@ -388,6 +388,25 @@ public class ConsensusManager {
     return null;
   }
 
+  public TConfigNodeLocation getNotNullLeaderLocation() {
+    Peer leaderPeer = getLeaderPeer();
+
+    while (leaderPeer == null) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ignored) {
+
+      }
+      leaderPeer = getLeaderPeer();
+    }
+
+    Peer finalLeaderPeer = leaderPeer;
+    return getNodeManager().getRegisteredConfigNodes().stream()
+        .filter(leader -> leader.getConfigNodeId() == finalLeaderPeer.getNodeId())
+        .findFirst()
+        .orElse(null);
+  }
+
   /**
    * @return true if ConfigNode-leader is elected, false otherwise.
    */
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
index c95c1ec9072..4f2c6933fd8 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
@@ -521,11 +521,13 @@ public class DataPartitionTableIntegrityCheckProcedure
 
       if (!dataPartitionTables.containsKey(dataNodeId)) {
         try {
+          TGenerateDataPartitionTableReq req = new TGenerateDataPartitionTableReq();
+          req.setDatabases(databasesWithLostDataPartition);
           Object response =
               SyncDataNodeClientPool.getInstance()
                   .sendSyncRequestToDataNodeWithGivenRetry(
                       dataNode.getLocation().getInternalEndPoint(),
-                      null,
+                      req,
                       CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE_HEART_BEAT,
                       MAX_RETRY_COUNT);
 
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index 3a6c93b12ec..5caac7127f7 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -55,6 +55,7 @@ import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
 import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
 import org.apache.iotdb.confignode.manager.pipe.metric.PipeConfigNodeMetrics;
+import org.apache.iotdb.confignode.procedure.impl.partition.DataPartitionTableIntegrityCheckProcedure;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
@@ -195,6 +196,9 @@ public class ConfigNode extends ServerCommandLine implements ConfigNodeMBean {
         configManager.initConsensusManager();
         upgrade();
         TConfigNodeLocation leaderNodeLocation = waitForLeaderElected();
+        if (leaderNodeLocation == null) {
+          leaderNodeLocation = configManager.getConsensusManager().getNotNullLeaderLocation();
+        }
         setUpMetricService();
         // Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure
         // that the external service is not provided until ConfigNode is fully available
@@ -225,36 +229,42 @@ public class ConfigNode extends ServerCommandLine implements ConfigNodeMBean {
 
         /* After the ConfigNode leader election, a leader switch may occur, which could cause the procedure not to be created. This can happen if the original leader has not yet executed the procedure creation, while the other followers have already finished starting up. Therefore, having the original leader (before the leader switch) initiate the process ensures that only one procedure will be created. */
         if (leaderNodeLocation.getConfigNodeId() == configNodeId) {
-          dataPartitionTableCheckFuture =
-              dataPartitionTableCheckExecutor.submit(
-                  () -> {
-                    LOGGER.info(
-                        "[DataPartitionIntegrity] Prepare to start dataPartitionTableIntegrityCheck after all datanodes started up");
-                    Thread.sleep(CONF.getPartitionTableRecoverWaitAllDnUpTimeoutInMs());
-
-                    while (true) {
-                      List<Integer> dnList =
-                          configManager
-                              .getLoadManager()
-                              .filterDataNodeThroughStatus(NodeStatus.Running);
-                      if (dnList != null && !dnList.isEmpty()) {
-                        LOGGER.info("Starting dataPartitionTableIntegrityCheck...");
-                        TSStatus status =
-                            configManager.getProcedureManager().dataPartitionTableIntegrityCheck();
-                        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-                          LOGGER.error(
-                              "Data partition table integrity check failed! Current status code is {}, status message is {}",
-                              status.getCode(),
-                              status.getMessage());
+          if (!configManager
+              .getProcedureManager()
+              .isExistUnfinishedProcedure(DataPartitionTableIntegrityCheckProcedure.class)) {
+            dataPartitionTableCheckFuture =
+                dataPartitionTableCheckExecutor.submit(
+                    () -> {
+                      LOGGER.info(
+                          "[DataPartitionIntegrity] Prepare to start dataPartitionTableIntegrityCheck after all datanodes started up");
+                      Thread.sleep(CONF.getPartitionTableRecoverWaitAllDnUpTimeoutInMs());
+
+                      while (true) {
+                        List<Integer> dnList =
+                            configManager
+                                .getLoadManager()
+                                .filterDataNodeThroughStatus(NodeStatus.Running);
+                        if (dnList != null && !dnList.isEmpty()) {
+                          LOGGER.info("Starting dataPartitionTableIntegrityCheck...");
+                          TSStatus status =
+                              configManager
+                                  .getProcedureManager()
+                                  .dataPartitionTableIntegrityCheck();
+                          if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+                            LOGGER.error(
+                                "Data partition table integrity check failed! Current status code is {}, status message is {}",
+                                status.getCode(),
+                                status.getMessage());
+                          }
+                          break;
+                        } else {
+                          LOGGER.info("No running datanodes found, waiting...");
+                          Thread.sleep(5000);
                         }
-                        break;
-                      } else {
-                        LOGGER.info("No running datanodes found, waiting...");
-                        Thread.sleep(5000);
                       }
-                    }
-                    return null;
-                  });
+                      return null;
+                    });
+          }
         }
         return;
       } else {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index f5cdb47876a..9fc5fcb436c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -3228,7 +3228,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
   }
 
   @Override
-  public TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartbeat() {
+  public TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartbeat(
+      TGenerateDataPartitionTableReq req) {
     TGenerateDataPartitionTableHeartbeatResp resp = new TGenerateDataPartitionTableHeartbeatResp();
     // Must be lower than the RPC request timeout, in milliseconds
     final long timeoutMs = 50000;
@@ -3238,10 +3239,13 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
       // To resolve this situation that the DataNode is registered and didn't request
       // generateDataPartitionTable interface yet.
       if (currentGeneratorFuture == null || currentGenerator == null) {
-        resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode());
-        resp.setMessage("No DataPartitionTable generation task found");
-        resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
-        return resp;
+        generateDataPartitionTable(req);
+        if (currentGeneratorFuture == null || currentGenerator == null) {
+          resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode());
+          resp.setMessage("No DataPartitionTable generation task found");
+          resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+          return resp;
+        }
       }
 
       currentGeneratorFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 8c3e12217e0..4323e956a8e 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -1330,7 +1330,7 @@ service IDataNodeRPCService {
   /**
    * Check the status of DataPartitionTable generation task
    */
-  TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartbeat()
+  TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartbeat(TGenerateDataPartitionTableReq req)
 
   /**
   * END: Data Partition Table Integrity Check

Reply via email to