This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch loadModificationFileWithMemControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 55606d4ef4f5ccb740cbca6833aceaede4c60b27
Author: shuwenwei <[email protected]>
AuthorDate: Fri May 29 09:58:08 2026 +0800

    Improve query modification loading memory control
---
 .../fragment/QueryModificationLoader.java          | 235 +++++++++++----------
 .../fragment/QueryModificationLoaderTest.java      |  95 ++++++++-
 2 files changed, 209 insertions(+), 121 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoader.java
index b8d3e321838..bdb1914a354 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoader.java
@@ -76,36 +76,31 @@ class QueryModificationLoader implements AutoCloseable {
   }
 
   List<ModEntry> getPathModifications() throws IllegalPathException {
-    AtomicReference<LoadedModsResult> loadedResult = new AtomicReference<>();
+    AtomicReference<LoadModsResult> loadedResult = new AtomicReference<>();
     PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> cachedMods =
         fileModCache.computeIfAbsent(
             resource.getTsFileID(), ignored -> loadAllModificationsForCache(loadedResult));
     if (cachedMods != null) {
-      // Either this loader cached the full tree successfully, or another loader had cached it.
       return modsTreeMatcher.match(cachedMods);
     }
 
-    LoadedModsResult result = loadedResult.get();
-    if (result.loadedAllModEntries) {
-      return fallbackByMatchLoadedPatternTree(result);
-    } else {
-      return fallbackByMatchedScan(result);
+    LoadModsResult result = loadedResult.get();
+    try {
+      if (result.loadedAllModEntries) {
+        return fallbackByMatchLoadedPatternTree(result);
+      } else {
+        return fallbackByMatchedScan(result);
+      }
+    } finally {
+      close();
     }
   }
 
   private PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
-      loadAllModificationsForCache(AtomicReference<LoadedModsResult> loadedResult) {
-    LoadedModsResult result = loadAllModificationsWithQuotaControl();
+      loadAllModificationsForCache(AtomicReference<LoadModsResult> loadedResult) {
+    LoadModsResult result = loadAllModificationsWithQuotaControl();
     loadedResult.set(result);
-    if (!result.loadedAllModEntries) {
-      return null;
-    }
-
-    try {
-      // The cache quota has already been claimed while loading. Reserve real query memory only
-      // once, just before the fully loaded tree becomes visible in fileModCache.
-      reserveMemoryImmediately(result.cacheQuotaBytes);
-    } catch (MemoryNotEnoughException e) {
+    if (!result.cacheable) {
       return null;
     }
 
@@ -113,57 +108,80 @@ class QueryModificationLoader implements AutoCloseable {
     return result.mods;
   }
 
-  private LoadedModsResult loadAllModificationsWithQuotaControl() {
+  private LoadModsResult loadAllModificationsWithQuotaControl() {
     PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> modifications =
         PatternTreeMapFactory.getModsPatternTreeMap();
+    LoadModsResult result = new LoadModsResult(modifications);
+    if (resource.getTotalModSizeInByte() > getRemainingCacheQuota()) {
+      currentIterator = resource.getModEntryIterator();
+      result.loadedAllModEntries = false;
+      result.cacheable = false;
+      return result;
+    }
+
     currentIterator = resource.getModEntryIterator();
 
-    long claimedCacheQuotaBytes = 0;
-    int readModCount = 0;
+    int appendedModCount = 0;
     boolean estimatedAfterLastAppend = false;
 
     while (currentIterator.hasNext()) {
       ModEntry modification = currentIterator.next();
-      readModCount++;
-      if (!queryContext.shouldSkipModification(modification)) {
-        modifications.append(modification.keyOfPatternTree(), modification);
-        estimatedAfterLastAppend = false;
+      if (queryContext.shouldSkipModification(modification)) {
+        continue;
       }
 
-      if (readModCount % modsMemoryEstimateReadInterval == 0) {
-        long currentEstimatedSize = estimateModsTreeMemory(modifications);
-        checkEstimatedSizeNotDecreased(claimedCacheQuotaBytes, currentEstimatedSize);
-        long delta = currentEstimatedSize - claimedCacheQuotaBytes;
-        if (!tryClaimCacheQuota(delta)) {
-          return new LoadedModsResult(modifications, claimedCacheQuotaBytes, false);
+      modifications.append(modification.keyOfPatternTree(), modification);
+      appendedModCount++;
+      estimatedAfterLastAppend = false;
+
+      if (appendedModCount % modsMemoryEstimateReadInterval == 0) {
+        if (!tryEstimateAndReserveTreeMemory(result)) {
+          result.loadedAllModEntries = false;
+          result.cacheable = false;
+          return result;
         }
-        claimedCacheQuotaBytes = currentEstimatedSize;
         estimatedAfterLastAppend = true;
       }
     }
 
     if (!estimatedAfterLastAppend) {
-      long finalEstimatedSize = estimateModsTreeMemory(modifications);
-      checkEstimatedSizeNotDecreased(claimedCacheQuotaBytes, finalEstimatedSize);
-      long delta = finalEstimatedSize - claimedCacheQuotaBytes;
-      if (!tryClaimCacheQuota(delta)) {
-        return new LoadedModsResult(modifications, claimedCacheQuotaBytes, false);
-      }
-      claimedCacheQuotaBytes = finalEstimatedSize;
+      result.cacheable = tryEstimateAndReserveTreeMemory(result);
+    } else {
+      result.cacheable = true;
     }
-    return new LoadedModsResult(modifications, claimedCacheQuotaBytes, true);
+
+    result.loadedAllModEntries = true;
+    return result;
   }
 
-  private void checkEstimatedSizeNotDecreased(
-      long previousEstimatedSize, long currentEstimatedSize) {
-    if (currentEstimatedSize < previousEstimatedSize) {
+  private boolean tryEstimateAndReserveTreeMemory(LoadModsResult result) {
+    long currentEstimatedSize = estimateModsTreeMemory(result.mods);
+    long delta = currentEstimatedSize - result.reservedTreeMemoryBytes;
+    if (delta < 0) {
       throw new IllegalStateException(
           String.format(
               DataNodeQueryMessages.ESTIMATED_MODS_TREE_SIZE_DECREASED,
-              previousEstimatedSize,
+              result.reservedTreeMemoryBytes,
               currentEstimatedSize,
               resource));
     }
+    if (delta == 0) {
+      return true;
+    }
+
+    if (!tryClaimCacheQuota(delta)) {
+      return false;
+    }
+    result.cacheQuotaBytes += delta;
+
+    try {
+      memoryReservationManager.reserveMemoryImmediately(delta);
+    } catch (MemoryNotEnoughException e) {
+      return false;
+    }
+
+    result.reservedTreeMemoryBytes = currentEstimatedSize;
+    return true;
   }
 
   private boolean tryClaimCacheQuota(long delta) {
@@ -171,10 +189,8 @@ class QueryModificationLoader implements AutoCloseable {
       return true;
     }
 
-    // During loading, this only claims FI-level cache quota. Real memory is reserved once after the
-    // full tree is loaded and before computeIfAbsent publishes it to fileModCache.
     long alreadyUsedMemoryForCachedModEntries = cachedModEntriesSize.get();
-    while (alreadyUsedMemoryForCachedModEntries + delta < modsCacheSizeLimitPerFI) {
+    while (alreadyUsedMemoryForCachedModEntries + delta <= modsCacheSizeLimitPerFI) {
       if (cachedModEntriesSize.compareAndSet(
           alreadyUsedMemoryForCachedModEntries, alreadyUsedMemoryForCachedModEntries + delta)) {
         return true;
@@ -184,73 +200,78 @@ class QueryModificationLoader implements AutoCloseable {
     return false;
   }
 
-  private List<ModEntry> fallbackByMatchedScan(LoadedModsResult partialTree)
+  private long getRemainingCacheQuota() {
+    return modsCacheSizeLimitPerFI - cachedModEntriesSize.get();
+  }
+
+  private List<ModEntry> fallbackByMatchedScan(LoadModsResult partialTree)
       throws IllegalPathException {
-    try {
-      // The full tree exceeded the FI mods cache quota. Reuse the partial tree for the part that
-      // has already been read, then continue scanning the same iterator by path.
-      List<ModEntry> matchedMods;
-      try {
-        matchedMods = new ArrayList<>(modsTreeMatcher.match(partialTree.mods));
-      } finally {
-        partialTree.mods = null;
-        releaseClaimedCacheQuota(partialTree);
-      }
-      reserveMatchedModsMemory(matchedMods);
+    List<ModEntry> matchedMods = matchLoadedTreeAndRelease(partialTree);
+    long reservedMatchedModsMemoryBytes = reserveMatchedModsMemory(matchedMods);
+    int matchedModCount = matchedMods.size();
 
-      while (currentIterator.hasNext()) {
-        ModEntry modification = currentIterator.next();
-        if (queryContext.shouldSkipModification(modification)) {
-          continue;
-        }
-        if (modificationMatcher.test(modification)) {
-          reserveMatchedModMemory(modification);
-          matchedMods.add(modification);
+    while (currentIterator.hasNext()) {
+      ModEntry modification = currentIterator.next();
+      if (queryContext.shouldSkipModification(modification)) {
+        continue;
+      }
+      if (modificationMatcher.test(modification)) {
+        matchedMods.add(modification);
+        matchedModCount++;
+        if (matchedModCount % modsMemoryEstimateReadInterval == 0) {
+          reservedMatchedModsMemoryBytes =
+              reserveMatchedModsMemoryIncrementally(matchedMods, reservedMatchedModsMemoryBytes);
         }
       }
-
-      matchedMods = ModificationUtils.sortAndMerge(matchedMods);
-      return matchedMods;
-    } finally {
-      close();
     }
+
+    List<ModEntry> sortedAndMergedMods = ModificationUtils.sortAndMerge(matchedMods);
+    adjustMatchedModsMemoryReservation(sortedAndMergedMods, reservedMatchedModsMemoryBytes);
+    return sortedAndMergedMods;
+  }
+
+  private List<ModEntry> fallbackByMatchLoadedPatternTree(LoadModsResult loadedTree)
+      throws IllegalPathException {
+    List<ModEntry> matchedMods = matchLoadedTreeAndRelease(loadedTree);
+    reserveMatchedModsMemory(matchedMods);
+    return matchedMods;
   }
 
-  private List<ModEntry> fallbackByMatchLoadedPatternTree(LoadedModsResult loadedTree)
+  private List<ModEntry> matchLoadedTreeAndRelease(LoadModsResult loadedTree)
       throws IllegalPathException {
     try {
-      // The tree was fully loaded but not cached, usually because final memory reservation failed.
-      // Return only the matched mods and release the cache quota claimed during loading.
-      List<ModEntry> matchedMods;
-      try {
-        matchedMods = new ArrayList<>(modsTreeMatcher.match(loadedTree.mods));
-      } finally {
-        loadedTree.mods = null;
-        releaseClaimedCacheQuota(loadedTree);
-      }
-      reserveMatchedModsMemory(matchedMods);
-      return matchedMods;
+      return new ArrayList<>(modsTreeMatcher.match(loadedTree.mods));
     } finally {
-      close();
+      loadedTree.mods = null;
+      cachedModEntriesSize.addAndGet(-loadedTree.cacheQuotaBytes);
+      loadedTree.cacheQuotaBytes = 0;
+      memoryReservationManager.releaseMemoryCumulatively(loadedTree.reservedTreeMemoryBytes);
+      loadedTree.reservedTreeMemoryBytes = 0;
     }
   }
 
-  private void reserveMatchedModsMemory(List<ModEntry> matchedMods) {
-    reserveMemoryImmediately(RamUsageEstimator.shallowSizeOf(matchedMods));
-    for (ModEntry matchedMod : matchedMods) {
-      reserveMatchedModMemory(matchedMod);
-    }
+  private long reserveMatchedModsMemory(List<ModEntry> matchedMods) {
+    long estimatedSize = RamUsageEstimator.sizeOfArrayList(matchedMods);
+    memoryReservationManager.reserveMemoryCumulatively(estimatedSize);
+    return estimatedSize;
   }
 
-  private void reserveMatchedModMemory(ModEntry matchedMod) {
-    reserveMemoryImmediately(RamUsageEstimator.NUM_BYTES_OBJECT_REF + matchedMod.ramBytesUsed());
+  private long reserveMatchedModsMemoryIncrementally(
+      List<ModEntry> matchedMods, long reservedMatchedModsMemoryBytes) {
+    long currentEstimatedSize = RamUsageEstimator.sizeOfArrayList(matchedMods);
+    long delta = currentEstimatedSize - reservedMatchedModsMemoryBytes;
+    memoryReservationManager.reserveMemoryCumulatively(delta);
+    return currentEstimatedSize;
   }
 
-  private void reserveMemoryImmediately(long bytes) {
-    if (bytes > 0) {
-      synchronized (memoryReservationManager) {
-        memoryReservationManager.reserveMemoryImmediately(bytes);
-      }
+  private void adjustMatchedModsMemoryReservation(
+      List<ModEntry> matchedMods, long reservedMatchedModsMemoryBytes) {
+    long currentEstimatedSize = RamUsageEstimator.sizeOfArrayList(matchedMods);
+    long delta = currentEstimatedSize - reservedMatchedModsMemoryBytes;
+    if (delta >= 0) {
+      memoryReservationManager.reserveMemoryCumulatively(delta);
+    } else {
+      memoryReservationManager.releaseMemoryCumulatively(-delta);
     }
   }
 
@@ -271,26 +292,16 @@ class QueryModificationLoader implements AutoCloseable {
     }
   }
 
-  private void releaseClaimedCacheQuota(LoadedModsResult loadedResult) {
-    if (loadedResult.cacheQuotaBytes > 0) {
-      cachedModEntriesSize.addAndGet(-loadedResult.cacheQuotaBytes);
-      loadedResult.cacheQuotaBytes = 0;
-    }
-  }
-
-  private static class LoadedModsResult {
+  private static class LoadModsResult {
 
     private PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> mods;
     private long cacheQuotaBytes;
-    private final boolean loadedAllModEntries;
+    private long reservedTreeMemoryBytes;
+    private boolean loadedAllModEntries;
+    private boolean cacheable;
 
-    private LoadedModsResult(
-        PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> mods,
-        long cacheQuotaBytes,
-        boolean loadedAllModEntries) {
+    private LoadModsResult(PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> mods) {
       this.mods = mods;
-      this.cacheQuotaBytes = cacheQuotaBytes;
-      this.loadedAllModEntries = loadedAllModEntries;
     }
   }
 
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java
index ec889a053e7..39b9c41447c 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java
@@ -46,7 +46,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 public class QueryModificationLoaderTest {
@@ -90,12 +89,13 @@ public class QueryModificationLoaderTest {
       assertTrue(fileModCache.containsKey(resource.getTsFileID()));
       assertTrue(cachedModEntriesSize.get() > 0);
       assertTrue(memoryReservationManager.getReservedBytes() >= cachedModEntriesSize.get());
+      assertTrue(memoryReservationManager.getImmediateReservationCount() > 0);
     }
   }
 
   @Test
-  public void testFallbackScansRemainingModsWhenQuotaExceeded() throws Exception {
-    TsFileResource resource = prepareResource("fallback");
+  public void testFallbackScansModsWhenFileSizeExceedsRemainingQuotaBeforeLoad() throws Exception {
+    TsFileResource resource = prepareResource("file-size-precheck-fallback");
     writeMods(
         resource,
         new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10),
@@ -117,6 +117,40 @@ public class QueryModificationLoaderTest {
       assertEquals(0, cachedModEntriesSize.get());
       assertTrue(memoryReservationManager.getReservedBytes() > 0);
       assertEquals(0, memoryReservationManager.getRemainingImmediateFailures());
+      assertEquals(0, memoryReservationManager.getImmediateReservationCount());
+    }
+  }
+
+  @Test
+  public void testFallbackScansRemainingModsWhenEstimatedTreeExceedsQuota() throws Exception {
+    TsFileResource resource = prepareResource("estimated-tree-quota-fallback");
+    writeMods(
+        resource,
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10),
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d2.s1"), 20, 30),
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 40, 50));
+
+    Map<TsFileID, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>> fileModCache =
+        new ConcurrentHashMap<>();
+    AtomicLong cachedModEntriesSize = new AtomicLong();
+    CountingMemoryReservationManager memoryReservationManager =
+        new CountingMemoryReservationManager();
+
+    try (QueryModificationLoader loader =
+        newLoader(
+            resource,
+            resource.getTotalModSizeInByte() + 1,
+            fileModCache,
+            cachedModEntriesSize,
+            memoryReservationManager,
+            1)) {
+      List<ModEntry> result = loader.getPathModifications();
+
+      assertEquals(2, result.size());
+      assertFalse(fileModCache.containsKey(resource.getTsFileID()));
+      assertEquals(0, cachedModEntriesSize.get());
+      assertTrue(memoryReservationManager.getReservedBytes() > 0);
+      assertTrue(memoryReservationManager.getCumulativeReleaseCount() > 0);
     }
   }
 
@@ -142,7 +176,7 @@ public class QueryModificationLoaderTest {
             fileModCache,
             cachedModEntriesSize,
             memoryReservationManager,
-            1)) {
+            100)) {
       List<ModEntry> result = loader.getPathModifications();
 
       assertEquals(2, result.size());
@@ -150,12 +184,13 @@ public class QueryModificationLoaderTest {
       assertEquals(0, cachedModEntriesSize.get());
       assertTrue(memoryReservationManager.getReservedBytes() > 0);
       assertEquals(0, memoryReservationManager.getRemainingImmediateFailures());
+      assertEquals(1, memoryReservationManager.getImmediateReservationCount());
     }
   }
 
   @Test
-  public void testFallbackThrowsWhenMatchedModsReservationFailed() throws Exception {
-    TsFileResource resource = prepareResource("fallback-reserve-failed");
+  public void testFallbackReservesMatchedModsCumulativelyWhenQuotaExceeded() throws Exception {
+    TsFileResource resource = prepareResource("fallback-cumulative-reserve");
     writeMods(
         resource,
         new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10),
@@ -170,12 +205,41 @@ public class QueryModificationLoaderTest {
 
     try (QueryModificationLoader loader =
         newLoader(resource, 1, fileModCache, cachedModEntriesSize, memoryReservationManager, 1)) {
-      assertThrows(MemoryNotEnoughException.class, loader::getPathModifications);
+      List<ModEntry> result = loader.getPathModifications();
 
+      assertEquals(2, result.size());
       assertFalse(fileModCache.containsKey(resource.getTsFileID()));
       assertEquals(0, cachedModEntriesSize.get());
-      assertEquals(0, memoryReservationManager.getReservedBytes());
-      assertEquals(0, memoryReservationManager.getRemainingImmediateFailures());
+      assertTrue(memoryReservationManager.getReservedBytes() > 0);
+      assertEquals(1, memoryReservationManager.getRemainingImmediateFailures());
+      assertEquals(0, memoryReservationManager.getImmediateReservationCount());
+    }
+  }
+
+  @Test
+  public void testFallbackAdjustsReservedMemoryAfterSortAndMerge() throws Exception {
+    TsFileResource resource = prepareResource("fallback-sort-merge-adjust");
+    writeMods(
+        resource,
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10),
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 5, 15),
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d2.s1"), 20, 30));
+
+    Map<TsFileID, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>> fileModCache =
+        new ConcurrentHashMap<>();
+    AtomicLong cachedModEntriesSize = new AtomicLong();
+    CountingMemoryReservationManager memoryReservationManager =
+        new CountingMemoryReservationManager();
+
+    try (QueryModificationLoader loader =
+        newLoader(resource, 1, fileModCache, cachedModEntriesSize, memoryReservationManager, 1)) {
+      List<ModEntry> result = loader.getPathModifications();
+
+      assertEquals(1, result.size());
+      assertFalse(fileModCache.containsKey(resource.getTsFileID()));
+      assertEquals(0, cachedModEntriesSize.get());
+      assertTrue(memoryReservationManager.getReservedBytes() > 0);
+      assertTrue(memoryReservationManager.getCumulativeReleaseCount() > 0);
     }
   }
 
@@ -220,6 +284,8 @@ public class QueryModificationLoaderTest {
 
     private long reservedBytes;
     private int remainingImmediateFailures;
+    private int immediateReservationCount;
+    private int cumulativeReleaseCount;
 
     private CountingMemoryReservationManager() {}
 
@@ -234,6 +300,7 @@ public class QueryModificationLoaderTest {
 
     @Override
     public void reserveMemoryImmediately() {
+      immediateReservationCount++;
       if (remainingImmediateFailures > 0) {
         remainingImmediateFailures--;
         throw new MemoryNotEnoughException("Mock memory reservation failure.");
@@ -242,6 +309,7 @@ public class QueryModificationLoaderTest {
 
     @Override
     public void reserveMemoryImmediately(long size) {
+      immediateReservationCount++;
       if (remainingImmediateFailures > 0) {
         remainingImmediateFailures--;
         throw new MemoryNotEnoughException("Mock memory reservation failure.");
@@ -251,6 +319,7 @@ public class QueryModificationLoaderTest {
 
     @Override
     public void releaseMemoryCumulatively(long size) {
+      cumulativeReleaseCount++;
       reservedBytes -= size;
     }
 
@@ -277,5 +346,13 @@ public class QueryModificationLoaderTest {
     private int getRemainingImmediateFailures() {
       return remainingImmediateFailures;
     }
+
+    private int getImmediateReservationCount() {
+      return immediateReservationCount;
+    }
+
+    private int getCumulativeReleaseCount() {
+      return cumulativeReleaseCount;
+    }
   }
 }

Reply via email to