This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch loadModificationFileWithMemControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 55606d4ef4f5ccb740cbca6833aceaede4c60b27 Author: shuwenwei <[email protected]> AuthorDate: Fri May 29 09:58:08 2026 +0800 Improve query modification loading memory control --- .../fragment/QueryModificationLoader.java | 235 +++++++++++---------- .../fragment/QueryModificationLoaderTest.java | 95 ++++++++- 2 files changed, 209 insertions(+), 121 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoader.java index b8d3e321838..bdb1914a354 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoader.java @@ -76,36 +76,31 @@ class QueryModificationLoader implements AutoCloseable { } List<ModEntry> getPathModifications() throws IllegalPathException { - AtomicReference<LoadedModsResult> loadedResult = new AtomicReference<>(); + AtomicReference<LoadModsResult> loadedResult = new AtomicReference<>(); PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> cachedMods = fileModCache.computeIfAbsent( resource.getTsFileID(), ignored -> loadAllModificationsForCache(loadedResult)); if (cachedMods != null) { - // Either this loader cached the full tree successfully, or another loader had cached it. 
return modsTreeMatcher.match(cachedMods); } - LoadedModsResult result = loadedResult.get(); - if (result.loadedAllModEntries) { - return fallbackByMatchLoadedPatternTree(result); - } else { - return fallbackByMatchedScan(result); + LoadModsResult result = loadedResult.get(); + try { + if (result.loadedAllModEntries) { + return fallbackByMatchLoadedPatternTree(result); + } else { + return fallbackByMatchedScan(result); + } + } finally { + close(); } } private PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> - loadAllModificationsForCache(AtomicReference<LoadedModsResult> loadedResult) { - LoadedModsResult result = loadAllModificationsWithQuotaControl(); + loadAllModificationsForCache(AtomicReference<LoadModsResult> loadedResult) { + LoadModsResult result = loadAllModificationsWithQuotaControl(); loadedResult.set(result); - if (!result.loadedAllModEntries) { - return null; - } - - try { - // The cache quota has already been claimed while loading. Reserve real query memory only - // once, just before the fully loaded tree becomes visible in fileModCache. 
- reserveMemoryImmediately(result.cacheQuotaBytes); - } catch (MemoryNotEnoughException e) { + if (!result.cacheable) { return null; } @@ -113,57 +108,80 @@ class QueryModificationLoader implements AutoCloseable { return result.mods; } - private LoadedModsResult loadAllModificationsWithQuotaControl() { + private LoadModsResult loadAllModificationsWithQuotaControl() { PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> modifications = PatternTreeMapFactory.getModsPatternTreeMap(); + LoadModsResult result = new LoadModsResult(modifications); + if (resource.getTotalModSizeInByte() > getRemainingCacheQuota()) { + currentIterator = resource.getModEntryIterator(); + result.loadedAllModEntries = false; + result.cacheable = false; + return result; + } + currentIterator = resource.getModEntryIterator(); - long claimedCacheQuotaBytes = 0; - int readModCount = 0; + int appendedModCount = 0; boolean estimatedAfterLastAppend = false; while (currentIterator.hasNext()) { ModEntry modification = currentIterator.next(); - readModCount++; - if (!queryContext.shouldSkipModification(modification)) { - modifications.append(modification.keyOfPatternTree(), modification); - estimatedAfterLastAppend = false; + if (queryContext.shouldSkipModification(modification)) { + continue; } - if (readModCount % modsMemoryEstimateReadInterval == 0) { - long currentEstimatedSize = estimateModsTreeMemory(modifications); - checkEstimatedSizeNotDecreased(claimedCacheQuotaBytes, currentEstimatedSize); - long delta = currentEstimatedSize - claimedCacheQuotaBytes; - if (!tryClaimCacheQuota(delta)) { - return new LoadedModsResult(modifications, claimedCacheQuotaBytes, false); + modifications.append(modification.keyOfPatternTree(), modification); + appendedModCount++; + estimatedAfterLastAppend = false; + + if (appendedModCount % modsMemoryEstimateReadInterval == 0) { + if (!tryEstimateAndReserveTreeMemory(result)) { + result.loadedAllModEntries = false; + result.cacheable = false; + return result; 
} - claimedCacheQuotaBytes = currentEstimatedSize; estimatedAfterLastAppend = true; } } if (!estimatedAfterLastAppend) { - long finalEstimatedSize = estimateModsTreeMemory(modifications); - checkEstimatedSizeNotDecreased(claimedCacheQuotaBytes, finalEstimatedSize); - long delta = finalEstimatedSize - claimedCacheQuotaBytes; - if (!tryClaimCacheQuota(delta)) { - return new LoadedModsResult(modifications, claimedCacheQuotaBytes, false); - } - claimedCacheQuotaBytes = finalEstimatedSize; + result.cacheable = tryEstimateAndReserveTreeMemory(result); + } else { + result.cacheable = true; } - return new LoadedModsResult(modifications, claimedCacheQuotaBytes, true); + + result.loadedAllModEntries = true; + return result; } - private void checkEstimatedSizeNotDecreased( - long previousEstimatedSize, long currentEstimatedSize) { - if (currentEstimatedSize < previousEstimatedSize) { + private boolean tryEstimateAndReserveTreeMemory(LoadModsResult result) { + long currentEstimatedSize = estimateModsTreeMemory(result.mods); + long delta = currentEstimatedSize - result.reservedTreeMemoryBytes; + if (delta < 0) { throw new IllegalStateException( String.format( DataNodeQueryMessages.ESTIMATED_MODS_TREE_SIZE_DECREASED, - previousEstimatedSize, + result.reservedTreeMemoryBytes, currentEstimatedSize, resource)); } + if (delta == 0) { + return true; + } + + if (!tryClaimCacheQuota(delta)) { + return false; + } + result.cacheQuotaBytes += delta; + + try { + memoryReservationManager.reserveMemoryImmediately(delta); + } catch (MemoryNotEnoughException e) { + return false; + } + + result.reservedTreeMemoryBytes = currentEstimatedSize; + return true; } private boolean tryClaimCacheQuota(long delta) { @@ -171,10 +189,8 @@ class QueryModificationLoader implements AutoCloseable { return true; } - // During loading, this only claims FI-level cache quota. Real memory is reserved once after the - // full tree is loaded and before computeIfAbsent publishes it to fileModCache. 
long alreadyUsedMemoryForCachedModEntries = cachedModEntriesSize.get(); - while (alreadyUsedMemoryForCachedModEntries + delta < modsCacheSizeLimitPerFI) { + while (alreadyUsedMemoryForCachedModEntries + delta <= modsCacheSizeLimitPerFI) { if (cachedModEntriesSize.compareAndSet( alreadyUsedMemoryForCachedModEntries, alreadyUsedMemoryForCachedModEntries + delta)) { return true; @@ -184,73 +200,78 @@ class QueryModificationLoader implements AutoCloseable { return false; } - private List<ModEntry> fallbackByMatchedScan(LoadedModsResult partialTree) + private long getRemainingCacheQuota() { + return modsCacheSizeLimitPerFI - cachedModEntriesSize.get(); + } + + private List<ModEntry> fallbackByMatchedScan(LoadModsResult partialTree) throws IllegalPathException { - try { - // The full tree exceeded the FI mods cache quota. Reuse the partial tree for the part that - // has already been read, then continue scanning the same iterator by path. - List<ModEntry> matchedMods; - try { - matchedMods = new ArrayList<>(modsTreeMatcher.match(partialTree.mods)); - } finally { - partialTree.mods = null; - releaseClaimedCacheQuota(partialTree); - } - reserveMatchedModsMemory(matchedMods); + List<ModEntry> matchedMods = matchLoadedTreeAndRelease(partialTree); + long reservedMatchedModsMemoryBytes = reserveMatchedModsMemory(matchedMods); + int matchedModCount = matchedMods.size(); - while (currentIterator.hasNext()) { - ModEntry modification = currentIterator.next(); - if (queryContext.shouldSkipModification(modification)) { - continue; - } - if (modificationMatcher.test(modification)) { - reserveMatchedModMemory(modification); - matchedMods.add(modification); + while (currentIterator.hasNext()) { + ModEntry modification = currentIterator.next(); + if (queryContext.shouldSkipModification(modification)) { + continue; + } + if (modificationMatcher.test(modification)) { + matchedMods.add(modification); + matchedModCount++; + if (matchedModCount % modsMemoryEstimateReadInterval == 0) { + 
reservedMatchedModsMemoryBytes = + reserveMatchedModsMemoryIncrementally(matchedMods, reservedMatchedModsMemoryBytes); } } - - matchedMods = ModificationUtils.sortAndMerge(matchedMods); - return matchedMods; - } finally { - close(); } + + List<ModEntry> sortedAndMergedMods = ModificationUtils.sortAndMerge(matchedMods); + adjustMatchedModsMemoryReservation(sortedAndMergedMods, reservedMatchedModsMemoryBytes); + return sortedAndMergedMods; + } + + private List<ModEntry> fallbackByMatchLoadedPatternTree(LoadModsResult loadedTree) + throws IllegalPathException { + List<ModEntry> matchedMods = matchLoadedTreeAndRelease(loadedTree); + reserveMatchedModsMemory(matchedMods); + return matchedMods; } - private List<ModEntry> fallbackByMatchLoadedPatternTree(LoadedModsResult loadedTree) + private List<ModEntry> matchLoadedTreeAndRelease(LoadModsResult loadedTree) throws IllegalPathException { try { - // The tree was fully loaded but not cached, usually because final memory reservation failed. - // Return only the matched mods and release the cache quota claimed during loading. 
- List<ModEntry> matchedMods; - try { - matchedMods = new ArrayList<>(modsTreeMatcher.match(loadedTree.mods)); - } finally { - loadedTree.mods = null; - releaseClaimedCacheQuota(loadedTree); - } - reserveMatchedModsMemory(matchedMods); - return matchedMods; + return new ArrayList<>(modsTreeMatcher.match(loadedTree.mods)); } finally { - close(); + loadedTree.mods = null; + cachedModEntriesSize.addAndGet(-loadedTree.cacheQuotaBytes); + loadedTree.cacheQuotaBytes = 0; + memoryReservationManager.releaseMemoryCumulatively(loadedTree.reservedTreeMemoryBytes); + loadedTree.reservedTreeMemoryBytes = 0; } } - private void reserveMatchedModsMemory(List<ModEntry> matchedMods) { - reserveMemoryImmediately(RamUsageEstimator.shallowSizeOf(matchedMods)); - for (ModEntry matchedMod : matchedMods) { - reserveMatchedModMemory(matchedMod); - } + private long reserveMatchedModsMemory(List<ModEntry> matchedMods) { + long estimatedSize = RamUsageEstimator.sizeOfArrayList(matchedMods); + memoryReservationManager.reserveMemoryCumulatively(estimatedSize); + return estimatedSize; } - private void reserveMatchedModMemory(ModEntry matchedMod) { - reserveMemoryImmediately(RamUsageEstimator.NUM_BYTES_OBJECT_REF + matchedMod.ramBytesUsed()); + private long reserveMatchedModsMemoryIncrementally( + List<ModEntry> matchedMods, long reservedMatchedModsMemoryBytes) { + long currentEstimatedSize = RamUsageEstimator.sizeOfArrayList(matchedMods); + long delta = currentEstimatedSize - reservedMatchedModsMemoryBytes; + memoryReservationManager.reserveMemoryCumulatively(delta); + return currentEstimatedSize; } - private void reserveMemoryImmediately(long bytes) { - if (bytes > 0) { - synchronized (memoryReservationManager) { - memoryReservationManager.reserveMemoryImmediately(bytes); - } + private void adjustMatchedModsMemoryReservation( + List<ModEntry> matchedMods, long reservedMatchedModsMemoryBytes) { + long currentEstimatedSize = RamUsageEstimator.sizeOfArrayList(matchedMods); + long delta = 
currentEstimatedSize - reservedMatchedModsMemoryBytes; + if (delta >= 0) { + memoryReservationManager.reserveMemoryCumulatively(delta); + } else { + memoryReservationManager.releaseMemoryCumulatively(-delta); } } @@ -271,26 +292,16 @@ class QueryModificationLoader implements AutoCloseable { } } - private void releaseClaimedCacheQuota(LoadedModsResult loadedResult) { - if (loadedResult.cacheQuotaBytes > 0) { - cachedModEntriesSize.addAndGet(-loadedResult.cacheQuotaBytes); - loadedResult.cacheQuotaBytes = 0; - } - } - - private static class LoadedModsResult { + private static class LoadModsResult { private PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> mods; private long cacheQuotaBytes; - private final boolean loadedAllModEntries; + private long reservedTreeMemoryBytes; + private boolean loadedAllModEntries; + private boolean cacheable; - private LoadedModsResult( - PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> mods, - long cacheQuotaBytes, - boolean loadedAllModEntries) { + private LoadModsResult(PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> mods) { this.mods = mods; - this.cacheQuotaBytes = cacheQuotaBytes; - this.loadedAllModEntries = loadedAllModEntries; } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java index ec889a053e7..39b9c41447c 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java @@ -46,7 +46,6 @@ import java.util.concurrent.atomic.AtomicLong; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThrows; import static 
org.junit.Assert.assertTrue; public class QueryModificationLoaderTest { @@ -90,12 +89,13 @@ public class QueryModificationLoaderTest { assertTrue(fileModCache.containsKey(resource.getTsFileID())); assertTrue(cachedModEntriesSize.get() > 0); assertTrue(memoryReservationManager.getReservedBytes() >= cachedModEntriesSize.get()); + assertTrue(memoryReservationManager.getImmediateReservationCount() > 0); } } @Test - public void testFallbackScansRemainingModsWhenQuotaExceeded() throws Exception { - TsFileResource resource = prepareResource("fallback"); + public void testFallbackScansModsWhenFileSizeExceedsRemainingQuotaBeforeLoad() throws Exception { + TsFileResource resource = prepareResource("file-size-precheck-fallback"); writeMods( resource, new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10), @@ -117,6 +117,40 @@ public class QueryModificationLoaderTest { assertEquals(0, cachedModEntriesSize.get()); assertTrue(memoryReservationManager.getReservedBytes() > 0); assertEquals(0, memoryReservationManager.getRemainingImmediateFailures()); + assertEquals(0, memoryReservationManager.getImmediateReservationCount()); + } + } + + @Test + public void testFallbackScansRemainingModsWhenEstimatedTreeExceedsQuota() throws Exception { + TsFileResource resource = prepareResource("estimated-tree-quota-fallback"); + writeMods( + resource, + new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10), + new TreeDeletionEntry(new MeasurementPath("root.sg.d2.s1"), 20, 30), + new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 40, 50)); + + Map<TsFileID, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>> fileModCache = + new ConcurrentHashMap<>(); + AtomicLong cachedModEntriesSize = new AtomicLong(); + CountingMemoryReservationManager memoryReservationManager = + new CountingMemoryReservationManager(); + + try (QueryModificationLoader loader = + newLoader( + resource, + resource.getTotalModSizeInByte() + 1, + fileModCache, + cachedModEntriesSize, 
+ memoryReservationManager, + 1)) { + List<ModEntry> result = loader.getPathModifications(); + + assertEquals(2, result.size()); + assertFalse(fileModCache.containsKey(resource.getTsFileID())); + assertEquals(0, cachedModEntriesSize.get()); + assertTrue(memoryReservationManager.getReservedBytes() > 0); + assertTrue(memoryReservationManager.getCumulativeReleaseCount() > 0); } } @@ -142,7 +176,7 @@ public class QueryModificationLoaderTest { fileModCache, cachedModEntriesSize, memoryReservationManager, - 1)) { + 100)) { List<ModEntry> result = loader.getPathModifications(); assertEquals(2, result.size()); @@ -150,12 +184,13 @@ public class QueryModificationLoaderTest { assertEquals(0, cachedModEntriesSize.get()); assertTrue(memoryReservationManager.getReservedBytes() > 0); assertEquals(0, memoryReservationManager.getRemainingImmediateFailures()); + assertEquals(1, memoryReservationManager.getImmediateReservationCount()); } } @Test - public void testFallbackThrowsWhenMatchedModsReservationFailed() throws Exception { - TsFileResource resource = prepareResource("fallback-reserve-failed"); + public void testFallbackReservesMatchedModsCumulativelyWhenQuotaExceeded() throws Exception { + TsFileResource resource = prepareResource("fallback-cumulative-reserve"); writeMods( resource, new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10), @@ -170,12 +205,41 @@ public class QueryModificationLoaderTest { try (QueryModificationLoader loader = newLoader(resource, 1, fileModCache, cachedModEntriesSize, memoryReservationManager, 1)) { - assertThrows(MemoryNotEnoughException.class, loader::getPathModifications); + List<ModEntry> result = loader.getPathModifications(); + assertEquals(2, result.size()); assertFalse(fileModCache.containsKey(resource.getTsFileID())); assertEquals(0, cachedModEntriesSize.get()); - assertEquals(0, memoryReservationManager.getReservedBytes()); - assertEquals(0, memoryReservationManager.getRemainingImmediateFailures()); + 
assertTrue(memoryReservationManager.getReservedBytes() > 0); + assertEquals(1, memoryReservationManager.getRemainingImmediateFailures()); + assertEquals(0, memoryReservationManager.getImmediateReservationCount()); + } + } + + @Test + public void testFallbackAdjustsReservedMemoryAfterSortAndMerge() throws Exception { + TsFileResource resource = prepareResource("fallback-sort-merge-adjust"); + writeMods( + resource, + new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10), + new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 5, 15), + new TreeDeletionEntry(new MeasurementPath("root.sg.d2.s1"), 20, 30)); + + Map<TsFileID, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>> fileModCache = + new ConcurrentHashMap<>(); + AtomicLong cachedModEntriesSize = new AtomicLong(); + CountingMemoryReservationManager memoryReservationManager = + new CountingMemoryReservationManager(); + + try (QueryModificationLoader loader = + newLoader(resource, 1, fileModCache, cachedModEntriesSize, memoryReservationManager, 1)) { + List<ModEntry> result = loader.getPathModifications(); + + assertEquals(1, result.size()); + assertFalse(fileModCache.containsKey(resource.getTsFileID())); + assertEquals(0, cachedModEntriesSize.get()); + assertTrue(memoryReservationManager.getReservedBytes() > 0); + assertTrue(memoryReservationManager.getCumulativeReleaseCount() > 0); } } @@ -220,6 +284,8 @@ public class QueryModificationLoaderTest { private long reservedBytes; private int remainingImmediateFailures; + private int immediateReservationCount; + private int cumulativeReleaseCount; private CountingMemoryReservationManager() {} @@ -234,6 +300,7 @@ public class QueryModificationLoaderTest { @Override public void reserveMemoryImmediately() { + immediateReservationCount++; if (remainingImmediateFailures > 0) { remainingImmediateFailures--; throw new MemoryNotEnoughException("Mock memory reservation failure."); @@ -242,6 +309,7 @@ public class QueryModificationLoaderTest { 
@Override public void reserveMemoryImmediately(long size) { + immediateReservationCount++; if (remainingImmediateFailures > 0) { remainingImmediateFailures--; throw new MemoryNotEnoughException("Mock memory reservation failure."); @@ -251,6 +319,7 @@ public class QueryModificationLoaderTest { @Override public void releaseMemoryCumulatively(long size) { + cumulativeReleaseCount++; reservedBytes -= size; } @@ -277,5 +346,13 @@ public class QueryModificationLoaderTest { private int getRemainingImmediateFailures() { return remainingImmediateFailures; } + + private int getImmediateReservationCount() { + return immediateReservationCount; + } + + private int getCumulativeReleaseCount() { + return cumulativeReleaseCount; + } } }
