This is an automated email from the ASF dual-hosted git repository.

CRZbulabula pushed a commit to branch improve-confignode-leader-confirm
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4e6d45f4e689a38274f26654b37dc4ba5d09f160
Author: Yongzao <[email protected]>
AuthorDate: Tue Jun 2 19:54:41 2026 +0800

    Improve ConfigNode leader warm-up gating
---
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 .../heartbeat/DataNodeHeartbeatHandler.java        |  93 +++++++------
 .../statemachine/ConfigRegionStateMachine.java     |  25 +++-
 .../iotdb/confignode/manager/ConfigManager.java    |   8 +-
 .../iotdb/confignode/manager/load/LoadManager.java | 110 +++++++++++++++-
 .../manager/load/cache/AbstractLoadCache.java      |   4 +
 .../confignode/manager/load/cache/LoadCache.java   | 146 +++++++++++++++++++++
 .../load/cache/consensus/ConsensusGroupCache.java  |   2 +-
 .../manager/partition/PartitionManager.java        |  16 ++-
 .../confignode/manager/load/LoadManagerTest.java   |  62 +++++++++
 .../iotdb/db/protocol/client/ConfigNodeClient.java |  17 +++
 11 files changed, 435 insertions(+), 49 deletions(-)

diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 035c648132f..dad3ef44e23 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -209,6 +209,7 @@ public enum TSStatusCode {
   CAN_NOT_CONNECT_AINODE(1011),
   NO_AVAILABLE_REPLICA(1012),
   NO_AVAILABLE_AINODE(1013),
+  CONFIG_NODE_LEADER_WARMING_UP(1014),
 
   // Sync, Load TsFile
   LOAD_FILE_ERROR(1100),
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index e7a31b1dc73..18b8206cbb2 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.confignode.client.async.handlers.heartbeat;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.commons.client.ThriftClient;
 import org.apache.iotdb.commons.cluster.NodeStatus;
@@ -27,7 +28,6 @@ import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.manager.load.LoadManager;
-import 
org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupHeartbeatSample;
 import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
 import 
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
 import 
org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.PipeRuntimeCoordinator;
@@ -36,6 +36,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
 
 import org.apache.thrift.async.AsyncMethodCallback;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.function.Consumer;
 
@@ -89,46 +90,55 @@ public class DataNodeHeartbeatHandler implements 
AsyncMethodCallback<TDataNodeHe
 
     RegionStatus regionStatus = 
RegionStatus.valueOf(heartbeatResp.getStatus());
 
-    heartbeatResp
-        .getJudgedLeaders()
-        .forEach(
-            (regionGroupId, isLeader) -> {
-
-              // Do not allow regions to inherit the Removing state from 
datanode
-              RegionStatus nextRegionStatus = regionStatus;
-              if (nextRegionStatus == RegionStatus.Removing) {
-                nextRegionStatus =
-                    loadManager
-                        .getLoadCache()
-                        .getRegionCacheLastSampleStatus(regionGroupId, nodeId);
-              }
-
-              // Update RegionGroupCache
-              loadManager
-                  .getLoadCache()
-                  .cacheRegionHeartbeatSample(
-                      regionGroupId,
-                      nodeId,
-                      new RegionHeartbeatSample(
-                          heartbeatResp.getHeartbeatTimestamp(),
-                          // Region will inherit DataNode's status
-                          nextRegionStatus),
-                      false);
-
-              if 
(((TConsensusGroupType.SchemaRegion.equals(regionGroupId.getType())
-                          && SCHEMA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE)
-                      || 
(TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
-                          && DATA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE))
-                  && Boolean.TRUE.equals(isLeader)) {
-                // Update ConsensusGroupCache when necessary
-                loadManager
-                    .getLoadCache()
-                    .cacheConsensusSample(
-                        regionGroupId,
-                        new ConsensusGroupHeartbeatSample(
-                            
heartbeatResp.getConsensusLogicalTimeMap().get(regionGroupId), nodeId));
-              }
-            });
+    Map<TConsensusGroupId, Boolean> judgedLeaders =
+        heartbeatResp.isSetJudgedLeaders()
+            ? heartbeatResp.getJudgedLeaders()
+            : Collections.emptyMap();
+    judgedLeaders.forEach(
+        (regionGroupId, isLeader) -> {
+
+          // Do not allow regions to inherit the Removing state from datanode
+          RegionStatus nextRegionStatus = regionStatus;
+          if (nextRegionStatus == RegionStatus.Removing) {
+            nextRegionStatus =
+                
loadManager.getLoadCache().getRegionCacheLastSampleStatus(regionGroupId, 
nodeId);
+          }
+
+          // Update RegionGroupCache
+          loadManager
+              .getLoadCache()
+              .cacheRegionHeartbeatSample(
+                  regionGroupId,
+                  nodeId,
+                  new RegionHeartbeatSample(
+                      heartbeatResp.getHeartbeatTimestamp(),
+                      // Region will inherit DataNode's status
+                      nextRegionStatus),
+                  false);
+
+          boolean shouldCacheConsensusSample =
+              (TConsensusGroupType.SchemaRegion.equals(regionGroupId.getType())
+                      && SCHEMA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE)
+                  || 
(TConsensusGroupType.DataRegion.equals(regionGroupId.getType())
+                      && DATA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE);
+          long logicalTimestamp =
+              heartbeatResp.isSetConsensusLogicalTimeMap()
+                      && 
heartbeatResp.getConsensusLogicalTimeMap().containsKey(regionGroupId)
+                  ? 
heartbeatResp.getConsensusLogicalTimeMap().get(regionGroupId)
+                  : heartbeatResp.getHeartbeatTimestamp();
+          loadManager
+              .getLoadCache()
+              .cacheConsensusGroupHeartbeatSample(
+                  regionGroupId,
+                  nodeId,
+                  Boolean.TRUE.equals(isLeader),
+                  logicalTimestamp,
+                  shouldCacheConsensusSample);
+        });
+    loadManager
+        .getLoadCache()
+        .cacheUnreportedDataNodeRegionHeartbeatSamples(
+            nodeId, judgedLeaders.keySet(), 
heartbeatResp.getHeartbeatTimestamp());
 
     if (heartbeatResp.getRegionDeviceUsageMap() != null) {
       deviceNum.putAll(heartbeatResp.getRegionDeviceUsageMap());
@@ -170,6 +180,7 @@ public class DataNodeHeartbeatHandler implements 
AsyncMethodCallback<TDataNodeHe
     if (ThriftClient.isConnectionBroken(e)) {
       loadManager.forceUpdateNodeCache(
           NodeType.DataNode, nodeId, new 
NodeHeartbeatSample(NodeStatus.Unknown));
+      loadManager.getLoadCache().cacheDataNodeHeartbeatFailureSample(nodeId, 
System.nanoTime());
     }
     loadManager.getLoadCache().resetHeartbeatProcessing(nodeId);
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index fe687d17556..2b0dc1610e6 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -288,13 +288,34 @@ public class ConfigRegionStateMachine implements 
IStateMachine, IStateMachine.Ev
         ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
         currentNodeTEndPoint);
 
-    // Always start load services first
-    configManager.getLoadManager().startLoadServices();
+    // Always start load services first and wait for their first full warm-up 
before serving.
+    long loadReadyEpoch = configManager.getLoadManager().startLoadServices();
 
     if (CONF.isEnableTopologyProbing()) {
       configManager.getLoadManager().startTopologyService();
     }
 
+    threadPool.submit(() -> startLeaderServicesAfterLoadReady(loadReadyEpoch));
+  }
+
+  private void startLeaderServicesAfterLoadReady(long loadReadyEpoch) {
+    if (!configManager.getLoadManager().waitForLoadReady(loadReadyEpoch)) {
+      LOGGER.info(
+          
ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_IS_NO_LONGER_THE_LEADER
+              + "skip starting leader services because load warm-up is 
interrupted",
+          ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
+          currentNodeTEndPoint);
+      return;
+    }
+    if (!configManager.getConsensusManager().isLeaderReady()) {
+      LOGGER.info(
+          
ConfigNodeMessages.CURRENT_NODE_NODEID_IP_PORT_IS_NO_LONGER_THE_LEADER
+              + "skip starting leader services because consensus leader is no 
longer ready",
+          ConfigNodeDescriptor.getInstance().getConf().getConfigNodeId(),
+          currentNodeTEndPoint);
+      return;
+    }
+
     // Start leader scheduling services
     configManager.getProcedureManager().startExecutor();
     threadPool.submit(
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 2db0255e35a..ea2ffc1c2c1 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -1247,7 +1247,13 @@ public class ConfigManager implements IManager {
               "ConsensusManager of target-ConfigNode is not initialized, "
                   + "please make sure the target-ConfigNode has been started 
successfully.");
     }
-    return getConsensusManager().confirmLeader();
+    TSStatus status = getConsensusManager().confirmLeader();
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        && !getLoadManager().isLoadReady()) {
+      return new 
TSStatus(TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode())
+          .setMessage(getLoadManager().getLoadReadyReason());
+    }
+    return status;
   }
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index e97f32bdbda..148a4bc8ae8 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -48,10 +48,16 @@ import 
org.apache.iotdb.confignode.manager.load.service.TopologyService;
 import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * The {@link LoadManager} at ConfigNodeGroup-Leader is active. It proactively 
