This is an automated email from the ASF dual-hosted git repository.

CRZbulabula pushed a commit to branch confignode-readonly-disk-check
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a6a8291e5883b09a45f8d2d6c89c0bdbcb8f2af0
Author: Yongzao <[email protected]>
AuthorDate: Wed May 20 11:27:23 2026 +0800

    Add ConfigNode ReadOnly state with DiskFull/DiskCrash heartbeat self-check
    
    ConfigNode now reports its own NodeStatus.ReadOnly when any of its
    critical directories (systemDir, consensusDir) is unwritable or its
    free-space ratio falls below disk_space_warning_threshold, mirroring
    the existing DataNode behavior. NodeStatus reasons are extended with a
    new DISK_CRASH constant alongside DISK_FULL, and the ConfigNode
    heartbeat response carries status/statusReason back to the leader.
    
    - node-commons: new DiskChecker utility (probe + state-machine apply),
      with priority DiskCrash > DiskFull and recovery to Running when the
      reason was disk-related. i18n messages added in en + zh.
    - thrift-confignode: TConfigNodeHeartbeatResp gains optional status
      and statusReason fields (forward-compatible).
    - confignode: the leader self-checks before fanning out heartbeats;
      each follower self-checks when it receives a heartbeat and reports
      the result back; the heartbeat cache reads the local node's own
      entry from CommonConfig and every other entry from that node's
      latest heartbeat sample.
    - datanode: FolderManager exposes a static hasAnyAbnormalFolder()
      aggregator; sampleDiskLoad treats any ABNORMAL folder as DiskCrash
      (which wins over DiskFull) and reuses DiskChecker.apply.
---
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |  10 +
 .../load/cache/node/ConfigNodeHeartbeatCache.java  |  48 +++--
 .../load/cache/node/NodeHeartbeatSample.java       |   9 +-
 .../manager/load/service/HeartbeatService.java     |   8 +
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   9 +
 .../impl/DataNodeInternalRPCServiceImpl.java       |  27 ++-
 .../storageengine/rescon/disk/FolderManager.java   |  43 ++++
 .../apache/iotdb/commons/i18n/CommonMessages.java  |  10 +
 .../apache/iotdb/commons/i18n/CommonMessages.java  |  10 +
 .../apache/iotdb/commons/cluster/DiskChecker.java  | 146 ++++++++++++++
 .../apache/iotdb/commons/cluster/NodeStatus.java   |   1 +
 .../iotdb/commons/cluster/DiskCheckerTest.java     | 224 +++++++++++++++++++++
 .../src/main/thrift/confignode.thrift              |   7 +
 13 files changed, 523 insertions(+), 29 deletions(-)

diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index aacbe20203f..203032256d6 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.lang.reflect.Field;
 import java.util.Arrays;
