This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch loadModificationFileWithMemControl in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a880d42916cb3cfc8d5de1687ba35f611b363c14 Author: shuwenwei <[email protected]> AuthorDate: Thu May 28 16:30:40 2026 +0800 fix --- .../planner/memory/MemoryReservationManager.java | 8 + .../iotdb/db/i18n/DataNodeQueryMessages.java | 2 + .../iotdb/db/i18n/DataNodeQueryMessages.java | 2 + .../fragment/QueryModificationLoader.java | 74 +++--- .../memory/FakedMemoryReservationManager.java | 3 + .../NotThreadSafeMemoryReservationManager.java | 9 + .../memory/ThreadSafeMemoryReservationManager.java | 5 + .../fragment/QueryModificationLoaderTest.java | 281 +++++++++++++++++++++ 8 files changed, 349 insertions(+), 35 deletions(-) diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/memory/MemoryReservationManager.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/memory/MemoryReservationManager.java index 9df8d52131b..b52534b4ec2 100644 --- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/memory/MemoryReservationManager.java +++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/memory/MemoryReservationManager.java @@ -33,6 +33,14 @@ public interface MemoryReservationManager { /** Reserve memory for the accumulated memory size immediately. */ void reserveMemoryImmediately(); + /** + * Reserve memory for the given size immediately without changing the accumulated pending + * reservation size maintained by this manager. + * + * @param size the size of memory to reserve immediately + */ + void reserveMemoryImmediately(final long size); + /** * Release memory for the given size. * diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java index c1c26550f9d..1c3a0fd2bee 100644 --- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java +++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java @@ -136,6 +136,8 @@ public final class DataNodeQueryMessages { public static final String FREE_MORE_MEMORY_THAN_HAS_BEEN_RESERVED = "Free more memory than has been reserved."; + public static final String ESTIMATED_MODS_TREE_SIZE_DECREASED = + "Estimated mods tree size decreased from %d to %d for TsFile %s."; // --- Execution / Operator --- diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java index 3d2783c81b6..53806d3f517 100644 --- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java +++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java @@ -136,6 +136,8 @@ public final class DataNodeQueryMessages { public static final String FREE_MORE_MEMORY_THAN_HAS_BEEN_RESERVED = "释放的内存超过已预留的量。"; + public static final String ESTIMATED_MODS_TREE_SIZE_DECREASED = + "估算的 mods tree 大小从 %d 减少到 %d,TsFile:%s。"; // --- Execution / Operator --- diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoader.java index 8895a958bb2..b8d3e321838 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoader.java @@ -23,6 +23,7 @@ import org.apache.iotdb.calc.exception.MemoryNotEnoughException; import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PatternTreeMap; +import org.apache.iotdb.db.i18n.DataNodeQueryMessages; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -31,6 +32,7 @@ import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.tsfile.utils.RamUsageEstimator; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @@ -118,43 +120,50 @@ class QueryModificationLoader implements AutoCloseable { long claimedCacheQuotaBytes = 0; int readModCount = 0; + boolean estimatedAfterLastAppend = false; while (currentIterator.hasNext()) { ModEntry modification = currentIterator.next(); readModCount++; if (!queryContext.shouldSkipModification(modification)) { modifications.append(modification.keyOfPatternTree(), modification); + estimatedAfterLastAppend = false; } if (readModCount % modsMemoryEstimateReadInterval == 0) { long currentEstimatedSize = estimateModsTreeMemory(modifications); - if (currentEstimatedSize < claimedCacheQuotaBytes) { - throw new IllegalStateException( - String.format( - "Estimated mods tree size decreased from %d to %d for TsFile %s.", - claimedCacheQuotaBytes, currentEstimatedSize, resource)); - } + checkEstimatedSizeNotDecreased(claimedCacheQuotaBytes, currentEstimatedSize); long delta = currentEstimatedSize - claimedCacheQuotaBytes; if (!tryClaimCacheQuota(delta)) { return new LoadedModsResult(modifications, claimedCacheQuotaBytes, false); } claimedCacheQuotaBytes = currentEstimatedSize; + estimatedAfterLastAppend = true; } } - long finalEstimatedSize = estimateModsTreeMemory(modifications); - if (finalEstimatedSize < claimedCacheQuotaBytes) { + if (!estimatedAfterLastAppend) { + long finalEstimatedSize = estimateModsTreeMemory(modifications); + checkEstimatedSizeNotDecreased(claimedCacheQuotaBytes, finalEstimatedSize); + long delta = finalEstimatedSize - claimedCacheQuotaBytes; + if (!tryClaimCacheQuota(delta)) { + return new LoadedModsResult(modifications, claimedCacheQuotaBytes, false); + } + claimedCacheQuotaBytes = finalEstimatedSize; + } + return new LoadedModsResult(modifications, claimedCacheQuotaBytes, true); + } + + private void checkEstimatedSizeNotDecreased( + long previousEstimatedSize, long currentEstimatedSize) { + if (currentEstimatedSize < previousEstimatedSize) { throw new IllegalStateException( String.format( - "Estimated mods tree size decreased from %d to %d for TsFile %s.", - claimedCacheQuotaBytes, finalEstimatedSize, resource)); - } - long delta = finalEstimatedSize - claimedCacheQuotaBytes; - if (!tryClaimCacheQuota(delta)) { - return new LoadedModsResult(modifications, claimedCacheQuotaBytes, false); + DataNodeQueryMessages.ESTIMATED_MODS_TREE_SIZE_DECREASED, + previousEstimatedSize, + currentEstimatedSize, + resource)); } - claimedCacheQuotaBytes = finalEstimatedSize; - return new LoadedModsResult(modifications, claimedCacheQuotaBytes, true); } private boolean tryClaimCacheQuota(long delta) { @@ -182,11 +191,12 @@ class QueryModificationLoader implements AutoCloseable { // has already been read, then continue scanning the same iterator by path. List<ModEntry> matchedMods; try { - matchedMods = modsTreeMatcher.match(partialTree.mods); + matchedMods = new ArrayList<>(modsTreeMatcher.match(partialTree.mods)); } finally { partialTree.mods = null; releaseClaimedCacheQuota(partialTree); } + reserveMatchedModsMemory(matchedMods); while (currentIterator.hasNext()) { ModEntry modification = currentIterator.next(); @@ -194,12 +204,12 @@ class QueryModificationLoader implements AutoCloseable { continue; } if (modificationMatcher.test(modification)) { + reserveMatchedModMemory(modification); matchedMods.add(modification); } } matchedMods = ModificationUtils.sortAndMerge(matchedMods); - reserveMatchedModsMemory(matchedMods); return matchedMods; } finally { close(); @@ -213,7 +223,7 @@ class QueryModificationLoader implements AutoCloseable { // Return only the matched mods and release the cache quota claimed during loading. List<ModEntry> matchedMods; try { - matchedMods = modsTreeMatcher.match(loadedTree.mods); + matchedMods = new ArrayList<>(modsTreeMatcher.match(loadedTree.mods)); } finally { loadedTree.mods = null; releaseClaimedCacheQuota(loadedTree); @@ -226,26 +236,20 @@ class QueryModificationLoader implements AutoCloseable { } private void reserveMatchedModsMemory(List<ModEntry> matchedMods) { - long memCost = - RamUsageEstimator.shallowSizeOf(matchedMods) - + (long) matchedMods.size() * RamUsageEstimator.NUM_BYTES_OBJECT_REF - + matchedMods.stream().mapToLong(ModEntry::ramBytesUsed).sum(); - if (memCost > 0) { - reserveMemoryImmediately(memCost); + reserveMemoryImmediately(RamUsageEstimator.shallowSizeOf(matchedMods)); + for (ModEntry matchedMod : matchedMods) { + reserveMatchedModMemory(matchedMod); } } + private void reserveMatchedModMemory(ModEntry matchedMod) { + reserveMemoryImmediately(RamUsageEstimator.NUM_BYTES_OBJECT_REF + matchedMod.ramBytesUsed()); + } + private void reserveMemoryImmediately(long bytes) { - synchronized (memoryReservationManager) { - try { - memoryReservationManager.reserveMemoryCumulatively(bytes); - memoryReservationManager.reserveMemoryImmediately(); - } catch (MemoryNotEnoughException e) { - // reserveMemoryCumulatively has already added bytes to the pending reservation. Clear it - // while holding the same lock so other loader threads cannot observe the stale pending - // size. - memoryReservationManager.releaseMemoryVirtually(bytes); - throw e; + if (bytes > 0) { + synchronized (memoryReservationManager) { + memoryReservationManager.reserveMemoryImmediately(bytes); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java index ee6e0b06cf4..7cee8034a05 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java @@ -31,6 +31,9 @@ public class FakedMemoryReservationManager implements MemoryReservationManager { @Override public void reserveMemoryImmediately() {} + @Override + public void reserveMemoryImmediately(final long size) {} + @Override public void releaseMemoryCumulatively(long size) {} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java index 7dbebaa2b50..d156628532c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java @@ -68,6 +68,15 @@ public class NotThreadSafeMemoryReservationManager implements MemoryReservationM } } + @Override + public void reserveMemoryImmediately(final long size) { + if (size != 0) { + LOCAL_EXECUTION_PLANNER.reserveFromFreeMemoryForOperators( + size, reservedBytesInTotal, queryId.getId(), contextHolder); + reservedBytesInTotal += size; + } + } + @Override public void releaseMemoryCumulatively(final long size) { bytesToBeReleased += size; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java index d167eae354f..2a544421f3f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java @@ -41,6 +41,11 @@ public class ThreadSafeMemoryReservationManager extends NotThreadSafeMemoryReser super.reserveMemoryImmediately(); } + @Override + public synchronized void reserveMemoryImmediately(final long size) { + super.reserveMemoryImmediately(size); + } + @Override public synchronized void releaseMemoryCumulatively(long size) { super.releaseMemoryCumulatively(size); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java new file mode 100644 index 00000000000..ec889a053e7 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoaderTest.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.fragment; + +import org.apache.iotdb.calc.exception.MemoryNotEnoughException; +import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager; +import org.apache.iotdb.commons.path.MeasurementPath; +import org.apache.iotdb.commons.path.PatternTreeMap; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; +import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; +import org.apache.iotdb.db.utils.constant.TestConstant; +import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; + +import org.apache.tsfile.external.commons.io.FileUtils; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.utils.Pair; +import org.junit.After; +import org.junit.Test; + +import java.io.File; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +public class QueryModificationLoaderTest { + + private static final IDeviceID DEVICE_ID = IDeviceID.Factory.DEFAULT_FACTORY.create("root.sg.d1"); + + private File testDir; + + @After + public void tearDown() throws Exception { + if (testDir != null && testDir.exists()) { + FileUtils.deleteDirectory(testDir); + } + } + + @Test + public void testCacheLoadedModsTreeWhenQuotaEnough() throws Exception { + TsFileResource resource = prepareResource("cache"); + writeMods( + resource, + new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10), + new TreeDeletionEntry(new MeasurementPath("root.sg.d2.s1"), 20, 30)); + + Map<TsFileID, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>> fileModCache = + new ConcurrentHashMap<>(); + AtomicLong cachedModEntriesSize = new AtomicLong(); + CountingMemoryReservationManager memoryReservationManager = + new CountingMemoryReservationManager(); + + try (QueryModificationLoader loader = + newLoader( + resource, + Long.MAX_VALUE, + fileModCache, + cachedModEntriesSize, + memoryReservationManager, + 1)) { + List<ModEntry> result = loader.getPathModifications(); + + assertEquals(1, result.size()); + assertTrue(fileModCache.containsKey(resource.getTsFileID())); + assertTrue(cachedModEntriesSize.get() > 0); + assertTrue(memoryReservationManager.getReservedBytes() >= cachedModEntriesSize.get()); + } + } + + @Test + public void testFallbackScansRemainingModsWhenQuotaExceeded() throws Exception { + TsFileResource resource = prepareResource("fallback"); + writeMods( + resource, + new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10), + new TreeDeletionEntry(new MeasurementPath("root.sg.d2.s1"), 20, 30), + new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 40, 50)); + + Map<TsFileID, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>> fileModCache = + new ConcurrentHashMap<>(); + AtomicLong cachedModEntriesSize = new AtomicLong(); + CountingMemoryReservationManager memoryReservationManager = + new CountingMemoryReservationManager(); + + try (QueryModificationLoader loader = + newLoader(resource, 1, fileModCache, cachedModEntriesSize, memoryReservationManager, 1)) { + List<ModEntry> result = loader.getPathModifications(); + + assertEquals(2, result.size()); + assertFalse(fileModCache.containsKey(resource.getTsFileID())); + assertEquals(0, cachedModEntriesSize.get()); + assertTrue(memoryReservationManager.getReservedBytes() > 0); + assertEquals(0, memoryReservationManager.getRemainingImmediateFailures()); + } + } + + @Test + public void testFallbackMatchesLoadedTreeWhenFinalReservationFailed() throws Exception { + TsFileResource resource = prepareResource("final-reserve-failed"); + writeMods( + resource, + new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10), + new TreeDeletionEntry(new MeasurementPath("root.sg.d2.s1"), 20, 30), + new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 40, 50)); + + Map<TsFileID, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>> fileModCache = + new ConcurrentHashMap<>(); + AtomicLong cachedModEntriesSize = new AtomicLong(); + CountingMemoryReservationManager memoryReservationManager = + new CountingMemoryReservationManager(1); + + try (QueryModificationLoader loader = + newLoader( + resource, + Long.MAX_VALUE, + fileModCache, + cachedModEntriesSize, + memoryReservationManager, + 1)) { + List<ModEntry> result = loader.getPathModifications(); + + assertEquals(2, result.size()); + assertFalse(fileModCache.containsKey(resource.getTsFileID())); + assertEquals(0, cachedModEntriesSize.get()); + assertTrue(memoryReservationManager.getReservedBytes() > 0); + assertEquals(0, memoryReservationManager.getRemainingImmediateFailures()); + } + } + + @Test + public void testFallbackThrowsWhenMatchedModsReservationFailed() throws Exception { + TsFileResource resource = prepareResource("fallback-reserve-failed"); + writeMods( + resource, + new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 0, 10), + new TreeDeletionEntry(new MeasurementPath("root.sg.d2.s1"), 20, 30), + new TreeDeletionEntry(new MeasurementPath("root.sg.d1.s1"), 40, 50)); + + Map<TsFileID, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>> fileModCache = + new ConcurrentHashMap<>(); + AtomicLong cachedModEntriesSize = new AtomicLong(); + CountingMemoryReservationManager memoryReservationManager = + new CountingMemoryReservationManager(1); + + try (QueryModificationLoader loader = + newLoader(resource, 1, fileModCache, cachedModEntriesSize, memoryReservationManager, 1)) { + assertThrows(MemoryNotEnoughException.class, loader::getPathModifications); + + assertFalse(fileModCache.containsKey(resource.getTsFileID())); + assertEquals(0, cachedModEntriesSize.get()); + assertEquals(0, memoryReservationManager.getReservedBytes()); + assertEquals(0, memoryReservationManager.getRemainingImmediateFailures()); + } + } + + private QueryModificationLoader newLoader( + TsFileResource resource, + long modsCacheSizeLimitPerFI, + Map<TsFileID, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>> fileModCache, + AtomicLong cachedModEntriesSize, + MemoryReservationManager memoryReservationManager, + int modsMemoryEstimateReadInterval) { + QueryContext queryContext = new QueryContext(false, false); + return new QueryModificationLoader( + queryContext, + resource, + memoryReservationManager, + modsCacheSizeLimitPerFI, + modsMemoryEstimateReadInterval, + fileModCache, + cachedModEntriesSize, + modification -> modification.affects(DEVICE_ID) && modification.affects("s1"), + modsTree -> queryContext.getPathModifications(modsTree, DEVICE_ID, "s1")); + } + + private TsFileResource prepareResource(String name) { + testDir = new File(TestConstant.BASE_OUTPUT_PATH, "QueryModificationLoaderTest-" + name); + testDir.mkdirs(); + File tsFile = + new File(TsFileNameGenerator.generateNewTsFilePath(testDir.getAbsolutePath(), 1, 1, 0, 0)); + return new TsFileResource(tsFile); + } + + private void writeMods(TsFileResource resource, TreeDeletionEntry... modifications) + throws Exception { + try (ModificationFile modificationFile = resource.getModFileForWrite()) { + for (TreeDeletionEntry modification : modifications) { + modificationFile.write(modification); + } + } + } + + private static class CountingMemoryReservationManager implements MemoryReservationManager { + + private long reservedBytes; + private int remainingImmediateFailures; + + private CountingMemoryReservationManager() {} + + private CountingMemoryReservationManager(int remainingImmediateFailures) { + this.remainingImmediateFailures = remainingImmediateFailures; + } + + @Override + public void reserveMemoryCumulatively(long size) { + reservedBytes += size; + } + + @Override + public void reserveMemoryImmediately() { + if (remainingImmediateFailures > 0) { + remainingImmediateFailures--; + throw new MemoryNotEnoughException("Mock memory reservation failure."); + } + } + + @Override + public void reserveMemoryImmediately(long size) { + if (remainingImmediateFailures > 0) { + remainingImmediateFailures--; + throw new MemoryNotEnoughException("Mock memory reservation failure."); + } + reservedBytes += size; + } + + @Override + public void releaseMemoryCumulatively(long size) { + reservedBytes -= size; + } + + @Override + public void releaseAllReservedMemory() { + reservedBytes = 0; + } + + @Override + public Pair<Long, Long> releaseMemoryVirtually(long size) { + reservedBytes -= size; + return new Pair<>(size, 0L); + } + + @Override + public void reserveMemoryVirtually(long bytesToBeReserved, long bytesAlreadyReserved) { + reservedBytes += bytesToBeReserved + bytesAlreadyReserved; + } + + private long getReservedBytes() { + return reservedBytes; + } + + private int getRemainingImmediateFailures() { + return remainingImmediateFailures; + } + } +}