implements the
@@ -59,6 +65,9 @@ import java.util.function.Function;
  */
 public class LoadManager {
 
+  private static final long LOAD_READY_CHECK_INTERVAL_MS =
+      Math.max(10, Math.min(100, StatisticsService.STATISTICS_UPDATE_INTERVAL 
/ 10));
+
   protected final IManager configManager;
 
   /** Balancers. */
@@ -74,6 +83,10 @@ public class LoadManager {
   private final StatisticsService statisticsService;
   private final EventService eventService;
   private final TopologyService topologyService;
+  private final AtomicBoolean loadServicesStarted;
+  private final AtomicLong loadReadyEpoch;
+  private final AtomicBoolean loadReady;
+  private volatile String loadReadyReason;
 
   public LoadManager(IManager configManager) {
     this.configManager = configManager;
@@ -90,6 +103,10 @@ public class LoadManager {
     
this.eventService.register(configManager.getPipeManager().getPipeRuntimeCoordinator());
     this.eventService.register(routeBalancer);
     this.eventService.register(topologyService);
+    this.loadServicesStarted = new AtomicBoolean(false);
+    this.loadReadyEpoch = new AtomicLong(0);
+    this.loadReady = new AtomicBoolean(false);
+    this.loadReadyReason = "ConfigNode leader load services are not started.";
   }
 
   protected void setHeartbeatService(IManager configManager, LoadCache 
loadCache) {
@@ -146,15 +163,24 @@ public class LoadManager {
     partitionBalancer.reBalanceDataPartitionPolicy(database);
   }
 
-  public void startLoadServices() {
+  public long startLoadServices() {
+    long epoch = loadReadyEpoch.incrementAndGet();
+    loadReady.set(false);
+    loadReadyReason = "ConfigNode leader is waiting for cluster heartbeat 
sampling.";
     loadCache.initHeartbeatCache(configManager);
+    loadServicesStarted.set(true);
     heartbeatService.startHeartbeatService();
     statisticsService.startLoadStatisticsService();
     eventService.startEventService();
     partitionBalancer.setupPartitionBalancer();
+    return epoch;
   }
 
   public void stopLoadServices() {
+    loadReadyEpoch.incrementAndGet();
+    loadServicesStarted.set(false);
+    loadReady.set(false);
+    loadReadyReason = "ConfigNode leader load services are stopped.";
     heartbeatService.stopHeartbeatService();
     statisticsService.stopLoadStatisticsService();
     eventService.stopEventService();
@@ -163,6 +189,88 @@ public class LoadManager {
     routeBalancer.clearRegionPriority();
   }
 
+  public boolean waitForLoadReady(long epoch) {
+    while (epoch == loadReadyEpoch.get() && 
!Thread.currentThread().isInterrupted()) {
+      if (tryUpdateLoadReady()) {
+        return true;
+      }
+      try {
+        TimeUnit.MILLISECONDS.sleep(LOAD_READY_CHECK_INTERVAL_MS);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return false;
+      }
+    }
+    return false;
+  }
+
+  public boolean isLoadReady() {
+    return loadReady.get() || tryUpdateLoadReady();
+  }
+
+  public String getLoadReadyReason() {
+    return loadReadyReason;
+  }
+
+  private synchronized boolean tryUpdateLoadReady() {
+    if (loadReady.get()) {
+      return true;
+    }
+    if (!loadServicesStarted.get()) {
+      loadReadyReason = "ConfigNode leader load services are not started.";
+      return false;
+    }
+
+    loadCache.updateNodeStatistics(false);
+    loadCache.updateRegionGroupStatistics();
+    loadCache.updateConsensusGroupStatistics();
+    eventService.checkAndBroadcastNodeStatisticsChangeEventIfNecessary();
+    
eventService.checkAndBroadcastRegionGroupStatisticsChangeEventIfNecessary();
+    
eventService.checkAndBroadcastConsensusGroupStatisticsChangeEventIfNecessary();
+
+    List<String> unreadyReasons = loadCache.getLoadWarmUpUnreadyReasons();
+    if (!unreadyReasons.isEmpty()
+        && unreadyReasons.stream().anyMatch(reason -> 
!reason.startsWith("consensusGroups="))) {
+      loadReadyReason = "ConfigNode leader is warming up LoadCache: " + 
unreadyReasons;
+      return false;
+    }
+
+    routeBalancer.balanceRegionLeaderAndPriority();
+
+    unreadyReasons = loadCache.getLoadWarmUpUnreadyReasons();
+    if (!unreadyReasons.isEmpty()) {
+      loadReadyReason = "ConfigNode leader is warming up LoadCache: " + 
unreadyReasons;
+      return false;
+    }
+
+    List<TConsensusGroupId> unreadyRegionPriorities = 
getUnreadyRegionPriorities();
+    if (!unreadyRegionPriorities.isEmpty()) {
+      loadReadyReason =
+          "ConfigNode leader is warming up region priority: "
+              + unreadyRegionPriorities.subList(0, Math.min(10, 
unreadyRegionPriorities.size()))
+              + (unreadyRegionPriorities.size() > 10
+                  ? "...(" + (unreadyRegionPriorities.size() - 10) + " more)"
+                  : "");
+      return false;
+    }
+
+    loadReadyReason = "ConfigNode leader load services are ready.";
+    loadReady.set(true);
+    return true;
+  }
+
+  private List<TConsensusGroupId> getUnreadyRegionPriorities() {
+    List<TConsensusGroupId> regionGroupIds = loadCache.getAllRegionGroupIds();
+    if (regionGroupIds.isEmpty()) {
+      return Collections.emptyList();
+    }
+    Map<TConsensusGroupId, TRegionReplicaSet> regionPriorityMap =
+        routeBalancer.getRegionPriorityMap();
+    return regionGroupIds.stream()
+        .filter(regionGroupId -> !regionPriorityMap.containsKey(regionGroupId))
+        .collect(Collectors.toCollection(ArrayList::new));
+  }
+
   public void startTopologyService() {
     topologyService.startTopologyService();
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
index d61a0043520..e5ab6445f98 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
@@ -97,6 +97,10 @@ public abstract class AbstractLoadCache {
     return slidingWindow.isEmpty() ? null : 
slidingWindow.get(slidingWindow.size() - 1);
   }
 
+  public boolean hasHeartbeatSample() {
+    return getLastSample() != null;
+  }
+
   /**
    * Update currentStatistics based on the latest heartbeat sample that cached 
in the slidingWindow.
    */
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index 818171c89bc..6be880dd085 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -44,6 +44,7 @@ import 
org.apache.iotdb.confignode.manager.load.cache.node.ConfigNodeHeartbeatCa
 import 
org.apache.iotdb.confignode.manager.load.cache.node.DataNodeHeartbeatCache;
 import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
 import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import org.apache.iotdb.confignode.manager.load.cache.region.RegionCache;
 import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupCache;
 import 
org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics;
 import 
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
@@ -82,6 +83,7 @@ public class LoadCache {
       Math.max(
           ProcedureManager.PROCEDURE_WAIT_TIME_OUT - 
TimeUnit.SECONDS.toMillis(2),
           TimeUnit.SECONDS.toMillis(10));
+  private static final int MAX_UNREADY_ENTITY_PRINT = 10;
 
   private static final ConfigNodeConfig CONF = 
ConfigNodeDescriptor.getInstance().getConf();
 
@@ -98,6 +100,8 @@ public class LoadCache {
   private final Map<Integer, Map<Integer, Long>> regionRawSizeMap;
   // Map<RegionGroupId, ConsensusGroupCache>
   private final Map<TConsensusGroupId, ConsensusGroupCache> 
consensusGroupCacheMap;
+  // Map<RegionGroupId, Set<DataNodeId that has reported leader judgment>>
+  private final Map<TConsensusGroupId, Set<Integer>> 
consensusGroupHeartbeatSampledNodeMap;
   // Map<DataNodeId, confirmedConfigNodes>
   private final Map<Integer, Set<TEndPoint>> confirmedConfigNodeMap;
   private Map<Integer, Set<Integer>> topologyGraph;
@@ -110,6 +114,7 @@ public class LoadCache {
     this.regionSizeMap = new ConcurrentHashMap<>();
     this.regionRawSizeMap = new ConcurrentHashMap<>();
     this.consensusGroupCacheMap = new ConcurrentHashMap<>();
+    this.consensusGroupHeartbeatSampledNodeMap = new ConcurrentHashMap<>();
     this.confirmedConfigNodeMap = new ConcurrentHashMap<>();
     this.topologyGraph = new HashMap<>();
     this.topologyUpdated = new AtomicBoolean(false);
@@ -175,6 +180,7 @@ public class LoadCache {
       Map<String, List<TRegionReplicaSet>> regionReplicaMap) {
     regionGroupCacheMap.clear();
     consensusGroupCacheMap.clear();
+    consensusGroupHeartbeatSampledNodeMap.clear();
     regionReplicaMap.forEach(
         (database, regionReplicaSets) ->
             regionReplicaSets.forEach(
@@ -192,6 +198,8 @@ public class LoadCache {
                               .collect(Collectors.toSet()),
                           isStrongConsistency));
                   consensusGroupCacheMap.put(regionGroupId, new 
ConsensusGroupCache());
+                  consensusGroupHeartbeatSampledNodeMap.put(
+                      regionGroupId, ConcurrentHashMap.newKeySet());
                 }));
   }
 
@@ -200,6 +208,7 @@ public class LoadCache {
     nodeCacheMap.clear();
     regionGroupCacheMap.clear();
     consensusGroupCacheMap.clear();
+    consensusGroupHeartbeatSampledNodeMap.clear();
   }
 
   /**
@@ -302,6 +311,7 @@ public class LoadCache {
         regionGroupId,
         new RegionGroupCache(database, regionGroupId, dataNodeIds, 
isStrongConsistency));
     consensusGroupCacheMap.put(regionGroupId, new ConsensusGroupCache());
+    consensusGroupHeartbeatSampledNodeMap.put(regionGroupId, 
ConcurrentHashMap.newKeySet());
   }
 
   /**
@@ -364,6 +374,69 @@ public class LoadCache {
         .ifPresent(group -> group.cacheHeartbeatSample(sample));
   }
 
+  public void cacheConsensusGroupHeartbeatSample(
+      TConsensusGroupId regionGroupId,
+      int nodeId,
+      boolean isLeader,
+      long logicalTimestamp,
+      boolean cacheLeader) {
+    consensusGroupHeartbeatSampledNodeMap
+        .computeIfAbsent(regionGroupId, empty -> ConcurrentHashMap.newKeySet())
+        .add(nodeId);
+    if (cacheLeader && isLeader) {
+      cacheConsensusSample(
+          regionGroupId, new ConsensusGroupHeartbeatSample(logicalTimestamp, 
nodeId));
+    } else if (isConsensusGroupHeartbeatFullySampled(regionGroupId)
+        && !Optional.ofNullable(consensusGroupCacheMap.get(regionGroupId))
+            .map(AbstractLoadCache::hasHeartbeatSample)
+            .orElse(false)) {
+      cacheConsensusSample(
+          regionGroupId,
+          new ConsensusGroupHeartbeatSample(
+              logicalTimestamp, ConsensusGroupCache.UN_READY_LEADER_ID));
+    }
+  }
+
+  private boolean isConsensusGroupHeartbeatFullySampled(TConsensusGroupId 
regionGroupId) {
+    return Optional.ofNullable(regionGroupCacheMap.get(regionGroupId))
+        .map(RegionGroupCache::getRegionLocations)
+        .map(
+            regionLocations ->
+                consensusGroupHeartbeatSampledNodeMap
+                    .getOrDefault(regionGroupId, Collections.emptySet())
+                    .containsAll(regionLocations))
+        .orElse(false);
+  }
+
+  public void cacheDataNodeHeartbeatFailureSample(int nodeId, long 
sampleTimestamp) {
+    cacheUnreportedDataNodeRegionHeartbeatSamples(nodeId, 
Collections.emptySet(), sampleTimestamp);
+  }
+
+  public void cacheUnreportedDataNodeRegionHeartbeatSamples(
+      int nodeId, Set<TConsensusGroupId> reportedRegionGroupIds, long 
sampleTimestamp) {
+    getRegionGroupIdsByDataNodeId(nodeId)
+        .forEach(
+            regionGroupId -> {
+              if (reportedRegionGroupIds.contains(regionGroupId)) {
+                return;
+              }
+              cacheRegionHeartbeatSample(
+                  regionGroupId,
+                  nodeId,
+                  new RegionHeartbeatSample(sampleTimestamp, 
RegionStatus.Unknown),
+                  false);
+              cacheConsensusGroupHeartbeatSample(
+                  regionGroupId, nodeId, false, sampleTimestamp, false);
+            });
+  }
+
+  private List<TConsensusGroupId> getRegionGroupIdsByDataNodeId(int nodeId) {
+    return regionGroupCacheMap.entrySet().stream()
+        .filter(entry -> 
entry.getValue().getRegionLocations().contains(nodeId))
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toList());
+  }
+
   /** Update the NodeStatistics of all Nodes. */
   public void updateNodeStatistics(boolean forceUpdate) {
     nodeCacheMap
@@ -450,6 +523,10 @@ public class LoadCache {
     return regionGroupIdsMap;
   }
 
+  public List<TConsensusGroupId> getAllRegionGroupIds() {
+    return new ArrayList<>(regionGroupCacheMap.keySet());
+  }
+
   /**
    * Get the RegionGroupStatistics of all RegionGroups.
    *
@@ -496,6 +573,74 @@ public class LoadCache {
     return consensusGroupStatisticsMap;
   }
 
+  public boolean isLoadWarmUpReady() {
+    return getLoadWarmUpUnreadyReasons().isEmpty();
+  }
+
+  public List<String> getLoadWarmUpUnreadyReasons() {
+    List<String> unreadyReasons = new ArrayList<>();
+    List<Integer> unreadyNodes = new ArrayList<>();
+    nodeCacheMap.forEach(
+        (nodeId, nodeCache) -> {
+          if (nodeId == ConfigNodeHeartbeatCache.CURRENT_NODE_ID) {
+            return;
+          }
+          if (!nodeCache.hasHeartbeatSample()
+              || nodeCache.getCurrentStatistics().getStatisticsNanoTimestamp() 
== Long.MIN_VALUE) {
+            unreadyNodes.add(nodeId);
+          }
+        });
+    addUnreadyReason(unreadyReasons, "nodes", unreadyNodes);
+
+    List<String> unreadyRegions = new ArrayList<>();
+    List<TConsensusGroupId> unreadyRegionGroups = new ArrayList<>();
+    regionGroupCacheMap.forEach(
+        (regionGroupId, regionGroupCache) -> {
+          regionGroupCache
+              .getRegionLocations()
+              .forEach(
+                  dataNodeId -> {
+                    RegionCache regionCache = 
regionGroupCache.getRegionCache(dataNodeId);
+                    if (regionCache == null || 
!regionCache.hasHeartbeatSample()) {
+                      unreadyRegions.add(regionGroupId + "@" + dataNodeId);
+                    }
+                  });
+          if (!regionGroupCache
+              .getCurrentStatistics()
+              .getRegionStatisticsMap()
+              .keySet()
+              .containsAll(regionGroupCache.getRegionLocations())) {
+            unreadyRegionGroups.add(regionGroupId);
+          }
+        });
+    addUnreadyReason(unreadyReasons, "regions", unreadyRegions);
+    addUnreadyReason(unreadyReasons, "regionGroups", unreadyRegionGroups);
+
+    List<TConsensusGroupId> unreadyConsensusGroups = new ArrayList<>();
+    consensusGroupCacheMap.forEach(
+        (consensusGroupId, consensusGroupCache) -> {
+          if (!consensusGroupCache.hasHeartbeatSample()
+              || 
consensusGroupCache.getCurrentStatistics().getStatisticsNanoTimestamp() == 0) {
+            unreadyConsensusGroups.add(consensusGroupId);
+          }
+        });
+    addUnreadyReason(unreadyReasons, "consensusGroups", 
unreadyConsensusGroups);
+    return unreadyReasons;
+  }
+
+  private void addUnreadyReason(List<String> reasons, String entityName, 
List<?> unreadyEntities) {
+    if (unreadyEntities.isEmpty()) {
+      return;
+    }
+    List<?> entitiesToPrint =
+        unreadyEntities.subList(0, Math.min(MAX_UNREADY_ENTITY_PRINT, 
unreadyEntities.size()));
+    String suffix =
+        unreadyEntities.size() > MAX_UNREADY_ENTITY_PRINT
+            ? "...(" + (unreadyEntities.size() - MAX_UNREADY_ENTITY_PRINT) + " 
more)"
+            : "";
+    reasons.add(entityName + "=" + entitiesToPrint + suffix);
+  }
+
   /**
    * Safely get NodeStatus by NodeId.
    *
@@ -714,6 +859,7 @@ public class LoadCache {
   public void removeRegionGroupCache(TConsensusGroupId consensusGroupId) {
     regionGroupCacheMap.remove(consensusGroupId);
     consensusGroupCacheMap.remove(consensusGroupId);
+    consensusGroupHeartbeatSampledNodeMap.remove(consensusGroupId);
   }
 
   /**
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java
index aa924dc29b5..7dcacc4fd80 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/consensus/ConsensusGroupCache.java
@@ -41,7 +41,7 @@ public class ConsensusGroupCache extends AbstractLoadCache {
     synchronized (slidingWindow) {
       lastSample = (ConsensusGroupHeartbeatSample) getLastSample();
     }
-    if (lastSample != null && lastSample.getLeaderId() != UN_READY_LEADER_ID) {
+    if (lastSample != null) {
       currentStatistics.set(
           new ConsensusGroupStatistics(System.nanoTime(), 
lastSample.getLeaderId()));
     }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 5be81256b5c..c9989b5deae 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -300,7 +300,7 @@ public class PartitionManager {
         assignedSchemaPartition =
             
getLoadManager().allocateSchemaPartition(unassignedSchemaPartitionSlotsMap);
       } catch (final NoAvailableRegionGroupException e) {
-        status = getConsensusManager().confirmLeader();
+        status = confirmLeader();
         if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           // The allocation might fail due to leadership change
           resp.setStatus(status);
@@ -445,7 +445,7 @@ public class PartitionManager {
         assignedDataPartition =
             
getLoadManager().allocateDataPartition(unassignedDataPartitionSlotsMap);
       } catch (DatabaseNotExistsException | NoAvailableRegionGroupException e) 
{
-        status = getConsensusManager().confirmLeader();
+        status = confirmLeader();
         if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           // The allocation might fail due to leadership change
           resp.setStatus(status);
@@ -543,7 +543,7 @@ public class PartitionManager {
   }
 
   private TSStatus consensusWritePartitionResult(ConfigPhysicalPlan plan) {
-    TSStatus status = getConsensusManager().confirmLeader();
+    TSStatus status = confirmLeader();
     if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       // Here we check the leadership second time
       // since the RegionGroup creating process might take some time
@@ -1597,6 +1597,16 @@ public class PartitionManager {
     return regionMaintainer;
   }
 
+  private TSStatus confirmLeader() {
+    TSStatus status = getConsensusManager().confirmLeader();
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        && !getLoadManager().isLoadReady()) {
+      return new TSStatus(TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode())
+          .setMessage(getLoadManager().getLoadReadyReason());
+    }
+    return status;
+  }
+
   private ConsensusManager getConsensusManager() {
     return configManager.getConsensusManager();
   }
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java
index fd62c31ae36..6380fec06f7 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/LoadManagerTest.java
@@ -292,4 +292,66 @@ public class LoadManagerTest {
         new Pair<>(new ConsensusGroupStatistics(newLeaderId), null),
         differentConsensusGroupStatisticsMap.get(regionGroupId));
   }
+
+  @Test
+  public void testLoadWarmUpRequiresAllEntitySamples() {
+    LoadCache loadCache = new LoadCache();
+    TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.DataRegion, 100);
+    Set<Integer> dataNodeIds = Stream.of(11, 12).collect(Collectors.toSet());
+
+    dataNodeIds.forEach(
+        dataNodeId -> loadCache.createNodeHeartbeatCache(NodeType.DataNode, dataNodeId));
+    loadCache.createRegionGroupHeartbeatCache("root.warmup", regionGroupId, dataNodeIds);
+
+    Assert.assertFalse(loadCache.isLoadWarmUpReady());
+
+    dataNodeIds.forEach(
+        dataNodeId ->
+            loadCache.cacheDataNodeHeartbeatSample(
+                dataNodeId, new NodeHeartbeatSample(NodeStatus.Running)));
+    loadCache.updateNodeStatistics(false);
+    loadCache.cacheRegionHeartbeatSample(
+        regionGroupId, 11, new RegionHeartbeatSample(RegionStatus.Running), false);
+    loadCache.cacheConsensusGroupHeartbeatSample(regionGroupId, 11, true, 1, true);
+    loadCache.updateRegionGroupStatistics();
+    loadCache.updateConsensusGroupStatistics();
+
+    Assert.assertFalse(loadCache.isLoadWarmUpReady());
+    Assert.assertTrue(loadCache.getLoadWarmUpUnreadyReasons().toString().contains("regions="));
+
+    loadCache.cacheRegionHeartbeatSample(
+        regionGroupId, 12, new RegionHeartbeatSample(RegionStatus.Running), false);
+    loadCache.updateRegionGroupStatistics();
+
+    Assert.assertTrue(
+        loadCache.getLoadWarmUpUnreadyReasons().toString(), loadCache.isLoadWarmUpReady());
+  }
+
+  @Test
+  public void testConsensusGroupWarmUpAcceptsFullySampledWithoutLeader() {
+    LoadCache loadCache = new LoadCache();
+    TConsensusGroupId regionGroupId = new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 101);
+    Set<Integer> dataNodeIds = Stream.of(21, 22).collect(Collectors.toSet());
+
+    dataNodeIds.forEach(
+        dataNodeId -> loadCache.createNodeHeartbeatCache(NodeType.DataNode, dataNodeId));
+    loadCache.createRegionGroupHeartbeatCache("root.warmup", regionGroupId, dataNodeIds);
+    dataNodeIds.forEach(
+        dataNodeId -> {
+          loadCache.cacheDataNodeHeartbeatSample(
+              dataNodeId, new NodeHeartbeatSample(NodeStatus.Running));
+          loadCache.cacheRegionHeartbeatSample(
+              regionGroupId, dataNodeId, new RegionHeartbeatSample(RegionStatus.Running), false);
+          loadCache.cacheConsensusGroupHeartbeatSample(regionGroupId, dataNodeId, false, 1, true);
+        });
+    loadCache.updateNodeStatistics(false);
+    loadCache.updateRegionGroupStatistics();
+    loadCache.updateConsensusGroupStatistics();
+
+    Assert.assertTrue(
+        loadCache.getLoadWarmUpUnreadyReasons().toString(), loadCache.isLoadWarmUpReady());
+    Assert.assertEquals(
+        ConsensusGroupStatistics.generateDefaultConsensusGroupStatistics(),
+        loadCache.getCurrentConsensusGroupStatisticsMap().get(regionGroupId));
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 1f0b09f0b86..545ee026871 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -401,6 +401,23 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
         }
         return true;
       }
+      if (status.getCode() == TSStatusCode.CONFIG_NODE_LEADER_WARMING_UP.getStatusCode()) {
+        if (!isFirstInitiated) {
+          logger.info(
+              "ConfigNode leader {} is warming up before serving DataNode {}, will wait and retry."
+                  + " Reason: {}",
+              configNode,
+              config.getAddressAndPort(),
+              status.getMessage());
+        }
+        try {
+          Thread.sleep(WAIT_CN_LEADER_ELECTION_INTERVAL_MS);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          logger.warn(DataNodeMiscMessages.UNEXPECTED_INTERRUPTION_CONNECT_CONFIG_NODE_BREAK);
+        }
+        return true;
+      }
       return false;
     } finally {
       isFirstInitiated = false;


Reply via email to