This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch loadModificationFileWithMemControl
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a880d42916cb3cfc8d5de1687ba35f611b363c14
Author: shuwenwei <[email protected]>
AuthorDate: Thu May 28 16:30:40 2026 +0800

    fix
---
 .../planner/memory/MemoryReservationManager.java   |   8 +
 .../iotdb/db/i18n/DataNodeQueryMessages.java       |   2 +
 .../iotdb/db/i18n/DataNodeQueryMessages.java       |   2 +
 .../fragment/QueryModificationLoader.java          |  74 +++---
 .../memory/FakedMemoryReservationManager.java      |   3 +
 .../NotThreadSafeMemoryReservationManager.java     |   9 +
 .../memory/ThreadSafeMemoryReservationManager.java |   5 +
 .../fragment/QueryModificationLoaderTest.java      | 281 +++++++++++++++++++++
 8 files changed, 349 insertions(+), 35 deletions(-)

diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/memory/MemoryReservationManager.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/memory/MemoryReservationManager.java
index 9df8d52131b..b52534b4ec2 100644
--- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/memory/MemoryReservationManager.java
+++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/memory/MemoryReservationManager.java
@@ -33,6 +33,14 @@ public interface MemoryReservationManager {
   /** Reserve memory for the accumulated memory size immediately. */
   void reserveMemoryImmediately();
 
+  /**
+   * Reserve memory for the given size immediately without changing the accumulated pending
+   * reservation size maintained by this manager.
+   *
+   * @param size the size of memory to reserve immediately
+   */
+  void reserveMemoryImmediately(final long size);
+
   /**
    * Release memory for the given size.
    *
diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
index c1c26550f9d..1c3a0fd2bee 100644
--- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
+++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
@@ -136,6 +136,8 @@ public final class DataNodeQueryMessages {
 
   public static final String FREE_MORE_MEMORY_THAN_HAS_BEEN_RESERVED =
       "Free more memory than has been reserved.";
+  public static final String ESTIMATED_MODS_TREE_SIZE_DECREASED =
+      "Estimated mods tree size decreased from %d to %d for TsFile %s.";
 
   // --- Execution / Operator ---
 
diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
index 3d2783c81b6..53806d3f517 100644
--- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
+++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
@@ -136,6 +136,8 @@ public final class DataNodeQueryMessages {
 
   public static final String FREE_MORE_MEMORY_THAN_HAS_BEEN_RESERVED =
       "释放的内存超过已预留的量。";
+  public static final String ESTIMATED_MODS_TREE_SIZE_DECREASED =
+      "估算的 mods tree 大小从 %d 减少到 %d,TsFile:%s。";
 
   // --- Execution / Operator ---
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoader.java
index 8895a958bb2..b8d3e321838 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoader.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.calc.exception.MemoryNotEnoughException;
 import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PatternTreeMap;
+import org.apache.iotdb.db.i18n.DataNodeQueryMessages;
 import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -31,6 +32,7 @@ import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
 
 import org.apache.tsfile.utils.RamUsageEstimator;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
@@ -118,43 +120,50 @@ class QueryModificationLoader implements AutoCloseable {
 
     long claimedCacheQuotaBytes = 0;
     int readModCount = 0;
+    boolean estimatedAfterLastAppend = false;
 
     while (currentIterator.hasNext()) {
       ModEntry modification = currentIterator.next();
       readModCount++;
       if (!queryContext.shouldSkipModification(modification)) {
         modifications.append(modification.keyOfPatternTree(), modification);
+        estimatedAfterLastAppend = false;
       }
 
       if (readModCount % modsMemoryEstimateReadInterval == 0) {
         long currentEstimatedSize = estimateModsTreeMemory(modifications);
-        if (currentEstimatedSize < claimedCacheQuotaBytes) {
-          throw new IllegalStateException(
-              String.format(
-                  "Estimated mods tree size decreased from %d to %d for TsFile %s.",
-                  claimedCacheQuotaBytes, currentEstimatedSize, resource));
-        }
+        checkEstimatedSizeNotDecreased(claimedCacheQuotaBytes, currentEstimatedSize);
         long delta = currentEstimatedSize - claimedCacheQuotaBytes;
         if (!tryClaimCacheQuota(delta)) {
           return new LoadedModsResult(modifications, claimedCacheQuotaBytes, false);
         }
         claimedCacheQuotaBytes = currentEstimatedSize;
+        estimatedAfterLastAppend = true;
       }
     }
 
-    long finalEstimatedSize = estimateModsTreeMemory(modifications);
-    if (finalEstimatedSize < claimedCacheQuotaBytes) {
+    if (!estimatedAfterLastAppend) {
+      long finalEstimatedSize = estimateModsTreeMemory(modifications);
+      checkEstimatedSizeNotDecreased(claimedCacheQuotaBytes, finalEstimatedSize);
+      long delta = finalEstimatedSize - claimedCacheQuotaBytes;
+      if (!tryClaimCacheQuota(delta)) {
+        return new LoadedModsResult(modifications, claimedCacheQuotaBytes, false);
+      }
+      claimedCacheQuotaBytes = finalEstimatedSize;
+    }
+    return new LoadedModsResult(modifications, claimedCacheQuotaBytes, true);
+  }
+
+  private void checkEstimatedSizeNotDecreased(
+      long previousEstimatedSize, long currentEstimatedSize) {
+    if (currentEstimatedSize < previousEstimatedSize) {
       throw new IllegalStateException(
           String.format(
-              "Estimated mods tree size decreased from %d to %d for TsFile %s.",
-              claimedCacheQuotaBytes, finalEstimatedSize, resource));
-    }
-    long delta = finalEstimatedSize - claimedCacheQuotaBytes;
-    if (!tryClaimCacheQuota(delta)) {
-      return new LoadedModsResult(modifications, claimedCacheQuotaBytes, false);
+              DataNodeQueryMessages.ESTIMATED_MODS_TREE_SIZE_DECREASED,
+              previousEstimatedSize,
+              currentEstimatedSize,
+              resource));
     }
-    claimedCacheQuotaBytes = finalEstimatedSize;
-    return new LoadedModsResult(modifications, claimedCacheQuotaBytes, true);
   }
 
   private boolean tryClaimCacheQuota(long delta) {
@@ -182,11 +191,12 @@ class QueryModificationLoader implements AutoCloseable {
       // has already been read, then continue scanning the same iterator by path.
       List<ModEntry> matchedMods;
       try {
-        matchedMods = modsTreeMatcher.match(partialTree.mods);
+        matchedMods = new ArrayList<>(modsTreeMatcher.match(partialTree.mods));
       } finally {
         partialTree.mods = null;
         releaseClaimedCacheQuota(partialTree);
       }
+      reserveMatchedModsMemory(matchedMods);
 
       while (currentIterator.hasNext()) {
         ModEntry modification = currentIterator.next();
@@ -194,12 +204,12 @@ class QueryModificationLoader implements AutoCloseable {
           continue;
         }
         if (modificationMatcher.test(modification)) {
+          reserveMatchedModMemory(modification);
           matchedMods.add(modification);
         }
       }
 
       matchedMods = ModificationUtils.sortAndMerge(matchedMods);
-      reserveMatchedModsMemory(matchedMods);
       return matchedMods;
     } finally {
       close();
@@ -213,7 +223,7 @@ class QueryModificationLoader implements AutoCloseable {
       // Return only the matched mods and release the cache quota claimed during loading.
       List<ModEntry> matchedMods;
       try {
-        matchedMods = modsTreeMatcher.match(loadedTree.mods);
+        matchedMods = new ArrayList<>(modsTreeMatcher.match(loadedTree.mods));
       } finally {
         loadedTree.mods = null;
         releaseClaimedCacheQuota(loadedTree);
@@ -226,26 +236,20 @@ class QueryModificationLoader implements AutoCloseable {
   }
 
   private void reserveMatchedModsMemory(List<ModEntry> matchedMods) {
-    long memCost =
-        RamUsageEstimator.shallowSizeOf(matchedMods)
-            + (long) matchedMods.size() * RamUsageEstimator.NUM_BYTES_OBJECT_REF
-            + matchedMods.stream().mapToLong(ModEntry::ramBytesUsed).sum();
-    if (memCost > 0) {
-      reserveMemoryImmediately(memCost);
+    reserveMemoryImmediately(RamUsageEstimator.shallowSizeOf(matchedMods));
+    for (ModEntry matchedMod : matchedMods) {
+      reserveMatchedModMemory(matchedMod);
     }
   }
 
+  private void reserveMatchedModMemory(ModEntry matchedMod) {
+    reserveMemoryImmediately(RamUsageEstimator.NUM_BYTES_OBJECT_REF + matchedMod.ramBytesUsed());
+  }
+
   private void reserveMemoryImmediately(long bytes) {
-    synchronized (memoryReservationManager) {
-      try {
-        memoryReservationManager.reserveMemoryCumulatively(bytes);
-        memoryReservationManager.reserveMemoryImmediately();
-      } catch (MemoryNotEnoughException e) {
-        // reserveMemoryCumulatively has already added bytes to the pending reservation. Clear it
-        // while holding the same lock so other loader threads cannot observe the stale pending
-        // size.
-        memoryReservationManager.releaseMemoryVirtually(bytes);
-        throw e;
+    if (bytes > 0) {
+      synchronized (memoryReservationManager) {
+        memoryReservationManager.reserveMemoryImmediately(bytes);
       }
     }
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java
index ee6e0b06cf4..7cee8034a05 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java
@@ -31,6 +31,9 @@ public class FakedMemoryReservationManager implements MemoryReservationManager {
   @Override
   public void reserveMemoryImmediately() {}
 
+  @Override
+  public void reserveMemoryImmediately(final long size) {}
+
   @Override
   public void releaseMemoryCumulatively(long size) {}
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
index 7dbebaa2b50..d156628532c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java
@@ -68,6 +68,15 @@ public class NotThreadSafeMemoryReservationManager implements MemoryReservationM
     }
   }
 
+  @Override
+  public void reserveMemoryImmediately(final long size) {
+    if (size != 0) {
+      LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators(
+          size, reservedBytesInTotal, queryId.getId(), contextHolder);
+      reservedBytesInTotal += size;
+    }
+  }
+
   @Override
   public void releaseMemoryCumulatively(final long size) {
     bytesToBeReleased += size;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
index d167eae354f..2a544421f3f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java
@@ -41,6 +41,11 @@ public class ThreadSafeMemoryReservationManager extends NotThreadSafeMemoryReser
     super.reserveMemoryImmediately();
   }
 
+  @Override
+  public synchronized void reserveMemoryImmediately(final long size) {
+    super.reserveMemoryImmediately(size);
+  }
+
   @Override
   public synchronized void releaseMemoryCumulatively(long size) {
     super.releaseMemoryCumulatively(size);
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java
new file mode 100644
index 00000000000..ec889a053e7
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.fragment;
+
+import org.apache.iotdb.calc.exception.MemoryNotEnoughException;
+import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PatternTreeMap;
+import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
+import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
+import org.apache.iotdb.db.utils.constant.TestConstant;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
+
+import org.apache.tsfile.external.commons.io.FileUtils;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.Pair;
+import org.junit.After;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class QueryModificationLoaderTest {
+
+  private static final IDeviceID DEVICE_ID = IDeviceID.Factory.DEFAULT_FACTORY.create("root.sg.d1");
+
+  private File testDir;
+
+  @After
+  public void tearDown() throws Exception {
+    if (testDir != null && testDir.exists()) {
+      FileUtils.deleteDirectory(testDir);
+    }
+  }
+
+  @Test
+  public void testCacheLoadedModsTreeWhenQuotaEnough() throws Exception {
+    TsFileResource resource = prepareResource("cache");
+    writeMods(
+        resource,
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10),
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d2.s1"), 20, 30));
+
+    Map<TsFileID, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>> fileModCache =
+        new ConcurrentHashMap<>();
+    AtomicLong cachedModEntriesSize = new AtomicLong();
+    CountingMemoryReservationManager memoryReservationManager =
+        new CountingMemoryReservationManager();
+
+    try (QueryModificationLoader loader =
+        newLoader(
+            resource,
+            Long.MAX_VALUE,
+            fileModCache,
+            cachedModEntriesSize,
+            memoryReservationManager,
+            1)) {
+      List<ModEntry> result = loader.getPathModifications();
+
+      assertEquals(1, result.size());
+      assertTrue(fileModCache.containsKey(resource.getTsFileID()));
+      assertTrue(cachedModEntriesSize.get() > 0);
+      assertTrue(memoryReservationManager.getReservedBytes() >= cachedModEntriesSize.get());
+    }
+  }
+
+  @Test
+  public void testFallbackScansRemainingModsWhenQuotaExceeded() throws Exception {
+    TsFileResource resource = prepareResource("fallback");
+    writeMods(
+        resource,
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10),
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d2.s1"), 20, 30),
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 40, 50));
+
+    Map<TsFileID, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>> fileModCache =
+        new ConcurrentHashMap<>();
+    AtomicLong cachedModEntriesSize = new AtomicLong();
+    CountingMemoryReservationManager memoryReservationManager =
+        new CountingMemoryReservationManager();
+
+    try (QueryModificationLoader loader =
+        newLoader(resource, 1, fileModCache, cachedModEntriesSize, memoryReservationManager, 1)) {
+      List<ModEntry> result = loader.getPathModifications();
+
+      assertEquals(2, result.size());
+      assertFalse(fileModCache.containsKey(resource.getTsFileID()));
+      assertEquals(0, cachedModEntriesSize.get());
+      assertTrue(memoryReservationManager.getReservedBytes() > 0);
+      assertEquals(0, memoryReservationManager.getRemainingImmediateFailures());
+    }
+  }
+
+  @Test
+  public void testFallbackMatchesLoadedTreeWhenFinalReservationFailed() throws Exception {
+    TsFileResource resource = prepareResource("final-reserve-failed");
+    writeMods(
+        resource,
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10),
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d2.s1"), 20, 30),
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 40, 50));
+
+    Map<TsFileID, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>> fileModCache =
+        new ConcurrentHashMap<>();
+    AtomicLong cachedModEntriesSize = new AtomicLong();
+    CountingMemoryReservationManager memoryReservationManager =
+        new CountingMemoryReservationManager(1);
+
+    try (QueryModificationLoader loader =
+        newLoader(
+            resource,
+            Long.MAX_VALUE,
+            fileModCache,
+            cachedModEntriesSize,
+            memoryReservationManager,
+            1)) {
+      List<ModEntry> result = loader.getPathModifications();
+
+      assertEquals(2, result.size());
+      assertFalse(fileModCache.containsKey(resource.getTsFileID()));
+      assertEquals(0, cachedModEntriesSize.get());
+      assertTrue(memoryReservationManager.getReservedBytes() > 0);
+      assertEquals(0, memoryReservationManager.getRemainingImmediateFailures());
+    }
+  }
+
+  @Test
+  public void testFallbackThrowsWhenMatchedModsReservationFailed() throws Exception {
+    TsFileResource resource = prepareResource("fallback-reserve-failed");
+    writeMods(
+        resource,
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10),
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d2.s1"), 20, 30),
+        new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 40, 50));
+
+    Map<TsFileID, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>> fileModCache =
+        new ConcurrentHashMap<>();
+    AtomicLong cachedModEntriesSize = new AtomicLong();
+    CountingMemoryReservationManager memoryReservationManager =
+        new CountingMemoryReservationManager(1);
+
+    try (QueryModificationLoader loader =
+        newLoader(resource, 1, fileModCache, cachedModEntriesSize, memoryReservationManager, 1)) {
+      assertThrows(MemoryNotEnoughException.class, loader::getPathModifications);
+
+      assertFalse(fileModCache.containsKey(resource.getTsFileID()));
+      assertEquals(0, cachedModEntriesSize.get());
+      assertEquals(0, memoryReservationManager.getReservedBytes());
+      assertEquals(0, memoryReservationManager.getRemainingImmediateFailures());
+    }
+  }
+
+  private QueryModificationLoader newLoader(
+      TsFileResource resource,
+      long modsCacheSizeLimitPerFI,
+      Map<TsFileID, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>> fileModCache,
+      AtomicLong cachedModEntriesSize,
+      MemoryReservationManager memoryReservationManager,
+      int modsMemoryEstimateReadInterval) {
+    QueryContext queryContext = new QueryContext(false, false);
+    return new QueryModificationLoader(
+        queryContext,
+        resource,
+        memoryReservationManager,
+        modsCacheSizeLimitPerFI,
+        modsMemoryEstimateReadInterval,
+        fileModCache,
+        cachedModEntriesSize,
+        modification -> modification.affects(DEVICE_ID) && modification.affects("s1"),
+        modsTree -> queryContext.getPathModifications(modsTree, DEVICE_ID, "s1"));
+  }
+
+  private TsFileResource prepareResource(String name) {
+    testDir = new File(TestConstant.BASE_OUTPUT_PATH, "QueryModificationLoaderTest-" + name);
+    testDir.mkdirs();
+    File tsFile =
+        new File(TsFileNameGenerator.generateNewTsFilePath(testDir.getAbsolutePath(), 1, 1, 0, 0));
+    return new TsFileResource(tsFile);
+  }
+
+  private void writeMods(TsFileResource resource, TreeDeletionEntry... modifications)
+      throws Exception {
+    try (ModificationFile modificationFile = resource.getModFileForWrite()) {
+      for (TreeDeletionEntry modification : modifications) {
+        modificationFile.write(modification);
+      }
+    }
+  }
+
+  private static class CountingMemoryReservationManager implements MemoryReservationManager {
+
+    private long reservedBytes;
+    private int remainingImmediateFailures;
+
+    private CountingMemoryReservationManager() {}
+
+    private CountingMemoryReservationManager(int remainingImmediateFailures) {
+      this.remainingImmediateFailures = remainingImmediateFailures;
+    }
+
+    @Override
+    public void reserveMemoryCumulatively(long size) {
+      reservedBytes += size;
+    }
+
+    @Override
+    public void reserveMemoryImmediately() {
+      if (remainingImmediateFailures > 0) {
+        remainingImmediateFailures--;
+        throw new MemoryNotEnoughException("Mock memory reservation failure.");
+      }
+    }
+
+    @Override
+    public void reserveMemoryImmediately(long size) {
+      if (remainingImmediateFailures > 0) {
+        remainingImmediateFailures--;
+        throw new MemoryNotEnoughException("Mock memory reservation failure.");
+      }
+      reservedBytes += size;
+    }
+
+    @Override
+    public void releaseMemoryCumulatively(long size) {
+      reservedBytes -= size;
+    }
+
+    @Override
+    public void releaseAllReservedMemory() {
+      reservedBytes = 0;
+    }
+
+    @Override
+    public Pair<Long, Long> releaseMemoryVirtually(long size) {
+      reservedBytes -= size;
+      return new Pair<>(size, 0L);
+    }
+
+    @Override
+    public void reserveMemoryVirtually(long bytesToBeReserved, long bytesAlreadyReserved) {
+      reservedBytes += bytesToBeReserved + bytesAlreadyReserved;
+    }
+
+    private long getReservedBytes() {
+      return reservedBytes;
+    }
+
+    private int getRemainingImmediateFailures() {
+      return remainingImmediateFailures;
+    }
+  }
+}

Reply via email to