This is an automated email from the ASF dual-hosted git repository. JackieTien97 pushed a commit to branch rc/2.0.10 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5fc7f26b4035395d2d33da83e4f9bea2b9d58bc7
Author: Hongzhi Gao <[email protected]>
AuthorDate: Mon Jun 22 15:41:01 2026 +0800

Fix/min folder occupied space cache (#17996)

* fix(consensus): cache occupied space in MinFolderOccupiedSpaceFirstStrategy

MinFolderOccupiedSpaceFirstStrategy recomputed every folder's occupied space via a full Files.walk on every folder selection. While a snapshot made of hundreds of thousands of tiny files was being received, this turned each per-file allocation into a full directory-tree scan, making the total cost quadratic in the number of files. addPeer stalled (observed ~1 file per 4–5 s) and syncLag stayed high.

Cache the occupied space per folder and recompute it only periodically: a selection counter is incremented on every successful selection and, once it reaches a count threshold or a time interval elapses, the cached state is reset and the occupied space is recomputed. The selection semantics (pick the least occupied folder) are preserved while the number of full directory scans is bounded.

Add a mocked unit test (DirectoryStrategyTest) and a real-filesystem integration test (MinFolderOccupiedSpaceFirstStrategyRealFsTest) covering the caching, the count-based refresh, and the reset. 
* fix(consensus): make occupied space cache refresh configurable --- .../strategy/DirectoryStrategyTest.java | 35 ++++++ .../conf/iotdb-system.properties.template | 12 ++ .../apache/iotdb/commons/conf/CommonConfig.java | 26 +++++ .../iotdb/commons/conf/CommonDescriptor.java | 25 ++++ .../MinFolderOccupiedSpaceFirstStrategy.java | 102 +++++++++++++++-- ...FolderOccupiedSpaceFirstStrategyRealFsTest.java | 127 +++++++++++++++++++++ 6 files changed, 318 insertions(+), 9 deletions(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategyTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategyTest.java index 6343dcc1699..b3036f5ac00 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategyTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategyTest.java @@ -140,12 +140,47 @@ public class DirectoryStrategyTest { PowerMockito.when(JVMCommonUtils.getOccupiedSpace(dataDirList.get(minIndex))) .thenReturn(Long.MAX_VALUE); + // The occupied space is cached, so the new value is reflected only after a cache refresh. + minFolderOccupiedSpaceFirstStrategy.invalidateCache(); minIndex = getIndexOfMinOccupiedSpace(); for (int i = 0; i < dataDirList.size(); i++) { assertEquals(minIndex, minFolderOccupiedSpaceFirstStrategy.nextFolderIndex()); } } + @Test + public void testMinFolderOccupiedSpaceFirstStrategyCachesOccupiedSpace() + throws DiskSpaceInsufficientException, IOException { + MinFolderOccupiedSpaceFirstStrategy strategy = new MinFolderOccupiedSpaceFirstStrategy(); + // Disable the time-based refresh so the count-based refresh is the only trigger under test. 
+ strategy.setRefreshIntervalMs(Long.MAX_VALUE); + strategy.setRefreshSelectionThreshold(3); + strategy.setFolders(dataDirList); + + int minIndex = getIndexOfMinOccupiedSpace(); + // The first selection builds the cache via a single round of getOccupiedSpace calls. + assertEquals(minIndex, strategy.nextFolderIndex()); + assertEquals(1, strategy.getSelectionsSinceRefresh()); + + // Mutate the occupied space of every folder so that a different folder becomes the least + // occupied one. While the cache is still valid the selection must not change, which proves the + // strategy is no longer re-walking the directory tree on every selection. + for (int i = 0; i < dataDirList.size(); i++) { + boolean available = !fullDirIndexSet.contains(i); + PowerMockito.when(JVMCommonUtils.getOccupiedSpace(dataDirList.get(i))) + .thenReturn(available ? (long) (dataDirList.size() - i) : Long.MAX_VALUE); + } + assertEquals(minIndex, strategy.nextFolderIndex()); + assertEquals(minIndex, strategy.nextFolderIndex()); + assertEquals(3, strategy.getSelectionsSinceRefresh()); + + // The threshold has been reached, so the next selection resets the state, recomputes the + // occupied space and reflects the mutated values. 
+ int newMinIndex = getIndexOfMinOccupiedSpace(); + assertEquals(newMinIndex, strategy.nextFolderIndex()); + assertEquals(1, strategy.getSelectionsSinceRefresh()); + } + private int getIndexOfMinOccupiedSpace() throws IOException { int index = -1; long minOccupied = Long.MAX_VALUE; diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index e76d7adc1a9..463192ecd55 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -764,6 +764,18 @@ topology_probing_timeout_ratio=0.5 # Datatype: double(percentage) disk_space_warning_threshold=0.05 +# The refresh interval in milliseconds for the occupied-space cache used by +# MinFolderOccupiedSpaceFirstStrategy. The default is 60000 (60 seconds). +# effectiveMode: restart +# Datatype: long +min_folder_occupied_space_cache_refresh_interval_ms=60000 + +# The number of folder selections after which MinFolderOccupiedSpaceFirstStrategy refreshes its +# occupied-space cache. The default is 1000. +# effectiveMode: restart +# Datatype: int +min_folder_occupied_space_cache_refresh_selection_threshold=1000 + # Purpose: for data partition repair # The number of threads used for parallel scanning in the partition table recovery # effectiveMode: restart diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 6a8956e423b..bcff47d1472 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -185,6 +185,12 @@ public class CommonConfig { /** Disk Monitor. 
*/ private double diskSpaceWarningThreshold = 0.05; + /** Refresh interval for MinFolderOccupiedSpaceFirstStrategy occupied-space cache. */ + private long minFolderOccupiedSpaceCacheRefreshIntervalMs = 60_000L; + + /** Refresh selection threshold for MinFolderOccupiedSpaceFirstStrategy occupied-space cache. */ + private int minFolderOccupiedSpaceCacheRefreshSelectionThreshold = 1000; + /** Time partition origin in milliseconds. */ private long timePartitionOrigin = 0; @@ -746,6 +752,26 @@ public class CommonConfig { this.diskSpaceWarningThreshold = diskSpaceWarningThreshold; } + public long getMinFolderOccupiedSpaceCacheRefreshIntervalMs() { + return minFolderOccupiedSpaceCacheRefreshIntervalMs; + } + + public void setMinFolderOccupiedSpaceCacheRefreshIntervalMs( + long minFolderOccupiedSpaceCacheRefreshIntervalMs) { + this.minFolderOccupiedSpaceCacheRefreshIntervalMs = + minFolderOccupiedSpaceCacheRefreshIntervalMs; + } + + public int getMinFolderOccupiedSpaceCacheRefreshSelectionThreshold() { + return minFolderOccupiedSpaceCacheRefreshSelectionThreshold; + } + + public void setMinFolderOccupiedSpaceCacheRefreshSelectionThreshold( + int minFolderOccupiedSpaceCacheRefreshSelectionThreshold) { + this.minFolderOccupiedSpaceCacheRefreshSelectionThreshold = + minFolderOccupiedSpaceCacheRefreshSelectionThreshold; + } + public boolean isReadOnly() { return status == NodeStatus.ReadOnly; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java index 5cd954a09f7..b72cf7aca00 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java @@ -229,6 +229,31 @@ public class CommonDescriptor { String.valueOf(config.getDiskSpaceWarningThreshold())) .trim())); + long 
minFolderOccupiedSpaceCacheRefreshIntervalMs = + Long.parseLong( + properties + .getProperty( + "min_folder_occupied_space_cache_refresh_interval_ms", + String.valueOf(config.getMinFolderOccupiedSpaceCacheRefreshIntervalMs())) + .trim()); + if (minFolderOccupiedSpaceCacheRefreshIntervalMs > 0) { + config.setMinFolderOccupiedSpaceCacheRefreshIntervalMs( + minFolderOccupiedSpaceCacheRefreshIntervalMs); + } + + int minFolderOccupiedSpaceCacheRefreshSelectionThreshold = + Integer.parseInt( + properties + .getProperty( + "min_folder_occupied_space_cache_refresh_selection_threshold", + String.valueOf( + config.getMinFolderOccupiedSpaceCacheRefreshSelectionThreshold())) + .trim()); + if (minFolderOccupiedSpaceCacheRefreshSelectionThreshold > 0) { + config.setMinFolderOccupiedSpaceCacheRefreshSelectionThreshold( + minFolderOccupiedSpaceCacheRefreshSelectionThreshold); + } + config.setTimestampPrecision( properties.getProperty("timestamp_precision", config.getTimestampPrecision()).trim()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/MinFolderOccupiedSpaceFirstStrategy.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/MinFolderOccupiedSpaceFirstStrategy.java index d7345eb505c..285b8b1d5b0 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/MinFolderOccupiedSpaceFirstStrategy.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/disk/strategy/MinFolderOccupiedSpaceFirstStrategy.java @@ -18,20 +18,56 @@ */ package org.apache.iotdb.commons.disk.strategy; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; import org.apache.iotdb.commons.i18n.UtilMessages; import org.apache.iotdb.commons.utils.JVMCommonUtils; +import org.apache.iotdb.commons.utils.TestOnly; import java.io.IOException; +/** + * Selects the folder with the least occupied space. 
+ * + * <p>Computing the occupied space of a folder requires a full {@code Files.walk} of its directory + * tree, which is very expensive when a folder holds a huge number of (small) files, e.g. while a + * snapshot consisting of hundreds of thousands of tiny files is being received. Re-walking on every + * selection turned the per-file cost into a full-tree scan and made the overall cost quadratic. + * + * <p>To avoid that, the occupied space of every folder is cached and only recomputed periodically. + * Between two refreshes an incremental selection counter is maintained; once enough folders have + * been selected (or enough time has elapsed) the cached state is reset and the occupied space is + * recomputed. This keeps the selection semantics (pick the least occupied folder) while bounding + * the number of full directory scans. + */ public class MinFolderOccupiedSpaceFirstStrategy extends DirectoryStrategy { + private long refreshIntervalMs = + CommonDescriptor.getInstance().getConfig().getMinFolderOccupiedSpaceCacheRefreshIntervalMs(); + private int refreshSelectionThreshold = + CommonDescriptor.getInstance() + .getConfig() + .getMinFolderOccupiedSpaceCacheRefreshSelectionThreshold(); + + /** Cached occupied space per folder, captured at the last refresh. */ + private long[] cachedOccupiedSpace; + + /** Incremental count of selections made since the last refresh. */ + private int selectionsSinceRefresh; + + /** Timestamp (ms) of the last refresh; a negative value means the cache must be (re)built. 
*/ + private long lastRefreshTimeMs = -1; + @Override public int nextFolderIndex() throws DiskSpaceInsufficientException { return getMinOccupiedSpaceFolder(); } - private int getMinOccupiedSpaceFolder() throws DiskSpaceInsufficientException { + private synchronized int getMinOccupiedSpaceFolder() throws DiskSpaceInsufficientException { + if (needRefresh()) { + refreshOccupiedSpace(); + } + int minIndex = -1; long minSpace = Long.MAX_VALUE; @@ -43,14 +79,7 @@ public class MinFolderOccupiedSpaceFirstStrategy extends DirectoryStrategy { if (!JVMCommonUtils.hasSpace(folder)) { continue; } - - long space = 0; - try { - space = JVMCommonUtils.getOccupiedSpace(folder); - } catch (IOException e) { - LOGGER.error(UtilMessages.CANNOT_CALCULATE_OCCUPIED_SPACE, folder, e); - continue; - } + long space = cachedOccupiedSpace[i]; if (space < minSpace) { minSpace = space; minIndex = i; @@ -61,6 +90,61 @@ public class MinFolderOccupiedSpaceFirstStrategy extends DirectoryStrategy { throw new DiskSpaceInsufficientException(folders); } + selectionsSinceRefresh++; return minIndex; } + + private boolean needRefresh() { + if (cachedOccupiedSpace == null || cachedOccupiedSpace.length != folders.size()) { + return true; + } + if (lastRefreshTimeMs < 0 || selectionsSinceRefresh >= refreshSelectionThreshold) { + return true; + } + return System.currentTimeMillis() - lastRefreshTimeMs >= refreshIntervalMs; + } + + /** Recompute the occupied space of every folder and reset the incremental state. */ + private void refreshOccupiedSpace() { + if (cachedOccupiedSpace == null || cachedOccupiedSpace.length != folders.size()) { + cachedOccupiedSpace = new long[folders.size()]; + } + for (int i = 0; i < folders.size(); i++) { + String folder = folders.get(i); + if (isUnavailableFolder(folder) || !JVMCommonUtils.hasSpace(folder)) { + // Folder is not a selection candidate; keep it deprioritized without paying for a walk. 
+ cachedOccupiedSpace[i] = Long.MAX_VALUE; + continue; + } + try { + cachedOccupiedSpace[i] = JVMCommonUtils.getOccupiedSpace(folder); + } catch (IOException e) { + LOGGER.error(UtilMessages.CANNOT_CALCULATE_OCCUPIED_SPACE, folder, e); + cachedOccupiedSpace[i] = Long.MAX_VALUE; + } + } + selectionsSinceRefresh = 0; + lastRefreshTimeMs = System.currentTimeMillis(); + } + + @TestOnly + public void setRefreshIntervalMs(long refreshIntervalMs) { + this.refreshIntervalMs = refreshIntervalMs; + } + + @TestOnly + public void setRefreshSelectionThreshold(int refreshSelectionThreshold) { + this.refreshSelectionThreshold = refreshSelectionThreshold; + } + + /** Forces the next selection to recompute the occupied space of every folder. */ + @TestOnly + public synchronized void invalidateCache() { + this.lastRefreshTimeMs = -1; + } + + @TestOnly + public synchronized int getSelectionsSinceRefresh() { + return selectionsSinceRefresh; + } } diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/disk/MinFolderOccupiedSpaceFirstStrategyRealFsTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/disk/MinFolderOccupiedSpaceFirstStrategyRealFsTest.java new file mode 100644 index 00000000000..6f4797233c5 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/disk/MinFolderOccupiedSpaceFirstStrategyRealFsTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.commons.disk; + +import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType; +import org.apache.iotdb.commons.disk.strategy.MinFolderOccupiedSpaceFirstStrategy; +import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException; +import org.apache.iotdb.commons.utils.JVMCommonUtils; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Stream; + +import static org.junit.Assert.assertEquals; + +/** + * Integration test that drives {@link MinFolderOccupiedSpaceFirstStrategy} and {@link + * FolderManager} against the real filesystem (no mocking). It exercises the actual {@code + * Files.walk}-based occupied-space computation and verifies both the selection semantics and the + * caching behaviour that fixes the snapshot-receive hotspot (a full directory scan on every single + * selection). + */ +public class MinFolderOccupiedSpaceFirstStrategyRealFsTest { + + private final List<Path> tempDirs = new ArrayList<>(); + private List<String> folders; + private double originalThreshold; + + @Before + public void setUp() throws IOException { + // The temp dirs live on the test machine's disk; make hasSpace() deterministic regardless of + // how full that disk happens to be. 
+ originalThreshold = CommonDescriptor.getInstance().getConfig().getDiskSpaceWarningThreshold(); + JVMCommonUtils.setDiskSpaceWarningThreshold(0.0); + + folders = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + Path dir = Files.createTempDirectory("min-occupied-strategy-" + i + "-"); + tempDirs.add(dir); + folders.add(dir.toFile().getAbsolutePath()); + } + } + + @After + public void tearDown() throws IOException { + JVMCommonUtils.setDiskSpaceWarningThreshold(originalThreshold); + for (Path dir : tempDirs) { + if (!Files.exists(dir)) { + continue; + } + try (Stream<Path> walk = Files.walk(dir)) { + walk.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); + } + } + } + + private void writeFile(int folderIndex, String name, int sizeBytes) throws IOException { + File file = new File(folders.get(folderIndex), name); + byte[] payload = new byte[sizeBytes]; + Arrays.fill(payload, (byte) 1); + Files.write(file.toPath(), payload); + } + + @Test + public void selectsLeastOccupiedRealDirectory() + throws DiskSpaceInsufficientException, IOException { + // Folder 2 holds a real 1 MiB file, folders 0 and 1 are empty. + writeFile(2, "occupied.bin", 1024 * 1024); + + FolderManager folderManager = + new FolderManager(folders, DirectoryStrategyType.MIN_FOLDER_OCCUPIED_SPACE_FIRST_STRATEGY); + + // The least occupied folder (folder 0, ties broken by index) must be chosen. + assertEquals(folders.get(0), folderManager.getNextFolder()); + } + + @Test + public void cachesOccupiedSpaceAndRefreshesAgainstRealFiles() + throws DiskSpaceInsufficientException, IOException { + MinFolderOccupiedSpaceFirstStrategy strategy = new MinFolderOccupiedSpaceFirstStrategy(); + // Disable the time-based refresh so the count-based refresh is the only trigger under test. + strategy.setRefreshIntervalMs(Long.MAX_VALUE); + strategy.setRefreshSelectionThreshold(2); + strategy.setFolders(folders); + + // All folders are empty on disk, so folder 0 is selected and the cache is built. 
+ assertEquals(0, strategy.nextFolderIndex()); + + // Make folder 0 the most occupied folder on disk by writing a real 2 MiB file into it. + writeFile(0, "big.bin", 2 * 1024 * 1024); + + // The cache is still valid (threshold not reached), so the selection must not change though + // folder 0 is now the largest: this proves the strategy did not re-walk the directory tree. + assertEquals(0, strategy.nextFolderIndex()); + + // The threshold is now reached: the next selection refreshes the cache via a real walk + // and correctly avoids the now-largest folder 0, picking the least occupied folder 1. + assertEquals(1, strategy.nextFolderIndex()); + } +}