+import java.util.List;
 
 public class ConfigNodeConfig {
 
@@ -506,6 +507,15 @@ public class ConfigNodeConfig {
     this.consensusDir = consensusDir;
   }
 
+  /**
+   * Directories whose loss would render this ConfigNode unable to serve. Used 
by the periodic
+   * disk-health check on both leader (in HeartbeatService loop) and followers 
(in the
+   * heartbeat-receive path).
+   */
+  public List<String> getCriticalDirs() {
+    return Arrays.asList(systemDir, consensusDir);
+  }
+
   public String getConfigNodeConsensusProtocolClass() {
     return configNodeConsensusProtocolClass;
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
index 4d675e1e8a4..7228fca648c 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.manager.load.cache.node;
 
 import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
 
@@ -49,28 +50,39 @@ public class ConfigNodeHeartbeatCache extends BaseNodeCache 
{
 
   @Override
   public synchronized void updateCurrentStatistics(boolean forceUpdate) {
-    // Skip itself and the Removing status can not be updated
-    if (nodeId == CURRENT_NODE_ID || 
NodeStatus.Removing.equals(getNodeStatus())) {
+    // Removing status can not be updated
+    if (NodeStatus.Removing.equals(getNodeStatus())) {
       return;
     }
 
-    NodeHeartbeatSample lastSample;
-    // Update Node status
-    NodeStatus status;
     long currentNanoTime = System.nanoTime();
-    final List<AbstractHeartbeatSample> heartbeatHistory;
-    synchronized (slidingWindow) {
-      lastSample = (NodeHeartbeatSample) getLastSample();
-      heartbeatHistory = Collections.unmodifiableList(slidingWindow);
+    NodeStatus status;
+    String statusReason;
+
+    if (nodeId == CURRENT_NODE_ID) {
+      // Self entry: heartbeat loop never sends to itself, so mirror the 
status that
+      // this ConfigNode's local disk-check / startup wrote into CommonConfig.
+      status = CommonDescriptor.getInstance().getConfig().getNodeStatus();
+      statusReason = 
CommonDescriptor.getInstance().getConfig().getStatusReason();
+    } else {
+      NodeHeartbeatSample lastSample;
+      final List<AbstractHeartbeatSample> heartbeatHistory;
+      synchronized (slidingWindow) {
+        lastSample = (NodeHeartbeatSample) getLastSample();
+        heartbeatHistory = Collections.unmodifiableList(slidingWindow);
 
-      if (lastSample == null) {
-        /* First heartbeat not received from this ConfigNode, status is 
UNKNOWN */
-        status = NodeStatus.Unknown;
-      } else if (!failureDetector.isAvailable(nodeId, heartbeatHistory)) {
-        /* Failure detector decides that this ConfigNode is UNKNOWN */
-        status = NodeStatus.Unknown;
-      } else {
-        status = lastSample.getStatus();
+        if (lastSample == null) {
+          /* First heartbeat not received from this ConfigNode, status is 
UNKNOWN */
+          status = NodeStatus.Unknown;
+          statusReason = null;
+        } else if (!failureDetector.isAvailable(nodeId, heartbeatHistory)) {
+          /* Failure detector decides that this ConfigNode is UNKNOWN */
+          status = NodeStatus.Unknown;
+          statusReason = null;
+        } else {
+          status = lastSample.getStatus();
+          statusReason = lastSample.getStatusReason();
+        }
       }
     }
 
@@ -79,6 +91,6 @@ public class ConfigNodeHeartbeatCache extends BaseNodeCache {
     // TODO: Construct load score module
     long loadScore = NodeStatus.isNormalStatus(status) ? 0 : Long.MAX_VALUE;
 
-    currentStatistics.set(new NodeStatistics(currentNanoTime, status, null, 
loadScore));
+    currentStatistics.set(new NodeStatistics(currentNanoTime, status, 
statusReason, loadScore));
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java
index 8217593f5d6..5ead20d291f 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java
@@ -74,8 +74,13 @@ public class NodeHeartbeatSample extends 
AbstractHeartbeatSample {
   /** Constructor for ConfigNode sample. */
   public NodeHeartbeatSample(TConfigNodeHeartbeatResp heartbeatResp) {
     super(heartbeatResp.getTimestamp());
-    this.status = NodeStatus.Running;
-    this.statusReason = null;
+    // Old ConfigNodes don't populate status/statusReason — fall back to 
Running/null
+    // so a rolling upgrade leaves the leader's view of legacy peers unchanged.
+    this.status =
+        heartbeatResp.isSetStatus()
+            ? NodeStatus.parse(heartbeatResp.getStatus())
+            : NodeStatus.Running;
+    this.statusReason = heartbeatResp.isSetStatusReason() ? 
heartbeatResp.getStatusReason() : null;
     this.loadSample = null;
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
index a491b3960c3..454a2741207 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
@@ -24,9 +24,11 @@ import 
org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.cluster.DiskChecker;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.confignode.client.async.AsyncAINodeHeartbeatClientPool;
 import 
org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool;
@@ -126,6 +128,12 @@ public class HeartbeatService {
         .ifPresent(
             consensusManager -> {
               if (getConsensusManager().isLeader()) {
+                // Leader self-checks its own disk health before fanning out 
heartbeats.
+                // Followers run the same check when receiving each heartbeat 
request
+                // (see ConfigNodeRPCServiceProcessor#getConfigNodeHeartBeat).
+                DiskChecker.checkAndApply(
+                    
ConfigNodeDescriptor.getInstance().getConf().getCriticalDirs(),
+                    
CommonDescriptor.getInstance().getConfig().getDiskSpaceWarningThreshold());
                 // Send heartbeat requests to all the registered ConfigNodes
                 pingRegisteredConfigNodes(
                     genConfigNodeHeartbeatReq(), 
getNodeManager().getRegisteredConfigNodes());
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index dbb35839045..c027f8234bf 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
 import org.apache.iotdb.commons.auth.entity.PrivilegeModelType;
 import org.apache.iotdb.commons.auth.entity.PrivilegeType;
 import org.apache.iotdb.commons.auth.entity.PrivilegeUnion;
+import org.apache.iotdb.commons.cluster.DiskChecker;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -1085,6 +1086,14 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
   public TConfigNodeHeartbeatResp 
getConfigNodeHeartBeat(TConfigNodeHeartbeatReq heartbeatReq) {
     TConfigNodeHeartbeatResp resp = new TConfigNodeHeartbeatResp();
     resp.setTimestamp(heartbeatReq.getTimestamp());
+    // Follower self-check: probe critical dirs each time the leader pings us.
+    // The leader runs the same check in its HeartbeatService loop.
+    DiskChecker.checkAndApply(
+        configNodeConfig.getCriticalDirs(), 
commonConfig.getDiskSpaceWarningThreshold());
+    resp.setStatus(commonConfig.getNodeStatus().getStatus());
+    if (commonConfig.getStatusReason() != null) {
+      resp.setStatusReason(commonConfig.getStatusReason());
+    }
     return resp;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index e5c99640564..1ab957b5dfc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -49,6 +49,7 @@ import org.apache.iotdb.commons.audit.AuditLogOperation;
 import org.apache.iotdb.commons.audit.UserEntity;
 import org.apache.iotdb.commons.auth.entity.PrivilegeType;
 import org.apache.iotdb.commons.client.request.AsyncRequestContext;
+import org.apache.iotdb.commons.cluster.DiskChecker;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.concurrent.Await;
 import org.apache.iotdb.commons.concurrent.AwaitTimeoutException;
@@ -213,6 +214,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
 import 
org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager;
 import 
org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeThrottleQuotaManager;
 import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
@@ -2424,29 +2426,36 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
                 SYSTEM)
             .getValue();
 
+    // Derive the disk status: an ABNORMAL data folder observed by any 
FolderManager wins over
+    // a full-disk reading, and a low free-space ratio still drives DISK_FULL 
when nothing is
+    // crashed. DiskChecker.apply then performs the actual NodeStatus 
transition (including the
+    // "ReadOnly(DiskFull|DiskCrash) -> Running" recovery for the DiskFull 
path).
+    DiskChecker.DiskStatus diskStatus = DiskChecker.DiskStatus.NORMAL;
+    if (FolderManager.hasAnyAbnormalFolder()) {
+      diskStatus = DiskChecker.DiskStatus.DISK_CRASH;
+    }
     if (availableDisk != 0 && totalDisk != 0) {
       double freeDiskRatio = availableDisk / totalDisk;
       loadSample.setFreeDiskSpace(availableDisk);
       loadSample.setDiskUsageRate(1d - freeDiskRatio);
-      // Reset NodeStatus if necessary
-      if (freeDiskRatio < commonConfig.getDiskSpaceWarningThreshold()) {
+      if (diskStatus == DiskChecker.DiskStatus.NORMAL
+          && freeDiskRatio < commonConfig.getDiskSpaceWarningThreshold()) {
         LOGGER.warn(
             "The available disk space is : {}, "
                 + "the total disk space is : {}, "
                 + "and the remaining disk usage ratio: {} is "
-                + "less than disk_space_warning_threshold: {}, set system to 
readonly!",
+                + "less than disk_space_warning_threshold: {}.",
             RamUsageEstimator.humanReadableUnits((long) availableDisk),
             RamUsageEstimator.humanReadableUnits((long) totalDisk),
             freeDiskRatio,
             commonConfig.getDiskSpaceWarningThreshold());
-        commonConfig.setNodeStatus(NodeStatus.ReadOnly);
-        commonConfig.setStatusReason(NodeStatus.DISK_FULL);
-      } else if (NodeStatus.ReadOnly.equals(commonConfig.getNodeStatus())
-          && NodeStatus.DISK_FULL.equals(commonConfig.getStatusReason())) {
-        commonConfig.setNodeStatus(NodeStatus.Running);
-        commonConfig.setStatusReason(null);
+        diskStatus = DiskChecker.DiskStatus.DISK_FULL;
       }
+    } else if (diskStatus == DiskChecker.DiskStatus.NORMAL) {
+      // Metrics not available yet — fall back to no-op so we don't churn the 
status.
+      return;
     }
+    DiskChecker.apply(diskStatus);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/FolderManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/FolderManager.java
index 77a332152ab..80f739abbc3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/FolderManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/FolderManager.java
@@ -33,13 +33,25 @@ import 
org.apache.iotdb.db.storageengine.rescon.disk.strategy.SequenceStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.ref.WeakReference;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 public class FolderManager {
   private static final Logger logger = 
LoggerFactory.getLogger(FolderManager.class);
 
+  /**
+   * Registry of every live {@link FolderManager} instance so the DataNode 
heartbeat path can ask
+   * "is any folder anywhere on this node currently ABNORMAL?" without each 
subsystem having to push
+   * state into a central reporter. Weak references avoid keeping short-lived 
managers alive (e.g.
+   * those created per snapshot/load).
+   */
+  private static final List<WeakReference<FolderManager>> ALL_INSTANCES =
+      new CopyOnWriteArrayList<>();
+
   /** Represents the operational states of a data folder. */
   public enum FolderState {
     /** Indicates the folder is functioning normally with no issues. */
@@ -62,6 +74,7 @@ public class FolderManager {
       throws DiskSpaceInsufficientException {
     this.folders = folders;
     folders.forEach(dir -> foldersStates.put(dir, FolderState.HEALTHY));
+    ALL_INSTANCES.add(new WeakReference<>(this));
     switch (type) {
       case SEQUENCE_STRATEGY:
         this.selectStrategy = new SequenceStrategy();
@@ -147,4 +160,34 @@ public class FolderManager {
   public List<String> getFolders() {
     return folders;
   }
+
+  /**
+   * Walks every live FolderManager instance and reports whether any folder is 
currently {@link
+   * FolderState#ABNORMAL}. Used by the DataNode heartbeat path to derive a 
{@code
+   * NodeStatus.ReadOnly(DiskCrash)} signal from already-observed write 
failures.
+   *
+   * <p>Stale (GC'd) weak references are pruned as a side effect.
+   */
+  public static boolean hasAnyAbnormalFolder() {
+    boolean anyAbnormal = false;
+    Iterator<WeakReference<FolderManager>> it = ALL_INSTANCES.iterator();
+    while (it.hasNext()) {
+      FolderManager fm = it.next().get();
+      if (fm == null) {
+        continue;
+      }
+      for (FolderState state : fm.foldersStates.values()) {
+        if (state == FolderState.ABNORMAL) {
+          anyAbnormal = true;
+          break;
+        }
+      }
+      if (anyAbnormal) {
+        break;
+      }
+    }
+    // Prune dead entries. CopyOnWriteArrayList tolerates concurrent removeIf 
safely.
+    ALL_INSTANCES.removeIf(ref -> ref.get() == null);
+    return anyAbnormal;
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java
 
b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java
index 5a2e4196197..462f01f5f34 100644
--- 
a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java
+++ 
b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java
@@ -37,6 +37,16 @@ public final class CommonMessages {
   public static final String NODE_STATUS_NOT_EXIST = "NodeStatus %s doesn't 
exist.";
   public static final String UNKNOWN_NODE_STATUS = "Unknown NodeStatus %s.";
 
+  // --- disk health ---
+  public static final String DISK_FULL_SET_READ_ONLY =
+      "Free disk space ratio is below the configured threshold; set node 
status to ReadOnly(DiskFull).";
+  public static final String DISK_CRASH_SET_READ_ONLY =
+      "Detected unwritable disk directory; set node status to 
ReadOnly(DiskCrash).";
+  public static final String DISK_CRASH_PROBE_FAILED =
+      "Disk health probe write failed for directory {}.";
+  public static final String DISK_RECOVERED_SET_RUNNING =
+      "Disk health recovered (previous reason: {}); set node status to 
Running.";
+
   // --- consensus ---
   public static final String UNRECOGNIZED_CONSENSUS_GROUP_ID =
       "Unrecognized ConsensusGroupId: %s";
diff --git 
a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java
 
b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java
index a4b72b9e427..dfe57a5f9c6 100644
--- 
a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java
+++ 
b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java
@@ -36,6 +36,16 @@ public final class CommonMessages {
   public static final String NODE_STATUS_NOT_EXIST = "NodeStatus %s 不存在。";
   public static final String UNKNOWN_NODE_STATUS = "未知 NodeStatus %s。";
 
+  // --- disk health ---
+  public static final String DISK_FULL_SET_READ_ONLY =
+      "磁盘剩余空间比例低于配置阈值,将节点状态设为 ReadOnly(DiskFull)。";
+  public static final String DISK_CRASH_SET_READ_ONLY =
+      "检测到不可写的磁盘目录,将节点状态设为 ReadOnly(DiskCrash)。";
+  public static final String DISK_CRASH_PROBE_FAILED =
+      "对目录 {} 进行磁盘健康探测时写入失败。";
+  public static final String DISK_RECOVERED_SET_RUNNING =
+      "磁盘健康已恢复(先前原因:{}),将节点状态设为 Running。";
+
   // --- consensus ---
   public static final String UNRECOGNIZED_CONSENSUS_GROUP_ID =
       "无法识别的 ConsensusGroupId:%s";
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DiskChecker.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DiskChecker.java
new file mode 100644
index 00000000000..4632ddf28dd
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DiskChecker.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.cluster;
+
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.i18n.CommonMessages;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+
+/**
+ * Shared utility used by both ConfigNode and DataNode to evaluate the health 
of a set of critical
+ * directories and, optionally, drive the global {@link NodeStatus} on {@link 
CommonConfig}.
+ *
+ * <p>Only transitions between {@link NodeStatus#Running} and {@link 
NodeStatus#ReadOnly} with
+ * reason {@link NodeStatus#DISK_FULL}/{@link NodeStatus#DISK_CRASH} are 
managed here. Other
+ * ReadOnly reasons (e.g. manual) are left untouched.
+ */
+public class DiskChecker {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DiskChecker.class);
+
+  private static final byte[] PROBE_PAYLOAD = new byte[] {0x01};
+  private static final String PROBE_PREFIX = "iotdb-disk-probe-";
+  private static final String PROBE_SUFFIX = ".tmp";
+
+  public enum DiskStatus {
+    NORMAL,
+    DISK_FULL,
+    DISK_CRASH
+  }
+
+  private DiskChecker() {}
+
+  /**
+   * Evaluate the supplied directories. A single unwritable directory yields 
{@link
+   * DiskStatus#DISK_CRASH}; any directory whose usable/total ratio is below 
the threshold yields
+   * {@link DiskStatus#DISK_FULL} (when no crash is detected); otherwise 
{@link DiskStatus#NORMAL}.
+   */
+  public static DiskStatus check(List<String> dirs, double freeRatioThreshold) 
{
+    if (dirs == null || dirs.isEmpty()) {
+      return DiskStatus.NORMAL;
+    }
+    boolean anyFull = false;
+    for (String dir : dirs) {
+      if (dir == null || dir.isEmpty()) {
+        continue;
+      }
+      File f = new File(dir);
+      if (!f.isDirectory()) {
+        LOGGER.warn(CommonMessages.DISK_CRASH_PROBE_FAILED, dir);
+        return DiskStatus.DISK_CRASH;
+      }
+      try {
+        Path probe = Files.createTempFile(Paths.get(dir), PROBE_PREFIX, 
PROBE_SUFFIX);
+        try {
+          Files.write(probe, PROBE_PAYLOAD);
+        } finally {
+          Files.deleteIfExists(probe);
+        }
+      } catch (IOException e) {
+        LOGGER.warn(CommonMessages.DISK_CRASH_PROBE_FAILED, dir, e);
+        return DiskStatus.DISK_CRASH;
+      }
+      long total = f.getTotalSpace();
+      long usable = f.getUsableSpace();
+      if (total > 0 && (double) usable / total < freeRatioThreshold) {
+        anyFull = true;
+      }
+    }
+    return anyFull ? DiskStatus.DISK_FULL : DiskStatus.NORMAL;
+  }
+
+  /**
+   * Run {@link #check} and apply the result to {@link CommonConfig}. See 
class javadoc for
+   * transition rules.
+   */
+  public static void checkAndApply(List<String> dirs, double 
freeRatioThreshold) {
+    apply(check(dirs, freeRatioThreshold));
+  }
+
+  /** Visible for tests; package-public callers should prefer {@link 
#checkAndApply}. */
+  public static void apply(DiskStatus result) {
+    CommonConfig config = CommonDescriptor.getInstance().getConfig();
+    NodeStatus currentStatus = config.getNodeStatus();
+    String currentReason = config.getStatusReason();
+    boolean currentlyFull =
+        NodeStatus.ReadOnly.equals(currentStatus) && 
NodeStatus.DISK_FULL.equals(currentReason);
+    boolean currentlyCrash =
+        NodeStatus.ReadOnly.equals(currentStatus) && 
NodeStatus.DISK_CRASH.equals(currentReason);
+
+    switch (result) {
+      case DISK_CRASH:
+        if (!currentlyCrash) {
+          LOGGER.warn(CommonMessages.DISK_CRASH_SET_READ_ONLY);
+          config.setNodeStatus(NodeStatus.ReadOnly);
+          config.setStatusReason(NodeStatus.DISK_CRASH);
+        }
+        break;
+      case DISK_FULL:
+        // DiskCrash has higher priority — do not downgrade an existing crash 
to full.
+        if (currentlyCrash) {
+          return;
+        }
+        if (!currentlyFull) {
+          LOGGER.warn(CommonMessages.DISK_FULL_SET_READ_ONLY);
+          config.setNodeStatus(NodeStatus.ReadOnly);
+          config.setStatusReason(NodeStatus.DISK_FULL);
+        }
+        break;
+      case NORMAL:
+      default:
+        if (currentlyFull || currentlyCrash) {
+          LOGGER.info(CommonMessages.DISK_RECOVERED_SET_RUNNING, 
currentReason);
+          config.setNodeStatus(NodeStatus.Running);
+          config.setStatusReason(null);
+        }
+        break;
+    }
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java
index 518a9faaed2..e37b44edafd 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java
@@ -35,6 +35,7 @@ public enum NodeStatus {
   /** Only query statements are permitted */
   ReadOnly("ReadOnly");
   public static final String DISK_FULL = "DiskFull";
+  public static final String DISK_CRASH = "DiskCrash";
 
   private final String status;
 
diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/cluster/DiskCheckerTest.java
 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/cluster/DiskCheckerTest.java
new file mode 100644
index 00000000000..fde492c1090
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/cluster/DiskCheckerTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.cluster;
+
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class DiskCheckerTest {
+
+  @Rule public TemporaryFolder tmp = new TemporaryFolder();
+
+  private NodeStatus savedStatus;
+  private String savedReason;
+
+  @Before
+  public void setUp() {
+    CommonConfig config = CommonDescriptor.getInstance().getConfig();
+    savedStatus = config.getNodeStatus();
+    savedReason = config.getStatusReason();
+    config.setNodeStatus(NodeStatus.Running);
+    config.setStatusReason(null);
+  }
+
+  @After
+  public void tearDown() {
+    CommonConfig config = CommonDescriptor.getInstance().getConfig();
+    config.setNodeStatus(savedStatus);
+    config.setStatusReason(savedReason);
+  }
+
+  @Test
+  public void checkReturnsNormalForWritableDirectoryWithSpace() throws 
Exception {
+    File dir = tmp.newFolder();
+    // threshold 0.0 → any positive usable space passes
+    assertEquals(
+        DiskChecker.DiskStatus.NORMAL,
+        DiskChecker.check(Collections.singletonList(dir.getAbsolutePath()), 
0.0));
+  }
+
+  @Test
+  public void checkReturnsDiskFullWhenBelowThreshold() throws Exception {
+    File dir = tmp.newFolder();
+    // threshold > 1 forces every directory to be reported as full
+    assertEquals(
+        DiskChecker.DiskStatus.DISK_FULL,
+        DiskChecker.check(Collections.singletonList(dir.getAbsolutePath()), 
2.0));
+  }
+
+  @Test
+  public void checkReturnsDiskCrashWhenDirectoryMissing() {
+    String missing = new File(tmp.getRoot(), 
"does-not-exist").getAbsolutePath();
+    assertEquals(
+        DiskChecker.DiskStatus.DISK_CRASH,
+        DiskChecker.check(Collections.singletonList(missing), 0.0));
+  }
+
+  @Test
+  public void checkReturnsDiskCrashWhenPathIsAFile() throws Exception {
+    File file = tmp.newFile();
+    assertEquals(
+        DiskChecker.DiskStatus.DISK_CRASH,
+        DiskChecker.check(Collections.singletonList(file.getAbsolutePath()), 
0.0));
+  }
+
+  @Test
+  public void checkPrioritizesCrashOverFull() throws Exception {
+    File healthy = tmp.newFolder();
+    String missing = new File(tmp.getRoot(), "missing").getAbsolutePath();
+    // Even with a "full" threshold the missing dir trumps it.
+    assertEquals(
+        DiskChecker.DiskStatus.DISK_CRASH,
+        DiskChecker.check(java.util.Arrays.asList(healthy.getAbsolutePath(), 
missing), 2.0));
+  }
+
+  @Test
+  public void checkSkipsNullAndEmptyEntries() throws Exception {
+    File dir = tmp.newFolder();
+    assertEquals(
+        DiskChecker.DiskStatus.NORMAL,
+        DiskChecker.check(java.util.Arrays.asList(null, "", 
dir.getAbsolutePath()), 0.0));
+  }
+
+  @Test
+  public void applyDiskFullSetsReadOnlyFromRunning() {
+    DiskChecker.apply(DiskChecker.DiskStatus.DISK_FULL);
+    CommonConfig config = CommonDescriptor.getInstance().getConfig();
+    assertEquals(NodeStatus.ReadOnly, config.getNodeStatus());
+    assertEquals(NodeStatus.DISK_FULL, config.getStatusReason());
+  }
+
+  @Test
+  public void applyDiskCrashSetsReadOnlyFromRunning() {
+    DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH);
+    CommonConfig config = CommonDescriptor.getInstance().getConfig();
+    assertEquals(NodeStatus.ReadOnly, config.getNodeStatus());
+    assertEquals(NodeStatus.DISK_CRASH, config.getStatusReason());
+  }
+
+  @Test
+  public void applyDiskCrashUpgradesFromDiskFull() {
+    DiskChecker.apply(DiskChecker.DiskStatus.DISK_FULL);
+    DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH);
+    CommonConfig config = CommonDescriptor.getInstance().getConfig();
+    assertEquals(NodeStatus.ReadOnly, config.getNodeStatus());
+    assertEquals(NodeStatus.DISK_CRASH, config.getStatusReason());
+  }
+
+  @Test
+  public void applyDiskFullDoesNotDowngradeDiskCrash() {
+    DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH);
+    DiskChecker.apply(DiskChecker.DiskStatus.DISK_FULL);
+    CommonConfig config = CommonDescriptor.getInstance().getConfig();
+    assertEquals(NodeStatus.ReadOnly, config.getNodeStatus());
+    assertEquals(
+        "DiskCrash must outrank DiskFull", NodeStatus.DISK_CRASH, 
config.getStatusReason());
+  }
+
+  @Test
+  public void applyNormalRecoversFromDiskFull() {
+    DiskChecker.apply(DiskChecker.DiskStatus.DISK_FULL);
+    DiskChecker.apply(DiskChecker.DiskStatus.NORMAL);
+    CommonConfig config = CommonDescriptor.getInstance().getConfig();
+    assertEquals(NodeStatus.Running, config.getNodeStatus());
+    assertNull(config.getStatusReason());
+  }
+
+  @Test
+  public void applyNormalRecoversFromDiskCrash() {
+    DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH);
+    DiskChecker.apply(DiskChecker.DiskStatus.NORMAL);
+    CommonConfig config = CommonDescriptor.getInstance().getConfig();
+    assertEquals(NodeStatus.Running, config.getNodeStatus());
+    assertNull(config.getStatusReason());
+  }
+
+  @Test
+  public void applyLeavesNonDiskReadOnlyReasonUntouched() {
+    CommonConfig config = CommonDescriptor.getInstance().getConfig();
+    config.setNodeStatus(NodeStatus.ReadOnly);
+    config.setStatusReason("ManualMaintenance");
+
+    DiskChecker.apply(DiskChecker.DiskStatus.NORMAL);
+    assertEquals(NodeStatus.ReadOnly, config.getNodeStatus());
+    assertEquals("ManualMaintenance", config.getStatusReason());
+
+    DiskChecker.apply(DiskChecker.DiskStatus.DISK_FULL);
+    // DISK_FULL only fires when not already DiskFull/DiskCrash — it does take 
over here,
+    // mirroring the existing behavior for the legacy sampleDiskLoad path.
+    assertEquals(NodeStatus.ReadOnly, config.getNodeStatus());
+    assertEquals(NodeStatus.DISK_FULL, config.getStatusReason());
+  }
+
+  @Test
+  public void applyIsIdempotentForRepeatedDiskCrash() {
+    DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH);
+    NodeStatus before = 
CommonDescriptor.getInstance().getConfig().getNodeStatus();
+    String reasonBefore = 
CommonDescriptor.getInstance().getConfig().getStatusReason();
+    DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH);
+    assertEquals(before, 
CommonDescriptor.getInstance().getConfig().getNodeStatus());
+    assertEquals(reasonBefore, 
CommonDescriptor.getInstance().getConfig().getStatusReason());
+  }
+
+  @Test
+  public void checkAndApplyDrivesStatusEndToEnd() throws Exception {
+    File healthy = tmp.newFolder();
+    
DiskChecker.checkAndApply(Collections.singletonList(healthy.getAbsolutePath()), 
0.0);
+    assertEquals(NodeStatus.Running, 
CommonDescriptor.getInstance().getConfig().getNodeStatus());
+
+    String missing = new File(tmp.getRoot(), 
"still-missing").getAbsolutePath();
+    DiskChecker.checkAndApply(Collections.singletonList(missing), 0.0);
+    assertEquals(NodeStatus.ReadOnly, 
CommonDescriptor.getInstance().getConfig().getNodeStatus());
+    assertEquals(
+        NodeStatus.DISK_CRASH, 
CommonDescriptor.getInstance().getConfig().getStatusReason());
+
+    
DiskChecker.checkAndApply(Collections.singletonList(healthy.getAbsolutePath()), 
0.0);
+    assertEquals(NodeStatus.Running, 
CommonDescriptor.getInstance().getConfig().getNodeStatus());
+    assertNull(CommonDescriptor.getInstance().getConfig().getStatusReason());
+  }
+
+  @Test
+  public void emptyDirListIsNormal() {
+    assertEquals(DiskChecker.DiskStatus.NORMAL, 
DiskChecker.check(Collections.emptyList(), 1.0));
+    assertEquals(DiskChecker.DiskStatus.NORMAL, DiskChecker.check(null, 1.0));
+  }
+
+  @Test
+  public void smokeProbeFileIsDeleted() throws Exception {
+    File dir = tmp.newFolder();
+    DiskChecker.check(Collections.singletonList(dir.getAbsolutePath()), 0.0);
+    File[] leftovers = dir.listFiles();
+    assertTrue(
+        "Disk probe should clean up its temp file", leftovers == null || 
leftovers.length == 0);
+  }
+}
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift 
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 22529ffbb73..b6f16c0ca99 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -524,6 +524,13 @@ struct TConfigNodeHeartbeatResp {
     1: required i64 timestamp
     2: optional string activateStatus
     3: optional common.TLicense license
+    // Reported ConfigNode status (e.g. Running, ReadOnly). Optional so that mixed-version
+    // clusters keep working: old ConfigNodes do not populate this field, in which case the
+    // leader falls back to assuming Running.
+    4: optional string status
+    // Optional human/machine readable reason accompanying ReadOnly status,
+    // e.g. NodeStatus.DISK_FULL or NodeStatus.DISK_CRASH.
+    5: optional string statusReason
 }
 
 struct TAddConsensusGroupReq {


Reply via email to