This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch fix-log-throttling in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f966af03207f76392f61af6dab741ebceae72139 Author: Caideyipi <[email protected]> AuthorDate: Tue Jun 9 18:06:52 2026 +0800 Limit repeated warning logs --- .../iotdb/db/auth/ClusterAuthorityFetcher.java | 37 +++++++-- .../org/apache/iotdb/db/auth/LoginLockManager.java | 97 +++++++++++++++++++--- .../iotdb/db/auth/AuthorizerManagerTest.java | 34 ++++++++ .../apache/iotdb/db/auth/LoginLockManagerTest.java | 64 ++++++++++++++ .../apache/iotdb/commons/utils/JVMCommonUtils.java | 46 ++++++++-- .../iotdb/commons/utils/JVMCommonUtilsTest.java | 43 ++++++++++ 6 files changed, 297 insertions(+), 24 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java index 641ead173d2..5910b9f7ae9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java @@ -72,6 +72,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import static org.apache.iotdb.commons.auth.utils.AuthUtils.constructAuthorityScope; @@ -89,6 +90,8 @@ public class ClusterAuthorityFetcher implements IAuthorityFetcher { ConfigNodeClientManager.getInstance(); private static final String CONNECTERROR = "Failed to connect to config node."; + private static final long CONNECT_ERROR_LOG_INTERVAL_MS = 60_000L; + private static final AtomicLong LAST_CONNECT_ERROR_LOG_TIME = new AtomicLong(0L); public ClusterAuthorityFetcher(IAuthorCache iAuthorCache) { this.iAuthorCache = iAuthorCache; @@ -370,8 +373,9 @@ public class ClusterAuthorityFetcher implements IAuthorityFetcher { try (ConfigNodeClient configNodeClient = CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { authizedPatternTree = configNodeClient.fetchAuthizedPatternTree(req); + resetConfigNodeConnectionErrorLogTime(); } catch (ClientManagerException | TException e) { - LOGGER.error(CONNECTERROR); + logConfigNodeConnectionError(); authizedPatternTree.setStatus( RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, CONNECTERROR)); } @@ -397,6 +401,7 @@ public class ClusterAuthorityFetcher implements IAuthorityFetcher { statementToAuthorizerReq((RelationalAuthorStatement) plan)) : configNodeClient.operatePermission( statementToAuthorizerReq((AuthorStatement) plan)); + resetConfigNodeConnectionErrorLogTime(); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) { future.setException(new IoTDBException(tsStatus)); } else { @@ -406,7 +411,7 @@ public class ClusterAuthorityFetcher implements IAuthorityFetcher { } catch (AuthException e) { future.setException(e); } catch (ClientManagerException | TException e) { - LOGGER.error(CONNECTERROR); + logConfigNodeConnectionError(); future.setException(e); } return future; @@ -471,6 +476,7 @@ public class ClusterAuthorityFetcher implements IAuthorityFetcher { ? configNodeClient.queryRPermission( statementToAuthorizerReq((RelationalAuthorStatement) plan)) : configNodeClient.queryPermission(statementToAuthorizerReq((AuthorStatement) plan)); + resetConfigNodeConnectionErrorLogTime(); if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != authorizerResp.getStatus().getCode()) { future.setException( new IoTDBException( @@ -481,7 +487,7 @@ public class ClusterAuthorityFetcher implements IAuthorityFetcher { } catch (AuthException e) { future.setException(e); } catch (ClientManagerException | TException e) { - LOGGER.error(CONNECTERROR); + logConfigNodeConnectionError(); authorizerResp.setStatus( RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, CONNECTERROR)); future.setException(new IoTDBException(authorizerResp.getStatus())); @@ -561,8 +567,9 @@ public class ClusterAuthorityFetcher implements IAuthorityFetcher { CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { // Send request to some API server status = configNodeClient.login(req); + resetConfigNodeConnectionErrorLogTime(); } catch (ClientManagerException | TException e) { - LOGGER.error(CONNECTERROR); + logConfigNodeConnectionError(); status = new TPermissionInfoResp(); status.setStatus(RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, CONNECTERROR)); } finally { @@ -593,8 +600,9 @@ public class ClusterAuthorityFetcher implements IAuthorityFetcher { CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { // Send request to some API server permissionInfoResp = configNodeClient.getUser(userName); + resetConfigNodeConnectionErrorLogTime(); } catch (ClientManagerException | TException e) { - LOGGER.error(CONNECTERROR); + logConfigNodeConnectionError(); } if (permissionInfoResp != null && permissionInfoResp.getStatus().getCode() @@ -629,8 +637,9 @@ public class ClusterAuthorityFetcher implements IAuthorityFetcher { CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { // Send request to some API server permissionInfoResp = configNodeClient.checkUserPrivileges(req); + resetConfigNodeConnectionErrorLogTime(); } catch (ClientManagerException | TException e) { - LOGGER.error(CONNECTERROR); + logConfigNodeConnectionError(); permissionInfoResp = new TPermissionInfoResp(); permissionInfoResp.setStatus( RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, CONNECTERROR)); @@ -660,8 +669,9 @@ public class ClusterAuthorityFetcher implements IAuthorityFetcher { CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { // Send request to some API server permissionInfoResp = configNodeClient.checkRoleOfUser(req); + resetConfigNodeConnectionErrorLogTime(); } catch (ClientManagerException | TException e) { - LOGGER.error(CONNECTERROR); + logConfigNodeConnectionError(); permissionInfoResp = new TPermissionInfoResp(); permissionInfoResp.setStatus( RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR, CONNECTERROR)); @@ -715,6 +725,19 @@ public class ClusterAuthorityFetcher implements IAuthorityFetcher { return user; } + static void logConfigNodeConnectionError() { + long now = System.currentTimeMillis(); + long lastLogTime = LAST_CONNECT_ERROR_LOG_TIME.get(); + if ((lastLogTime == 0 || now - lastLogTime >= CONNECT_ERROR_LOG_INTERVAL_MS) + && LAST_CONNECT_ERROR_LOG_TIME.compareAndSet(lastLogTime, now)) { + LOGGER.error(CONNECTERROR); + } + } + + static void resetConfigNodeConnectionErrorLogTime() { + LAST_CONNECT_ERROR_LOG_TIME.set(0L); + } + /** Cache role. */ public Role cacheRole(String roleName, TPermissionInfoResp tPermissionInfoResp) { TRoleResp resp = tPermissionInfoResp.getRoleInfo().get(roleName); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/LoginLockManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/LoginLockManager.java index 6015d1d538f..330e021f567 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/LoginLockManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/auth/LoginLockManager.java @@ -39,6 +39,8 @@ import java.util.concurrent.ConcurrentMap; public class LoginLockManager { private static final Logger LOGGER = LoggerFactory.getLogger(LoginLockManager.class); + private static final int MULTIPLE_USERS_FOR_IP_WARNING_THRESHOLD = 50; + private static final int MULTIPLE_IPS_FOR_USER_WARNING_THRESHOLD = 100; // Configuration parameters private final int failedLoginAttempts; @@ -48,6 +50,8 @@ public class LoginLockManager { // Lock records storage (in-memory only) private final ConcurrentMap<Long, UserLockInfo> userLocks = new ConcurrentHashMap<>(); private final ConcurrentMap<String, UserLockInfo> userIpLocks = new ConcurrentHashMap<>(); + private final Set<String> warnedIpsWithMultipleUsers = ConcurrentHashMap.newKeySet(); + private final Set<Long> warnedUsersWithMultipleIpLocks = ConcurrentHashMap.newKeySet(); // Exempt users who should never be locked (only valid if request is from local host) private static final Set<Long> EXEMPT_USERS; @@ -214,7 +218,7 @@ public class LoginLockManager { existing.addFailureTime(now); // Check if threshold reached (log only when it just reaches) int failCountIp = existing.getFailureCount(); - if (failCountIp >= failedLoginAttempts) { + if (failCountIp == failedLoginAttempts) { LOGGER.info(DataNodeMiscMessages.IP_LOCKED, ip, userId); } return existing; @@ -236,7 +240,7 @@ public class LoginLockManager { existing.addFailureTime(now); // Check if threshold reached (log only when it just reaches) int failCountUser = existing.getFailureCount(); - if (failCountUser >= failedLoginAttemptsPerUser) { + if (failCountUser == failedLoginAttemptsPerUser) { LOGGER.info( "User ID '{}' locked due to {} failed attempts", userId, @@ -262,6 +266,7 @@ public class LoginLockManager { String userIpKey = buildUserIpKey(userId, ip); userIpLocks.remove(userIpKey); userLocks.remove(userId); + resetPotentialAttackWarningsIfBelowThreshold(userId, ip); } /** @@ -272,15 +277,27 @@ public class LoginLockManager { */ public void unlock(long userId, String ip) { if (ip == null || ip.isEmpty()) { + Set<String> affectedIps = new HashSet<>(); + for (String key : userIpLocks.keySet()) { + if (key.startsWith(userId + "@")) { + String[] parts = key.split("@", 2); + if (parts.length == 2) { + affectedIps.add(parts[1]); + } + } + } // Unlock global user lock userLocks.remove(userId); // Also remove all IP locks for this user userIpLocks.keySet().removeIf(key -> key.startsWith(userId + "@")); + warnedUsersWithMultipleIpLocks.remove(userId); + affectedIps.forEach(this::resetIpWarningIfBelowThreshold); LOGGER.info(DataNodeMiscMessages.USER_UNLOCKED_MANUAL, userId); } else { // Unlock specific user@ip lock String userIpKey = buildUserIpKey(userId, ip); userIpLocks.remove(userIpKey); + resetPotentialAttackWarningsIfBelowThreshold(userId, ip); LOGGER.info(DataNodeMiscMessages.IP_UNLOCKED_MANUAL, ip, userId); } } @@ -289,6 +306,8 @@ public class LoginLockManager { public void cleanExpiredLocks() { long now = System.currentTimeMillis(); long cutoffTime = now - (passwordLockTimeMinutes * 60 * 1000L); + Set<Long> affectedUsers = new HashSet<>(); + Set<String> affectedIps = new HashSet<>(); // Clean expired user locks userLocks @@ -315,6 +334,10 @@ public class LoginLockManager { info.removeOldFailures(cutoffTime); if (info.getFailureCount() == 0) { final String[] parts = entry.getKey().split("@", 2); + if (parts.length == 2) { + affectedUsers.add(Long.parseLong(parts[0])); + affectedIps.add(parts[1]); + } LOGGER.info( DataNodeMiscMessages.IP_UNLOCKED_EXPIRED, parts.length == 2 ? parts[1] : "", @@ -323,6 +346,9 @@ public class LoginLockManager { } return false; }); + + affectedUsers.forEach(this::resetUserWarningIfBelowThreshold); + affectedIps.forEach(this::resetIpWarningIfBelowThreshold); } // Helper methods @@ -332,15 +358,21 @@ public class LoginLockManager { private void checkForPotentialAttacks(long userId, String ip) { // Check if IP is locked by many users - Set<Long> usersForIp = new HashSet<>(); - for (String key : userIpLocks.keySet()) { - if (key.endsWith("@" + ip)) { - usersForIp.add(Long.parseLong(key.split("@")[0])); + if (ip != null && !ip.isEmpty()) { + Set<Long> usersForIp = new HashSet<>(); + for (String key : userIpLocks.keySet()) { + if (key.endsWith("@" + ip)) { + usersForIp.add(Long.parseLong(key.split("@")[0])); + } } - } - if (usersForIp.size() > 50) { - LOGGER.warn(DataNodeMiscMessages.IP_LOCKED_MULTIPLE_USERS, ip, usersForIp.size()); + if (usersForIp.size() > MULTIPLE_USERS_FOR_IP_WARNING_THRESHOLD) { + if (warnedIpsWithMultipleUsers.add(ip)) { + LOGGER.warn(DataNodeMiscMessages.IP_LOCKED_MULTIPLE_USERS, ip, usersForIp.size()); + } + } else { + warnedIpsWithMultipleUsers.remove(ip); + } } // Check if user has many IP locks @@ -351,9 +383,52 @@ public class LoginLockManager { } } - if (ipsForUser.size() > 100) { - LOGGER.warn(DataNodeMiscMessages.USER_MULTIPLE_IP_LOCKS, userId, ipsForUser.size()); + if (ipsForUser.size() > MULTIPLE_IPS_FOR_USER_WARNING_THRESHOLD) { + if (warnedUsersWithMultipleIpLocks.add(userId)) { + LOGGER.warn(DataNodeMiscMessages.USER_MULTIPLE_IP_LOCKS, userId, ipsForUser.size()); + } + } else { + warnedUsersWithMultipleIpLocks.remove(userId); + } + } + + private void resetPotentialAttackWarningsIfBelowThreshold(long userId, String ip) { + resetUserWarningIfBelowThreshold(userId); + if (ip != null && !ip.isEmpty()) { + resetIpWarningIfBelowThreshold(ip); + } + } + + private void resetUserWarningIfBelowThreshold(long userId) { + if (countIpsForUser(userId) <= MULTIPLE_IPS_FOR_USER_WARNING_THRESHOLD) { + warnedUsersWithMultipleIpLocks.remove(userId); + } + } + + private void resetIpWarningIfBelowThreshold(String ip) { + if (countUsersForIp(ip) <= MULTIPLE_USERS_FOR_IP_WARNING_THRESHOLD) { + warnedIpsWithMultipleUsers.remove(ip); + } + } + + private int countUsersForIp(String ip) { + Set<Long> usersForIp = new HashSet<>(); + for (String key : userIpLocks.keySet()) { + if (key.endsWith("@" + ip)) { + usersForIp.add(Long.parseLong(key.split("@")[0])); + } + } + return usersForIp.size(); + } + + private int countIpsForUser(long userId) { + Set<String> ipsForUser = new HashSet<>(); + for (String key : userIpLocks.keySet()) { + if (key.startsWith(userId + "@")) { + ipsForUser.add(key.split("@")[1]); + } } + return ipsForUser.size(); } public static LoginLockManager getInstance() { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/AuthorizerManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/AuthorizerManagerTest.java index 7023a3d9fac..9ad64081ff9 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/AuthorizerManagerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/AuthorizerManagerTest.java @@ -30,8 +30,11 @@ import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; import org.apache.iotdb.confignode.rpc.thrift.TUserResp; import org.apache.iotdb.rpc.TSStatusCode; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.read.ListAppender; import org.junit.Assert; import org.junit.Test; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; @@ -44,6 +47,31 @@ public class AuthorizerManagerTest { ClusterAuthorityFetcher authorityFetcher = new ClusterAuthorityFetcher(new BasicAuthorityCache()); + @Test + public void configNodeConnectionErrorLoggedOnlyOnceUntilReset() { + ch.qos.logback.classic.Logger logger = + (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(ClusterAuthorityFetcher.class); + ListAppender<ILoggingEvent> appender = new ListAppender<>(); + appender.setContext(logger.getLoggerContext()); + appender.start(); + logger.addAppender(appender); + + ClusterAuthorityFetcher.resetConfigNodeConnectionErrorLogTime(); + try { + ClusterAuthorityFetcher.logConfigNodeConnectionError(); + ClusterAuthorityFetcher.logConfigNodeConnectionError(); + Assert.assertEquals(1, countLogEvents(appender, "Failed to connect to config node.")); + + ClusterAuthorityFetcher.resetConfigNodeConnectionErrorLogTime(); + ClusterAuthorityFetcher.logConfigNodeConnectionError(); + Assert.assertEquals(2, countLogEvents(appender, "Failed to connect to config node.")); + } finally { + ClusterAuthorityFetcher.resetConfigNodeConnectionErrorLogTime(); + logger.detachAppender(appender); + appender.stop(); + } + } + @Test public void permissionCacheTest() throws IllegalPathException { User user = new User(); @@ -258,4 +286,10 @@ public class AuthorizerManagerTest { TSStatusCode.SUCCESS_STATUS.getStatusCode(), authorityFetcher.checkUserSysPrivilegesGrantOpt("user1", PrivilegeType.USE_CQ).getCode()); } + + private long countLogEvents(ListAppender<ILoggingEvent> appender, String messagePattern) { + return appender.list.stream() + .filter(event -> messagePattern.equals(event.getMessage())) + .count(); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/LoginLockManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/LoginLockManagerTest.java index 0a9e95886a8..b6c4de9698b 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/LoginLockManagerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/auth/LoginLockManagerTest.java @@ -20,9 +20,13 @@ package org.apache.iotdb.db.auth; import org.apache.iotdb.db.auth.LoginLockManager.UserLockInfo; +import org.apache.iotdb.db.i18n.DataNodeMiscMessages; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.read.ListAppender; import org.junit.Before; import org.junit.Test; +import org.slf4j.LoggerFactory; import java.lang.reflect.Field; import java.net.InetAddress; @@ -141,6 +145,66 @@ public class LoginLockManagerTest { lockManager.checkLock(TEST_USER_ID, TEST_IP)); } + @Test + public void testLockMessagesLoggedOnlyWhenThresholdFirstReached() { + ch.qos.logback.classic.Logger logger = + (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(LoginLockManager.class); + ListAppender<ILoggingEvent> appender = new ListAppender<>(); + appender.setContext(logger.getLoggerContext()); + appender.start(); + logger.addAppender(appender); + + try { + for (int i = 0; i < failedLoginAttemptsPerUser + 2; i++) { + lockManager.recordFailure(TEST_USER_ID, TEST_IP); + } + + assertEquals(1, countLogEvents(appender, DataNodeMiscMessages.IP_LOCKED)); + assertEquals(1, countLogEvents(appender, "User ID '{}' locked due to {} failed attempts")); + } finally { + logger.detachAppender(appender); + appender.stop(); + } + } + + @Test + public void testPotentialAttackWarningsLoggedOnlyOnceWhileThresholdExceeded() { + ch.qos.logback.classic.Logger logger = + (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(LoginLockManager.class); + ListAppender<ILoggingEvent> appender = new ListAppender<>(); + appender.setContext(logger.getLoggerContext()); + appender.start(); + logger.addAppender(appender); + + try { + for (int i = 0; i < 55; i++) { + lockManager.recordFailure(TEST_USER_ID + i, TEST_IP); + } + for (int i = 0; i < 5; i++) { + lockManager.recordFailure(TEST_USER_ID + i, TEST_IP); + } + + for (int i = 0; i < 105; i++) { + lockManager.recordFailure(OTHER_USER_ID, "172.16.0." + i); + } + for (int i = 0; i < 5; i++) { + lockManager.recordFailure(OTHER_USER_ID, "172.16.0." + i); + } + + assertEquals(1, countLogEvents(appender, DataNodeMiscMessages.IP_LOCKED_MULTIPLE_USERS)); + assertEquals(1, countLogEvents(appender, DataNodeMiscMessages.USER_MULTIPLE_IP_LOCKS)); + } finally { + logger.detachAppender(appender); + appender.stop(); + } + } + + private long countLogEvents(ListAppender<ILoggingEvent> appender, String messagePattern) { + return appender.list.stream() + .filter(event -> messagePattern.equals(event.getMessage())) + .count(); + } + @Test public void testGlobalUserLockAfterMaxAttempts() { for (int i = 0; i < failedLoginAttemptsPerUser; i++) { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/JVMCommonUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/JVMCommonUtils.java index ccbe525d3d5..4221be8ec8f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/JVMCommonUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/JVMCommonUtils.java @@ -31,11 +31,15 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; public class JVMCommonUtils { private static final Logger LOGGER = LoggerFactory.getLogger(JVMCommonUtils.class); + private static final long DISK_WARNING_PRINT_INTERVAL_MS = 3600 * 1000L; /** Default executor pool maximum size. */ public static final int MAX_EXECUTOR_POOL_SIZE = Math.max(100, getCpuCores() * 5); @@ -44,6 +48,10 @@ public class JVMCommonUtils { private static double diskSpaceWarningThreshold = CommonDescriptor.getInstance().getConfig().getDiskSpaceWarningThreshold(); + private static final ConcurrentMap<String, Long> cannotGetFreeSpaceLastPrintTimeMap = + new ConcurrentHashMap<>(); + private static final ConcurrentMap<String, Long> diskAboveWarningThresholdLastPrintTimeMap = + new ConcurrentHashMap<>(); /** * get JDK version. @@ -87,16 +95,20 @@ public class JVMCommonUtils { long freeSpace = IOUtils.retryNoException(5, 2000L, dirFile::getFreeSpace, space -> space > 0).orElse(0L); if (freeSpace == 0) { - LOGGER.warn(UtilMessages.CANNOT_GET_FREE_SPACE, dir); + if (shouldPrintDiskWarning(cannotGetFreeSpaceLastPrintTimeMap, dir)) { + LOGGER.warn(UtilMessages.CANNOT_GET_FREE_SPACE, dir); + } + } else { + cannotGetFreeSpaceLastPrintTimeMap.remove(dir); } long totalSpace = dirFile.getTotalSpace(); double ratio = 1.0 * freeSpace / totalSpace; if (ratio <= diskSpaceWarningThreshold) { - LOGGER.warn( - "{} is above the warning threshold, free space {}, total space {}", - dir, - freeSpace, - totalSpace); + if (shouldPrintDiskWarning(diskAboveWarningThresholdLastPrintTimeMap, dir)) { + LOGGER.warn(UtilMessages.DISK_ABOVE_WARNING_THRESHOLD, dir, freeSpace, totalSpace); + } + } else { + diskAboveWarningThresholdLastPrintTimeMap.remove(dir); } return ratio; } catch (Exception e) { @@ -128,4 +140,26 @@ public class JVMCommonUtils { public static void setDiskSpaceWarningThreshold(double threshold) { diskSpaceWarningThreshold = threshold; } + + @TestOnly + static void resetDiskWarningLastPrintTimes() { + cannotGetFreeSpaceLastPrintTimeMap.clear(); + diskAboveWarningThresholdLastPrintTimeMap.clear(); + } + + private static boolean shouldPrintDiskWarning( + ConcurrentMap<String, Long> lastPrintTimeMap, String dir) { + long now = System.currentTimeMillis(); + AtomicBoolean shouldPrint = new AtomicBoolean(false); + lastPrintTimeMap.compute( + dir, + (key, lastPrintTime) -> { + if (lastPrintTime == null || now - lastPrintTime > DISK_WARNING_PRINT_INTERVAL_MS) { + shouldPrint.set(true); + return now; + } + return lastPrintTime; + }); + return shouldPrint.get(); + } } diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/JVMCommonUtilsTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/JVMCommonUtilsTest.java index 4a247cfb765..3cab5a55f12 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/JVMCommonUtilsTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/utils/JVMCommonUtilsTest.java @@ -19,8 +19,18 @@ package org.apache.iotdb.commons.utils; +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.i18n.UtilMessages; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.read.ListAppender; import org.junit.Assert; import org.junit.Test; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; public class JVMCommonUtilsTest { @@ -39,4 +49,37 @@ public class JVMCommonUtilsTest { Assert.fail(); } } + + @Test + public void getDiskFreeRatioWarnsOnlyOnceWhileDiskWarningPersists() throws IOException { + Path dir = Files.createTempDirectory("jvm-common-utils-test"); + ch.qos.logback.classic.Logger logger = + (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(JVMCommonUtils.class); + ListAppender<ILoggingEvent> appender = new ListAppender<>(); + appender.setContext(logger.getLoggerContext()); + appender.start(); + logger.addAppender(appender); + + JVMCommonUtils.resetDiskWarningLastPrintTimes(); + JVMCommonUtils.setDiskSpaceWarningThreshold(1.1); + try { + JVMCommonUtils.getDiskFreeRatio(dir.toString()); + JVMCommonUtils.getDiskFreeRatio(dir.toString()); + + Assert.assertEquals(1, countLogEvents(appender, UtilMessages.DISK_ABOVE_WARNING_THRESHOLD)); + } finally { + JVMCommonUtils.setDiskSpaceWarningThreshold( + CommonDescriptor.getInstance().getConfig().getDiskSpaceWarningThreshold()); + JVMCommonUtils.resetDiskWarningLastPrintTimes(); + logger.detachAppender(appender); + appender.stop(); + Files.deleteIfExists(dir); + } + } + + private long countLogEvents(ListAppender<ILoggingEvent> appender, String messagePattern) { + return appender.list.stream() + .filter(event -> messagePattern.equals(event.getMessage())) + .count(); + } }
