This is an automated email from the ASF dual-hosted git repository. CRZbulabula pushed a commit to branch confignode-readonly-disk-check in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a6a8291e5883b09a45f8d2d6c89c0bdbcb8f2af0 Author: Yongzao <[email protected]> AuthorDate: Wed May 20 11:27:23 2026 +0800 Add ConfigNode ReadOnly state with DiskFull/DiskCrash heartbeat self-check ConfigNode now reports its own NodeStatus.ReadOnly when any of its critical directories (systemDir, consensusDir) is unwritable or its free-space ratio falls below disk_space_warning_threshold, mirroring the existing DataNode behavior. NodeStatus reasons are extended with a new DISK_CRASH constant alongside DISK_FULL, and the ConfigNode heartbeat response carries status/statusReason back to the leader. - node-commons: new DiskChecker utility (directory write probe + status transition), where DiskCrash takes priority over DiskFull and the node returns to Running only when its ReadOnly reason was disk-related. i18n messages added in en and zh. - thrift-confignode: TConfigNodeHeartbeatResp gains optional status and statusReason fields; when a peer leaves them unset the leader falls back to Running, so mixed-version clusters keep working during a rolling upgrade. - confignode: the leader self-checks before fanning out heartbeats; each follower self-checks whenever it receives a heartbeat request and reports the result back; the heartbeat cache takes a ConfigNode's own (self) entry from CommonConfig and every other entry from its latest heartbeat sample. - datanode: FolderManager exposes a static hasAnyAbnormalFolder() aggregator; sampleDiskLoad treats any ABNORMAL folder as DiskCrash (which wins over DiskFull) and reuses DiskChecker.apply.
--- .../iotdb/confignode/conf/ConfigNodeConfig.java | 10 + .../load/cache/node/ConfigNodeHeartbeatCache.java | 48 +++-- .../load/cache/node/NodeHeartbeatSample.java | 9 +- .../manager/load/service/HeartbeatService.java | 8 + .../thrift/ConfigNodeRPCServiceProcessor.java | 9 + .../impl/DataNodeInternalRPCServiceImpl.java | 27 ++- .../storageengine/rescon/disk/FolderManager.java | 43 ++++ .../apache/iotdb/commons/i18n/CommonMessages.java | 10 + .../apache/iotdb/commons/i18n/CommonMessages.java | 10 + .../apache/iotdb/commons/cluster/DiskChecker.java | 146 ++++++++++++++ .../apache/iotdb/commons/cluster/NodeStatus.java | 1 + .../iotdb/commons/cluster/DiskCheckerTest.java | 224 +++++++++++++++++++++ .../src/main/thrift/confignode.thrift | 7 + 13 files changed, 523 insertions(+), 29 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index aacbe20203f..203032256d6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.lang.reflect.Field; import java.util.Arrays; +import java.util.List; public class ConfigNodeConfig { @@ -506,6 +507,15 @@ public class ConfigNodeConfig { this.consensusDir = consensusDir; } + /** + * Directories whose loss would render this ConfigNode unable to serve. Used by the periodic + * disk-health check on both leader (in HeartbeatService loop) and followers (in the + * heartbeat-receive path). 
+ */ + public List<String> getCriticalDirs() { + return Arrays.asList(systemDir, consensusDir); + } + public String getConfigNodeConsensusProtocolClass() { return configNodeConsensusProtocolClass; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java index 4d675e1e8a4..7228fca648c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java @@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.manager.load.cache.node; import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample; @@ -49,28 +50,39 @@ public class ConfigNodeHeartbeatCache extends BaseNodeCache { @Override public synchronized void updateCurrentStatistics(boolean forceUpdate) { - // Skip itself and the Removing status can not be updated - if (nodeId == CURRENT_NODE_ID || NodeStatus.Removing.equals(getNodeStatus())) { + // Removing status can not be updated + if (NodeStatus.Removing.equals(getNodeStatus())) { return; } - NodeHeartbeatSample lastSample; - // Update Node status - NodeStatus status; long currentNanoTime = System.nanoTime(); - final List<AbstractHeartbeatSample> heartbeatHistory; - synchronized (slidingWindow) { - lastSample = (NodeHeartbeatSample) getLastSample(); - heartbeatHistory = Collections.unmodifiableList(slidingWindow); + NodeStatus status; + String statusReason; + + if (nodeId == CURRENT_NODE_ID) { + // Self entry: heartbeat loop never sends to itself, so mirror the status that + // this ConfigNode's local 
disk-check / startup wrote into CommonConfig. + status = CommonDescriptor.getInstance().getConfig().getNodeStatus(); + statusReason = CommonDescriptor.getInstance().getConfig().getStatusReason(); + } else { + NodeHeartbeatSample lastSample; + final List<AbstractHeartbeatSample> heartbeatHistory; + synchronized (slidingWindow) { + lastSample = (NodeHeartbeatSample) getLastSample(); + heartbeatHistory = Collections.unmodifiableList(slidingWindow); - if (lastSample == null) { - /* First heartbeat not received from this ConfigNode, status is UNKNOWN */ - status = NodeStatus.Unknown; - } else if (!failureDetector.isAvailable(nodeId, heartbeatHistory)) { - /* Failure detector decides that this ConfigNode is UNKNOWN */ - status = NodeStatus.Unknown; - } else { - status = lastSample.getStatus(); + if (lastSample == null) { + /* First heartbeat not received from this ConfigNode, status is UNKNOWN */ + status = NodeStatus.Unknown; + statusReason = null; + } else if (!failureDetector.isAvailable(nodeId, heartbeatHistory)) { + /* Failure detector decides that this ConfigNode is UNKNOWN */ + status = NodeStatus.Unknown; + statusReason = null; + } else { + status = lastSample.getStatus(); + statusReason = lastSample.getStatusReason(); + } } } @@ -79,6 +91,6 @@ public class ConfigNodeHeartbeatCache extends BaseNodeCache { // TODO: Construct load score module long loadScore = NodeStatus.isNormalStatus(status) ? 
0 : Long.MAX_VALUE; - currentStatistics.set(new NodeStatistics(currentNanoTime, status, null, loadScore)); + currentStatistics.set(new NodeStatistics(currentNanoTime, status, statusReason, loadScore)); } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java index 8217593f5d6..5ead20d291f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/NodeHeartbeatSample.java @@ -74,8 +74,13 @@ public class NodeHeartbeatSample extends AbstractHeartbeatSample { /** Constructor for ConfigNode sample. */ public NodeHeartbeatSample(TConfigNodeHeartbeatResp heartbeatResp) { super(heartbeatResp.getTimestamp()); - this.status = NodeStatus.Running; - this.statusReason = null; + // Old ConfigNodes don't populate status/statusReason — fall back to Running/null + // so a rolling upgrade leaves the leader's view of legacy peers unchanged. + this.status = + heartbeatResp.isSetStatus() + ? NodeStatus.parse(heartbeatResp.getStatus()) + : NodeStatus.Running; + this.statusReason = heartbeatResp.isSetStatusReason() ? 
heartbeatResp.getStatusReason() : null; this.loadSample = null; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java index a491b3960c3..454a2741207 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java @@ -24,9 +24,11 @@ import org.apache.iotdb.common.rpc.thrift.TAINodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation; import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.cluster.DiskChecker; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.confignode.client.async.AsyncAINodeHeartbeatClientPool; import org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool; @@ -126,6 +128,12 @@ public class HeartbeatService { .ifPresent( consensusManager -> { if (getConsensusManager().isLeader()) { + // Leader self-checks its own disk health before fanning out heartbeats. + // Followers run the same check when receiving each heartbeat request + // (see ConfigNodeRPCServiceProcessor#getConfigNodeHeartBeat). 
+ DiskChecker.checkAndApply( + ConfigNodeDescriptor.getInstance().getConf().getCriticalDirs(), + CommonDescriptor.getInstance().getConfig().getDiskSpaceWarningThreshold()); // Send heartbeat requests to all the registered ConfigNodes pingRegisteredConfigNodes( genConfigNodeHeartbeatReq(), getNodeManager().getRegisteredConfigNodes()); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java index dbb35839045..c027f8234bf 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java @@ -40,6 +40,7 @@ import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp; import org.apache.iotdb.commons.auth.entity.PrivilegeModelType; import org.apache.iotdb.commons.auth.entity.PrivilegeType; import org.apache.iotdb.commons.auth.entity.PrivilegeUnion; +import org.apache.iotdb.commons.cluster.DiskChecker; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.consensus.ConsensusGroupId; @@ -1085,6 +1086,14 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac public TConfigNodeHeartbeatResp getConfigNodeHeartBeat(TConfigNodeHeartbeatReq heartbeatReq) { TConfigNodeHeartbeatResp resp = new TConfigNodeHeartbeatResp(); resp.setTimestamp(heartbeatReq.getTimestamp()); + // Follower self-check: probe critical dirs each time the leader pings us. + // The leader runs the same check in its HeartbeatService loop. 
+ DiskChecker.checkAndApply( + configNodeConfig.getCriticalDirs(), commonConfig.getDiskSpaceWarningThreshold()); + resp.setStatus(commonConfig.getNodeStatus().getStatus()); + if (commonConfig.getStatusReason() != null) { + resp.setStatusReason(commonConfig.getStatusReason()); + } return resp; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index e5c99640564..1ab957b5dfc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -49,6 +49,7 @@ import org.apache.iotdb.commons.audit.AuditLogOperation; import org.apache.iotdb.commons.audit.UserEntity; import org.apache.iotdb.commons.auth.entity.PrivilegeType; import org.apache.iotdb.commons.client.request.AsyncRequestContext; +import org.apache.iotdb.commons.cluster.DiskChecker; import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.concurrent.Await; import org.apache.iotdb.commons.concurrent.AwaitTimeoutException; @@ -213,6 +214,7 @@ import org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate; import org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeSpaceQuotaManager; import org.apache.iotdb.db.storageengine.rescon.quotas.DataNodeThrottleQuotaManager; import org.apache.iotdb.db.subscription.agent.SubscriptionAgent; @@ -2424,29 +2426,36 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface SYSTEM) .getValue(); + // Derive the disk status: an ABNORMAL data folder observed by any FolderManager wins over + // a full-disk reading, and a low free-space ratio still drives DISK_FULL when nothing is + // crashed. DiskChecker.apply then performs the actual NodeStatus transition (including the + // "ReadOnly(DiskFull|DiskCrash) -> Running" recovery for the DiskFull path). + DiskChecker.DiskStatus diskStatus = DiskChecker.DiskStatus.NORMAL; + if (FolderManager.hasAnyAbnormalFolder()) { + diskStatus = DiskChecker.DiskStatus.DISK_CRASH; + } if (availableDisk != 0 && totalDisk != 0) { double freeDiskRatio = availableDisk / totalDisk; loadSample.setFreeDiskSpace(availableDisk); loadSample.setDiskUsageRate(1d - freeDiskRatio); - // Reset NodeStatus if necessary - if (freeDiskRatio < commonConfig.getDiskSpaceWarningThreshold()) { + if (diskStatus == DiskChecker.DiskStatus.NORMAL + && freeDiskRatio < commonConfig.getDiskSpaceWarningThreshold()) { LOGGER.warn( "The available disk space is : {}, " + "the total disk space is : {}, " + "and the remaining disk usage ratio: {} is " - + "less than disk_space_warning_threshold: {}, set system to readonly!", + + "less than disk_space_warning_threshold: {}.", RamUsageEstimator.humanReadableUnits((long) availableDisk), RamUsageEstimator.humanReadableUnits((long) totalDisk), freeDiskRatio, commonConfig.getDiskSpaceWarningThreshold()); - commonConfig.setNodeStatus(NodeStatus.ReadOnly); - commonConfig.setStatusReason(NodeStatus.DISK_FULL); - } else if (NodeStatus.ReadOnly.equals(commonConfig.getNodeStatus()) - && NodeStatus.DISK_FULL.equals(commonConfig.getStatusReason())) { - commonConfig.setNodeStatus(NodeStatus.Running); - commonConfig.setStatusReason(null); + diskStatus = DiskChecker.DiskStatus.DISK_FULL; } + } else if (diskStatus == DiskChecker.DiskStatus.NORMAL) { + // Metrics not available yet — fall back to no-op so we don't churn the status. 
+ return; } + DiskChecker.apply(diskStatus); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/FolderManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/FolderManager.java index 77a332152ab..80f739abbc3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/FolderManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/FolderManager.java @@ -33,13 +33,25 @@ import org.apache.iotdb.db.storageengine.rescon.disk.strategy.SequenceStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.ref.WeakReference; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; public class FolderManager { private static final Logger logger = LoggerFactory.getLogger(FolderManager.class); + /** + * Registry of every live {@link FolderManager} instance so the DataNode heartbeat path can ask + * "is any folder anywhere on this node currently ABNORMAL?" without each subsystem having to push + * state into a central reporter. Weak references avoid keeping short-lived managers alive (e.g. + * those created per snapshot/load). + */ + private static final List<WeakReference<FolderManager>> ALL_INSTANCES = + new CopyOnWriteArrayList<>(); + /** Represents the operational states of a data folder. */ public enum FolderState { /** Indicates the folder is functioning normally with no issues. 
*/ @@ -62,6 +74,7 @@ public class FolderManager { throws DiskSpaceInsufficientException { this.folders = folders; folders.forEach(dir -> foldersStates.put(dir, FolderState.HEALTHY)); + ALL_INSTANCES.add(new WeakReference<>(this)); switch (type) { case SEQUENCE_STRATEGY: this.selectStrategy = new SequenceStrategy(); @@ -147,4 +160,34 @@ public class FolderManager { public List<String> getFolders() { return folders; } + + /** + * Walks every live FolderManager instance and reports whether any folder is currently {@link + * FolderState#ABNORMAL}. Used by the DataNode heartbeat path to derive a {@code + * NodeStatus.ReadOnly(DiskCrash)} signal from already-observed write failures. + * + * <p>Stale (GC'd) weak references are pruned as a side effect. + */ + public static boolean hasAnyAbnormalFolder() { + boolean anyAbnormal = false; + Iterator<WeakReference<FolderManager>> it = ALL_INSTANCES.iterator(); + while (it.hasNext()) { + FolderManager fm = it.next().get(); + if (fm == null) { + continue; + } + for (FolderState state : fm.foldersStates.values()) { + if (state == FolderState.ABNORMAL) { + anyAbnormal = true; + break; + } + } + if (anyAbnormal) { + break; + } + } + // Prune dead entries. CopyOnWriteArrayList tolerates concurrent removeIf safely. 
+ ALL_INSTANCES.removeIf(ref -> ref.get() == null); + return anyAbnormal; + } } diff --git a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java index 5a2e4196197..462f01f5f34 100644 --- a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java +++ b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java @@ -37,6 +37,16 @@ public final class CommonMessages { public static final String NODE_STATUS_NOT_EXIST = "NodeStatus %s doesn't exist."; public static final String UNKNOWN_NODE_STATUS = "Unknown NodeStatus %s."; + // --- disk health --- + public static final String DISK_FULL_SET_READ_ONLY = + "Free disk space ratio is below the configured threshold; set node status to ReadOnly(DiskFull)."; + public static final String DISK_CRASH_SET_READ_ONLY = + "Detected unwritable disk directory; set node status to ReadOnly(DiskCrash)."; + public static final String DISK_CRASH_PROBE_FAILED = + "Disk health probe write failed for directory {}."; + public static final String DISK_RECOVERED_SET_RUNNING = + "Disk health recovered (previous reason: {}); set node status to Running."; + // --- consensus --- public static final String UNRECOGNIZED_CONSENSUS_GROUP_ID = "Unrecognized ConsensusGroupId: %s"; diff --git a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java index a4b72b9e427..dfe57a5f9c6 100644 --- a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java +++ b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java @@ -36,6 +36,16 @@ public final class CommonMessages { public static final String NODE_STATUS_NOT_EXIST = "NodeStatus %s 不存在。"; public static final String 
UNKNOWN_NODE_STATUS = "未知 NodeStatus %s。"; + // --- disk health --- + public static final String DISK_FULL_SET_READ_ONLY = + "磁盘剩余空间比例低于配置阈值,将节点状态设为 ReadOnly(DiskFull)。"; + public static final String DISK_CRASH_SET_READ_ONLY = + "检测到不可写的磁盘目录,将节点状态设为 ReadOnly(DiskCrash)。"; + public static final String DISK_CRASH_PROBE_FAILED = + "对目录 {} 进行磁盘健康探测时写入失败。"; + public static final String DISK_RECOVERED_SET_RUNNING = + "磁盘健康已恢复(先前原因:{}),将节点状态设为 Running。"; + // --- consensus --- public static final String UNRECOGNIZED_CONSENSUS_GROUP_ID = "无法识别的 ConsensusGroupId:%s"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DiskChecker.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DiskChecker.java new file mode 100644 index 00000000000..4632ddf28dd --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/DiskChecker.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.commons.cluster; + +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.i18n.CommonMessages; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; + +/** + * Shared utility used by both ConfigNode and DataNode to evaluate the health of a set of critical + * directories and, optionally, drive the global {@link NodeStatus} on {@link CommonConfig}. + * + * <p>Only transitions between {@link NodeStatus#Running} and {@link NodeStatus#ReadOnly} with + * reason {@link NodeStatus#DISK_FULL}/{@link NodeStatus#DISK_CRASH} are managed here. Other + * ReadOnly reasons (e.g. manual) are left untouched. + */ +public class DiskChecker { + + private static final Logger LOGGER = LoggerFactory.getLogger(DiskChecker.class); + + private static final byte[] PROBE_PAYLOAD = new byte[] {0x01}; + private static final String PROBE_PREFIX = "iotdb-disk-probe-"; + private static final String PROBE_SUFFIX = ".tmp"; + + public enum DiskStatus { + NORMAL, + DISK_FULL, + DISK_CRASH + } + + private DiskChecker() {} + + /** + * Evaluate the supplied directories. A single unwritable directory yields {@link + * DiskStatus#DISK_CRASH}; any directory whose usable/total ratio is below the threshold yields + * {@link DiskStatus#DISK_FULL} (when no crash is detected); otherwise {@link DiskStatus#NORMAL}. 
+ */ + public static DiskStatus check(List<String> dirs, double freeRatioThreshold) { + if (dirs == null || dirs.isEmpty()) { + return DiskStatus.NORMAL; + } + boolean anyFull = false; + for (String dir : dirs) { + if (dir == null || dir.isEmpty()) { + continue; + } + File f = new File(dir); + if (!f.isDirectory()) { + LOGGER.warn(CommonMessages.DISK_CRASH_PROBE_FAILED, dir); + return DiskStatus.DISK_CRASH; + } + try { + Path probe = Files.createTempFile(Paths.get(dir), PROBE_PREFIX, PROBE_SUFFIX); + try { + Files.write(probe, PROBE_PAYLOAD); + } finally { + Files.deleteIfExists(probe); + } + } catch (IOException e) { + LOGGER.warn(CommonMessages.DISK_CRASH_PROBE_FAILED, dir, e); + return DiskStatus.DISK_CRASH; + } + long total = f.getTotalSpace(); + long usable = f.getUsableSpace(); + if (total > 0 && (double) usable / total < freeRatioThreshold) { + anyFull = true; + } + } + return anyFull ? DiskStatus.DISK_FULL : DiskStatus.NORMAL; + } + + /** + * Run {@link #check} and apply the result to {@link CommonConfig}. See class javadoc for + * transition rules. + */ + public static void checkAndApply(List<String> dirs, double freeRatioThreshold) { + apply(check(dirs, freeRatioThreshold)); + } + + /** Visible for tests; package-public callers should prefer {@link #checkAndApply}. 
*/ + public static void apply(DiskStatus result) { + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + NodeStatus currentStatus = config.getNodeStatus(); + String currentReason = config.getStatusReason(); + boolean currentlyFull = + NodeStatus.ReadOnly.equals(currentStatus) && NodeStatus.DISK_FULL.equals(currentReason); + boolean currentlyCrash = + NodeStatus.ReadOnly.equals(currentStatus) && NodeStatus.DISK_CRASH.equals(currentReason); + + switch (result) { + case DISK_CRASH: + if (!currentlyCrash) { + LOGGER.warn(CommonMessages.DISK_CRASH_SET_READ_ONLY); + config.setNodeStatus(NodeStatus.ReadOnly); + config.setStatusReason(NodeStatus.DISK_CRASH); + } + break; + case DISK_FULL: + // DiskCrash has higher priority — do not downgrade an existing crash to full. + if (currentlyCrash) { + return; + } + if (!currentlyFull) { + LOGGER.warn(CommonMessages.DISK_FULL_SET_READ_ONLY); + config.setNodeStatus(NodeStatus.ReadOnly); + config.setStatusReason(NodeStatus.DISK_FULL); + } + break; + case NORMAL: + default: + if (currentlyFull || currentlyCrash) { + LOGGER.info(CommonMessages.DISK_RECOVERED_SET_RUNNING, currentReason); + config.setNodeStatus(NodeStatus.Running); + config.setStatusReason(null); + } + break; + } + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java index 518a9faaed2..e37b44edafd 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/NodeStatus.java @@ -35,6 +35,7 @@ public enum NodeStatus { /** Only query statements are permitted */ ReadOnly("ReadOnly"); public static final String DISK_FULL = "DiskFull"; + public static final String DISK_CRASH = "DiskCrash"; private final String status; diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/cluster/DiskCheckerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/cluster/DiskCheckerTest.java new file mode 100644 index 00000000000..fde492c1090 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/cluster/DiskCheckerTest.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.commons.cluster; + +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; + +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + /** Tests for {@link DiskChecker}: {@code check} classifying directories as NORMAL, DISK_FULL or DISK_CRASH, and {@code apply} / {@code checkAndApply} updating the node status and status reason held in the CommonConfig returned by CommonDescriptor.getInstance(). */ +public class DiskCheckerTest { + + @Rule public TemporaryFolder tmp = new TemporaryFolder(); + /* Status and reason found in CommonConfig before each test; restored in tearDown so the config is left as it was found. */ + private NodeStatus savedStatus; + private String savedReason; + /* Snapshot the current status/reason, then start every test from Running with no reason. DiskChecker.apply writes into this same config, which is where the tests read the resulting status back from. */ + @Before + public void setUp() { + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + savedStatus = config.getNodeStatus(); + savedReason = config.getStatusReason(); + config.setNodeStatus(NodeStatus.Running); + config.setStatusReason(null); + } + + @After + public void tearDown() { + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + config.setNodeStatus(savedStatus); + config.setStatusReason(savedReason); + } + + @Test + public void checkReturnsNormalForWritableDirectoryWithSpace() throws Exception { + File dir = tmp.newFolder(); + // threshold 0.0 → any positive usable space passes + assertEquals( + DiskChecker.DiskStatus.NORMAL, + DiskChecker.check(Collections.singletonList(dir.getAbsolutePath()), 0.0)); + } + + @Test + public void checkReturnsDiskFullWhenBelowThreshold() throws Exception { + File dir = tmp.newFolder(); + // threshold > 1 forces every directory to be reported as full + assertEquals( + DiskChecker.DiskStatus.DISK_FULL, + DiskChecker.check(Collections.singletonList(dir.getAbsolutePath()), 2.0)); + } + + @Test + public void checkReturnsDiskCrashWhenDirectoryMissing() { + String missing = new File(tmp.getRoot(), "does-not-exist").getAbsolutePath(); + assertEquals( + DiskChecker.DiskStatus.DISK_CRASH, + DiskChecker.check(Collections.singletonList(missing), 0.0)); + } + + 
@Test + public void checkReturnsDiskCrashWhenPathIsAFile() throws Exception { + File file = tmp.newFile(); + assertEquals( + DiskChecker.DiskStatus.DISK_CRASH, + DiskChecker.check(Collections.singletonList(file.getAbsolutePath()), 0.0)); + } + + @Test + public void checkPrioritizesCrashOverFull() throws Exception { + File healthy = tmp.newFolder(); + String missing = new File(tmp.getRoot(), "missing").getAbsolutePath(); + // Even with a "full" threshold the missing dir trumps it. + assertEquals( + DiskChecker.DiskStatus.DISK_CRASH, + DiskChecker.check(java.util.Arrays.asList(healthy.getAbsolutePath(), missing), 2.0)); + } + + @Test + public void checkSkipsNullAndEmptyEntries() throws Exception { + File dir = tmp.newFolder(); + assertEquals( + DiskChecker.DiskStatus.NORMAL, + DiskChecker.check(java.util.Arrays.asList(null, "", dir.getAbsolutePath()), 0.0)); + } + + @Test + public void applyDiskFullSetsReadOnlyFromRunning() { + DiskChecker.apply(DiskChecker.DiskStatus.DISK_FULL); + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + assertEquals(NodeStatus.ReadOnly, config.getNodeStatus()); + assertEquals(NodeStatus.DISK_FULL, config.getStatusReason()); + } + + @Test + public void applyDiskCrashSetsReadOnlyFromRunning() { + DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH); + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + assertEquals(NodeStatus.ReadOnly, config.getNodeStatus()); + assertEquals(NodeStatus.DISK_CRASH, config.getStatusReason()); + } + + @Test + public void applyDiskCrashUpgradesFromDiskFull() { + DiskChecker.apply(DiskChecker.DiskStatus.DISK_FULL); + DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH); + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + assertEquals(NodeStatus.ReadOnly, config.getNodeStatus()); + assertEquals(NodeStatus.DISK_CRASH, config.getStatusReason()); + } + + @Test + public void applyDiskFullDoesNotDowngradeDiskCrash() { + 
DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH); + DiskChecker.apply(DiskChecker.DiskStatus.DISK_FULL); + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + assertEquals(NodeStatus.ReadOnly, config.getNodeStatus()); + assertEquals( + "DiskCrash must outrank DiskFull", NodeStatus.DISK_CRASH, config.getStatusReason()); + } + + @Test + public void applyNormalRecoversFromDiskFull() { + DiskChecker.apply(DiskChecker.DiskStatus.DISK_FULL); + DiskChecker.apply(DiskChecker.DiskStatus.NORMAL); + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + assertEquals(NodeStatus.Running, config.getNodeStatus()); + assertNull(config.getStatusReason()); + } + + @Test + public void applyNormalRecoversFromDiskCrash() { + DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH); + DiskChecker.apply(DiskChecker.DiskStatus.NORMAL); + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + assertEquals(NodeStatus.Running, config.getNodeStatus()); + assertNull(config.getStatusReason()); + } + /* NOTE(review): the name only describes the NORMAL step. The final DISK_FULL step asserts that DISK_FULL replaces the non-disk ManualMaintenance reason (see the inline comment near the end). Consider a name that reflects both halves. */ + @Test + public void applyLeavesNonDiskReadOnlyReasonUntouched() { + CommonConfig config = CommonDescriptor.getInstance().getConfig(); + config.setNodeStatus(NodeStatus.ReadOnly); + config.setStatusReason("ManualMaintenance"); + + DiskChecker.apply(DiskChecker.DiskStatus.NORMAL); + assertEquals(NodeStatus.ReadOnly, config.getNodeStatus()); + assertEquals("ManualMaintenance", config.getStatusReason()); + + DiskChecker.apply(DiskChecker.DiskStatus.DISK_FULL); + // DISK_FULL only fires when not already DiskFull/DiskCrash — it does take over here, + // mirroring the existing behavior for the legacy sampleDiskLoad path. 
+ assertEquals(NodeStatus.ReadOnly, config.getNodeStatus()); + assertEquals(NodeStatus.DISK_FULL, config.getStatusReason()); + } + + @Test + public void applyIsIdempotentForRepeatedDiskCrash() { + DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH); + NodeStatus before = CommonDescriptor.getInstance().getConfig().getNodeStatus(); + String reasonBefore = CommonDescriptor.getInstance().getConfig().getStatusReason(); + DiskChecker.apply(DiskChecker.DiskStatus.DISK_CRASH); + assertEquals(before, CommonDescriptor.getInstance().getConfig().getNodeStatus()); + assertEquals(reasonBefore, CommonDescriptor.getInstance().getConfig().getStatusReason()); + } + /* Runs checkAndApply on a healthy dir, then a missing dir, then the healthy dir again, expecting Running -> ReadOnly (DISK_CRASH) -> Running with the reason cleared. */ + @Test + public void checkAndApplyDrivesStatusEndToEnd() throws Exception { + File healthy = tmp.newFolder(); + DiskChecker.checkAndApply(Collections.singletonList(healthy.getAbsolutePath()), 0.0); + assertEquals(NodeStatus.Running, CommonDescriptor.getInstance().getConfig().getNodeStatus()); + + String missing = new File(tmp.getRoot(), "still-missing").getAbsolutePath(); + DiskChecker.checkAndApply(Collections.singletonList(missing), 0.0); + assertEquals(NodeStatus.ReadOnly, CommonDescriptor.getInstance().getConfig().getNodeStatus()); + assertEquals( + NodeStatus.DISK_CRASH, CommonDescriptor.getInstance().getConfig().getStatusReason()); + + DiskChecker.checkAndApply(Collections.singletonList(healthy.getAbsolutePath()), 0.0); + assertEquals(NodeStatus.Running, CommonDescriptor.getInstance().getConfig().getNodeStatus()); + assertNull(CommonDescriptor.getInstance().getConfig().getStatusReason()); + } + + @Test + public void emptyDirListIsNormal() { + assertEquals(DiskChecker.DiskStatus.NORMAL, DiskChecker.check(Collections.emptyList(), 1.0)); + assertEquals(DiskChecker.DiskStatus.NORMAL, DiskChecker.check(null, 1.0)); + } + /* check() on a healthy dir must leave no files behind in it, i.e. its disk probe has to clean up after itself. */ + @Test + public void smokeProbeFileIsDeleted() throws Exception { + File dir = tmp.newFolder(); + DiskChecker.check(Collections.singletonList(dir.getAbsolutePath()), 0.0); + File[] leftovers = 
dir.listFiles(); + assertTrue( + "Disk probe should clean up its temp file", leftovers == null || leftovers.length == 0); + } +} diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 22529ffbb73..b6f16c0ca99 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -524,6 +524,13 @@ struct TConfigNodeHeartbeatResp { 1: required i64 timestamp 2: optional string activateStatus 3: optional common.TLicense license + // Reported ConfigNode status (e.g. Running, ReadOnly). Optional for forward + // compatibility — old ConfigNodes do not populate this field, in which case + // the leader falls back to assuming Running. + 4: optional string status + // Optional human/machine readable reason accompanying ReadOnly status, + // e.g. NodeStatus.DISK_FULL or NodeStatus.DISK_CRASH. + 5: optional string statusReason } struct TAddConsensusGroupReq {
