This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch loadModificationFileWithMemControl in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c8420a0e7301c2d127776df891b11196ae185e77 Author: shuwenwei <[email protected]> AuthorDate: Wed May 27 16:37:20 2026 +0800 add memory control --- .../fragment/FragmentInstanceContext.java | 87 +++--- .../execution/fragment/QueryContext.java | 20 +- .../fragment/QueryModificationLoader.java | 299 +++++++++++++++++++++ .../dataregion/tsfile/TsFileResource.java | 18 +- 4 files changed, 381 insertions(+), 43 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 2a0373cf6fd..833ba668174 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -26,11 +26,11 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.audit.UserEntity; import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.commons.path.AlignedFullPath; import org.apache.iotdb.commons.path.IFullPath; -import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.commons.queryengine.common.SessionInfo; import org.apache.iotdb.commons.queryengine.utils.TimestampPrecisionUtils; import org.apache.iotdb.commons.utils.TestOnly; @@ -56,7 +56,6 @@ import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceForRegio import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceType; import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; -import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp; @@ -82,6 +81,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import java.util.stream.Collectors; import static org.apache.iotdb.commons.utils.ErrorHandlingCommonUtils.getRootCause; @@ -99,6 +99,7 @@ public class FragmentInstanceContext extends QueryContext { private static final long END_TIME_INITIAL_VALUE = -1L; // wait over 5s for driver to close is abnormal private static final long LONG_WAIT_DURATION = 5_000_000_000L; + /* Number of mod entries read between two size estimates of a mods tree being loaded; passed to QueryModificationLoader as modsMemoryEstimateReadInterval. */ private static final int MODS_MEMORY_ESTIMATE_READ_INTERVAL = 10_000; private final FragmentInstanceId id; private final FragmentInstanceStateMachine stateMachine; @@ -378,41 +379,57 @@ public class FragmentInstanceContext extends QueryContext { } @Override - protected PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> getAllModifications( - TsFileResource resource) { - if (isSingleSourcePath() || memoryReservationManager == null) { - return loadAllModificationsFromDisk(resource); + /* Returns the mods of tsFileResource that affect deviceID and measurement: empty when checkIfModificationExists is false, the parent implementation when memoryReservationManager is null, otherwise loaded through a QueryModificationLoader. NOTE(review): the removed getAllModifications override skipped the cache when isSingleSourcePath(); this override no longer checks it - confirm that is intended. */ public List<ModEntry> getPathModifications( + TsFileResource tsFileResource, IDeviceID deviceID, String measurement) { + if (!checkIfModificationExists(tsFileResource)) { + return Collections.emptyList(); } + if (memoryReservationManager == null) { + return super.getPathModifications(tsFileResource, deviceID, measurement); + } + try (QueryModificationLoader modificationLoader = + getQueryModificationLoader( + tsFileResource, + modification -> modification.affects(deviceID) && modification.affects(measurement), + mods -> getPathModifications(mods, deviceID, measurement))) { + return
modificationLoader.getPathModifications(); + } catch (IllegalPathException e) { + /* This overload declares no checked exceptions, so the loader's IllegalPathException is rethrown unchecked. */ throw new IllegalStateException(e); + } + } + + @Override + /* Device-level variant: returns the mods of tsFileResource that affect deviceID, using the same loader path as the measurement-level overload. */ public List<ModEntry> getPathModifications(TsFileResource tsFileResource, IDeviceID deviceID) + throws IllegalPathException { + if (!checkIfModificationExists(tsFileResource)) { + return Collections.emptyList(); + } + if (memoryReservationManager == null) { + return super.getPathModifications(tsFileResource, deviceID); + } + try (QueryModificationLoader modificationLoader = + getQueryModificationLoader( + tsFileResource, + modification -> modification.affects(deviceID), + mods -> getPathModifications(mods, deviceID))) { + return modificationLoader.getPathModifications(); + } + } - AtomicReference<PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>> - atomicReference = new AtomicReference<>(); - PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> cachedResult = - fileModCache.computeIfAbsent( - resource.getTsFileID(), - k -> { - PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> allMods = - loadAllModificationsFromDisk(resource); - atomicReference.set(allMods); - if (cachedModEntriesSize.get() >= CONFIG.getModsCacheSizeLimitPerFI()) { - return null; - } - long memCost = - RamUsageEstimator.sizeOfObject(allMods) - + RamUsageEstimator.SHALLOW_SIZE_OF_CONCURRENT_HASHMAP_ENTRY; - long alreadyUsedMemoryForCachedModEntries = cachedModEntriesSize.get(); - while (alreadyUsedMemoryForCachedModEntries + memCost - < CONFIG.getModsCacheSizeLimitPerFI()) { - if (cachedModEntriesSize.compareAndSet( - alreadyUsedMemoryForCachedModEntries, - alreadyUsedMemoryForCachedModEntries + memCost)) { - memoryReservationManager.reserveMemoryCumulatively(memCost); - return allMods; - } - alreadyUsedMemoryForCachedModEntries = cachedModEntriesSize.get(); - } - return null; - }); - return cachedResult == null ?
atomicReference.get() : cachedResult; + /* Builds a QueryModificationLoader bound to this context's memoryReservationManager, fileModCache and cachedModEntriesSize, with the per-FI limit from CONFIG.getModsCacheSizeLimitPerFI(). */ private QueryModificationLoader getQueryModificationLoader( + TsFileResource tsFileResource, + Predicate<ModEntry> fallbackModificationMatcher, + QueryModificationLoader.ModsTreeMatcher modsTreeMatcher) { + return new QueryModificationLoader( + this, + tsFileResource, + memoryReservationManager, + CONFIG.getModsCacheSizeLimitPerFI(), + MODS_MEMORY_ESTIMATE_READ_INTERVAL, + fileModCache, + cachedModEntriesSize, + fallbackModificationMatcher, + modsTreeMatcher); } // the state change listener is added here in a separate initialize() method diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java index d3edbe2e0d0..44a88826696 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java @@ -127,20 +127,26 @@ public class QueryContext { TsFileResource resource) { PatternTreeMap<ModEntry, ModsSerializer> modifications = PatternTreeMapFactory.getModsPatternTreeMap(); - TsFileResource.ModIterator modEntryIterator = resource.getModEntryIterator(); - while (modEntryIterator.hasNext()) { - ModEntry modification = modEntryIterator.next(); - if (tables != null && modification instanceof TableDeletionEntry) { - String tableName = ((TableDeletionEntry) modification).getTableName(); - if (!tables.contains(tableName)) { + try (TsFileResource.ModIterator modEntryIterator = resource.getModEntryIterator()) { + while (modEntryIterator.hasNext()) { + ModEntry modification = modEntryIterator.next(); + if (shouldSkipModification(modification)) { continue; } + modifications.append(modification.keyOfPatternTree(), modification); } - modifications.append(modification.keyOfPatternTree(), modification); } return modifications; } +
+ /* Returns true when tables is non-null and the entry is a TableDeletionEntry for a table not in tables; the mods loaders skip such entries instead of adding them to the pattern tree. */ protected boolean shouldSkipModification(ModEntry modification) { + if (tables != null && modification instanceof TableDeletionEntry) { + String tableName = ((TableDeletionEntry) modification).getTableName(); + return !tables.contains(tableName); + } + return false; + } + public List<ModEntry> getPathModifications( TsFileResource tsFileResource, IDeviceID deviceID, String measurement) { // if the mods file does not exist, do not add it to the cache diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoader.java new file mode 100644 index 00000000000..8895a958bb2 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryModificationLoader.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.fragment;
+
+import org.apache.iotdb.calc.exception.MemoryNotEnoughException;
+import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PatternTreeMap;
+import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.utils.ModificationUtils;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
+
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+
+/**
+ * Loads the modifications of one TsFile for a query while keeping the fragment-instance (FI) mods
+ * cache within its quota.
+ *
+ * <p>The loader first tries to read every mod entry of the file into a pattern tree and publish it
+ * into {@code fileModCache}. Cache quota is claimed from {@code cachedModEntriesSize} incrementally
+ * while reading; real query memory is reserved only once, right before the tree is published. If the
+ * quota runs out or that reservation fails, only the mods matching the requested path are returned
+ * (with their own memory reserved) and the claimed quota is given back.
+ *
+ * <p>A loader keeps a {@link TsFileResource.ModIterator} open while loading, so it must be closed
+ * (for example with try-with-resources).
+ */
+class QueryModificationLoader implements AutoCloseable {
+
+  private final QueryContext queryContext;
+  private final TsFileResource resource;
+  private final MemoryReservationManager memoryReservationManager;
+  private final long modsCacheSizeLimitPerFI;
+  private final int modsMemoryEstimateReadInterval;
+  private final Map<TsFileID, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>>
+      fileModCache;
+  // Cache quota (estimated bytes) already claimed; shared with other loaders of the same context.
+  private final AtomicLong cachedModEntriesSize;
+  // Selects, during the scan fallback, the entries read after the cache quota ran out.
+  private final Predicate<ModEntry> modificationMatcher;
+  // Extracts the mods relevant to the requested path from a (full or partial) pattern tree.
+  private final ModsTreeMatcher modsTreeMatcher;
+
+  // Iterator over the file's mod entries; stays open between loading and the scan fallback.
+  private TsFileResource.ModIterator currentIterator;
+
+  /**
+   * Creates a loader for the mods of {@code resource}.
+   *
+   * @param modsMemoryEstimateReadInterval number of entries read between two size estimates of the
+   *     tree being loaded; must be positive
+   * @throws IllegalArgumentException if {@code modsMemoryEstimateReadInterval} is not positive
+   */
+  QueryModificationLoader(
+      QueryContext queryContext,
+      TsFileResource resource,
+      MemoryReservationManager memoryReservationManager,
+      long modsCacheSizeLimitPerFI,
+      int modsMemoryEstimateReadInterval,
+      Map<TsFileID, PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>> fileModCache,
+      AtomicLong cachedModEntriesSize,
+      Predicate<ModEntry> modificationMatcher,
+      ModsTreeMatcher modsTreeMatcher) {
+    if (modsMemoryEstimateReadInterval <= 0) {
+      // Used as a modulus while loading; zero would fail with ArithmeticException mid-read.
+      throw new IllegalArgumentException(
+          "modsMemoryEstimateReadInterval must be positive, but was "
+              + modsMemoryEstimateReadInterval);
+    }
+    this.queryContext = queryContext;
+    this.resource = resource;
+    this.memoryReservationManager = memoryReservationManager;
+    this.modsCacheSizeLimitPerFI = modsCacheSizeLimitPerFI;
+    this.modsMemoryEstimateReadInterval = modsMemoryEstimateReadInterval;
+    this.fileModCache = fileModCache;
+    this.cachedModEntriesSize = cachedModEntriesSize;
+    this.modificationMatcher = modificationMatcher;
+    this.modsTreeMatcher = modsTreeMatcher;
+  }
+
+  /**
+   * Returns the modifications of the resource that match this loader's path.
+   *
+   * <p>Uses the cached pattern tree when one exists (or when this call manages to cache it);
+   * otherwise matches against the tree loaded by this call and, if loading stopped early, scans the
+   * entries that were not read yet.
+   *
+   * @return the matched modifications
+   * @throws IllegalPathException if {@code modsTreeMatcher} fails on a tree
+   */
+  List<ModEntry> getPathModifications() throws IllegalPathException {
+    AtomicReference<LoadedModsResult> loadedResult = new AtomicReference<>();
+    PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> cachedMods =
+        fileModCache.computeIfAbsent(
+            resource.getTsFileID(), ignored -> loadAllModificationsForCache(loadedResult));
+    if (cachedMods != null) {
+      // Either this loader cached the full tree successfully, or another loader had cached it.
+      return modsTreeMatcher.match(cachedMods);
+    }
+
+    // A null result means the mapping function ran and chose not to cache; it recorded its result.
+    LoadedModsResult result = loadedResult.get();
+    if (result.loadedAllModEntries) {
+      return fallbackByMatchLoadedPatternTree(result);
+    } else {
+      return fallbackByMatchedScan(result);
+    }
+  }
+
+  /**
+   * Mapping function for {@code fileModCache.computeIfAbsent}. Loads the mods tree, stores the load
+   * result in {@code loadedResult}, and returns the tree for caching only when it was fully loaded
+   * and its memory could be reserved; otherwise returns {@code null} so nothing is cached.
+   */
+  private PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
+      loadAllModificationsForCache(AtomicReference<LoadedModsResult> loadedResult) {
+    LoadedModsResult result = loadAllModificationsWithQuotaControl();
+    loadedResult.set(result);
+    if (!result.loadedAllModEntries) {
+      return null;
+    }
+
+    try {
+      // The cache quota has already been claimed while loading. Reserve real query memory only
+      // once, just before the fully loaded tree becomes visible in fileModCache.
+      reserveMemoryImmediately(result.cacheQuotaBytes);
+    } catch (MemoryNotEnoughException e) {
+      // Not cached: fallbackByMatchLoadedPatternTree releases the claimed quota.
+      return null;
+    }
+
+    closeCurrentIterator();
+    return result.mods;
+  }
+
+  /**
+   * Reads the file's mod entries into a new pattern tree, claiming cache quota as the estimated
+   * tree size grows (every {@code modsMemoryEstimateReadInterval} entries and once at the end).
+   *
+   * <p>When a claim fails, loading stops with {@code loadedAllModEntries == false} and the iterator
+   * is left after the last entry read, so the scan fallback can continue from there. The returned
+   * result owns the quota it reports. If loading throws, any quota claimed so far is released
+   * before the exception propagates.
+   */
+  private LoadedModsResult loadAllModificationsWithQuotaControl() {
+    PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> modifications =
+        PatternTreeMapFactory.getModsPatternTreeMap();
+    currentIterator = resource.getModEntryIterator();
+
+    long claimedCacheQuotaBytes = 0;
+    boolean quotaHandedOver = false;
+    try {
+      int readModCount = 0;
+      while (currentIterator.hasNext()) {
+        ModEntry modification = currentIterator.next();
+        readModCount++;
+        if (!queryContext.shouldSkipModification(modification)) {
+          modifications.append(modification.keyOfPatternTree(), modification);
+        }
+
+        if (readModCount % modsMemoryEstimateReadInterval == 0) {
+          long currentEstimatedSize = estimateModsTreeMemory(modifications);
+          if (!tryClaimCacheQuota(quotaDelta(claimedCacheQuotaBytes, currentEstimatedSize))) {
+            quotaHandedOver = true;
+            return new LoadedModsResult(modifications, claimedCacheQuotaBytes, false);
+          }
+          claimedCacheQuotaBytes = currentEstimatedSize;
+        }
+      }
+
+      long finalEstimatedSize = estimateModsTreeMemory(modifications);
+      boolean claimed =
+          tryClaimCacheQuota(quotaDelta(claimedCacheQuotaBytes, finalEstimatedSize));
+      if (claimed) {
+        claimedCacheQuotaBytes = finalEstimatedSize;
+      }
+      quotaHandedOver = true;
+      return new LoadedModsResult(modifications, claimedCacheQuotaBytes, claimed);
+    } finally {
+      if (!quotaHandedOver) {
+        // Loading failed with an exception. The claimed quota is tracked only in this method, so
+        // give it back now; otherwise it would stay charged to cachedModEntriesSize.
+        releaseCacheQuota(claimedCacheQuotaBytes);
+      }
+    }
+  }
+
+  /**
+   * Returns the additional quota needed to grow a claim of {@code claimedBytes} to
+   * {@code estimatedBytes}.
+   *
+   * @throws IllegalStateException if the estimate is smaller than what was already claimed
+   */
+  private long quotaDelta(long claimedBytes, long estimatedBytes) {
+    if (estimatedBytes < claimedBytes) {
+      throw new IllegalStateException(
+          String.format(
+              "Estimated mods tree size decreased from %d to %d for TsFile %s.",
+              claimedBytes, estimatedBytes, resource));
+    }
+    return estimatedBytes - claimedBytes;
+  }
+
+  /**
+   * Atomically adds {@code delta} to {@code cachedModEntriesSize} if the new total stays strictly
+   * below {@code modsCacheSizeLimitPerFI}.
+   *
+   * @return true if the quota was claimed (always true for a non-positive delta)
+   */
+  private boolean tryClaimCacheQuota(long delta) {
+    if (delta <= 0) {
+      return true;
+    }
+
+    // During loading, this only claims FI-level cache quota. Real memory is reserved once after the
+    // full tree is loaded and before computeIfAbsent publishes it to fileModCache.
+    long alreadyUsedMemoryForCachedModEntries = cachedModEntriesSize.get();
+    while (alreadyUsedMemoryForCachedModEntries + delta < modsCacheSizeLimitPerFI) {
+      if (cachedModEntriesSize.compareAndSet(
+          alreadyUsedMemoryForCachedModEntries, alreadyUsedMemoryForCachedModEntries + delta)) {
+        return true;
+      }
+      alreadyUsedMemoryForCachedModEntries = cachedModEntriesSize.get();
+    }
+    return false;
+  }
+
+  /**
+   * Fallback used when loading stopped early: matches the partial tree, releases its quota, then
+   * scans the remaining entries with {@code modificationMatcher}. The merged result's memory is
+   * reserved before returning, and the iterator is closed in all cases.
+   */
+  private List<ModEntry> fallbackByMatchedScan(LoadedModsResult partialTree)
+      throws IllegalPathException {
+    try {
+      // The full tree exceeded the FI mods cache quota. Reuse the partial tree for the part that
+      // has already been read, then continue scanning the same iterator by path.
+      List<ModEntry> matchedMods;
+      try {
+        // Copy: ModsTreeMatcher does not promise a mutable list, and entries are appended below.
+        matchedMods = new ArrayList<>(modsTreeMatcher.match(partialTree.mods));
+      } finally {
+        // Drop the reference so the partial tree can be reclaimed before the scan.
+        partialTree.mods = null;
+        releaseClaimedCacheQuota(partialTree);
+      }
+
+      while (currentIterator.hasNext()) {
+        ModEntry modification = currentIterator.next();
+        if (queryContext.shouldSkipModification(modification)) {
+          continue;
+        }
+        if (modificationMatcher.test(modification)) {
+          matchedMods.add(modification);
+        }
+      }
+
+      matchedMods = ModificationUtils.sortAndMerge(matchedMods);
+      reserveMatchedModsMemory(matchedMods);
+      return matchedMods;
+    } finally {
+      close();
+    }
+  }
+
+  /**
+   * Fallback used when the tree was fully loaded but not cached: matches it, releases its quota,
+   * and reserves memory only for the matched mods. The iterator is closed in all cases.
+   */
+  private List<ModEntry> fallbackByMatchLoadedPatternTree(LoadedModsResult loadedTree)
+      throws IllegalPathException {
+    try {
+      // The tree was fully loaded but not cached, usually because final memory reservation failed.
+      // Return only the matched mods and release the cache quota claimed during loading.
+      List<ModEntry> matchedMods;
+      try {
+        matchedMods = modsTreeMatcher.match(loadedTree.mods);
+      } finally {
+        loadedTree.mods = null;
+        releaseClaimedCacheQuota(loadedTree);
+      }
+      reserveMatchedModsMemory(matchedMods);
+      return matchedMods;
+    } finally {
+      close();
+    }
+  }
+
+  /**
+   * Reserves memory for a returned list: its shallow size, one reference per element, and each
+   * entry's {@code ramBytesUsed()}.
+   */
+  private void reserveMatchedModsMemory(List<ModEntry> matchedMods) {
+    long memCost =
+        RamUsageEstimator.shallowSizeOf(matchedMods)
+            + (long) matchedMods.size() * RamUsageEstimator.NUM_BYTES_OBJECT_REF
+            + matchedMods.stream().mapToLong(ModEntry::ramBytesUsed).sum();
+    if (memCost > 0) {
+      reserveMemoryImmediately(memCost);
+    }
+  }
+
+  /**
+   * Adds {@code bytes} to the cumulative reservation and makes it effective immediately.
+   *
+   * @throws MemoryNotEnoughException if the reservation cannot be satisfied; the pending amount is
+   *     released before rethrowing
+   */
+  private void reserveMemoryImmediately(long bytes) {
+    synchronized (memoryReservationManager) {
+      try {
+        memoryReservationManager.reserveMemoryCumulatively(bytes);
+        memoryReservationManager.reserveMemoryImmediately();
+      } catch (MemoryNotEnoughException e) {
+        // reserveMemoryCumulatively has already added bytes to the pending reservation. Clear it
+        // while holding the same lock so other loader threads cannot observe the stale pending
+        // size.
+        memoryReservationManager.releaseMemoryVirtually(bytes);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Estimates the heap size of a mods tree.
+   *
+   * <p>NOTE(review): the FragmentInstanceContext code this replaces also charged
+   * SHALLOW_SIZE_OF_CONCURRENT_HASHMAP_ENTRY per cached file; confirm whether this estimate should
+   * include it.
+   */
+  private long estimateModsTreeMemory(
+      PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> modifications) {
+    return RamUsageEstimator.sizeOfObject(modifications);
+  }
+
+  /** Closes the mod entry iterator if it is still open. */
+  @Override
+  public void close() {
+    closeCurrentIterator();
+  }
+
+  private void closeCurrentIterator() {
+    if (currentIterator != null) {
+      currentIterator.close();
+      currentIterator = null;
+    }
+  }
+
+  /** Returns {@code bytes} of previously claimed cache quota to {@code cachedModEntriesSize}. */
+  private void releaseCacheQuota(long bytes) {
+    if (bytes > 0) {
+      cachedModEntriesSize.addAndGet(-bytes);
+    }
+  }
+
+  /** Releases the quota owned by {@code loadedResult} and marks it as released. */
+  private void releaseClaimedCacheQuota(LoadedModsResult loadedResult) {
+    releaseCacheQuota(loadedResult.cacheQuotaBytes);
+    loadedResult.cacheQuotaBytes = 0;
+  }
+
+  /**
+   * Outcome of one load attempt: the (possibly partial) tree, the cache quota it owns, and whether
+   * every mod entry of the file was read.
+   */
+  private static class LoadedModsResult {
+
+    private PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> mods;
+    private long cacheQuotaBytes;
+    private final boolean loadedAllModEntries;
+
+    private LoadedModsResult(
+        PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> mods,
+        long cacheQuotaBytes,
+        boolean loadedAllModEntries) {
+      this.mods = mods;
+      this.cacheQuotaBytes = cacheQuotaBytes;
+      this.loadedAllModEntries = loadedAllModEntries;
+    }
+  }
+
+  /** Extracts the modifications relevant to one path from a mods pattern tree. */
+  @FunctionalInterface
+  interface ModsTreeMatcher {
+
+    List<ModEntry> match(PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> modsTree)
+        throws IllegalPathException;
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java index e513d79ba61..439286288d6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java @@ -1494,7 +1494,7 @@ public class TsFileResource implements PersistentResource, Cloneable { return entries; } - public class ModIterator
implements Iterator<ModEntry> { + public class ModIterator implements Iterator<ModEntry>, AutoCloseable { private final Iterator<ModEntry> sharedModIterator; private final Iterator<ModEntry> exclusiveModIterator; @@ -1537,6 +1537,22 @@ public class TsFileResource implements PersistentResource, Cloneable { } throw new NoSuchElementException(); } + + @Override + /* Closes the exclusive iterator and then the shared one; only iterators that implement AutoCloseable are closed. */ public void close() { + closeModIterator(exclusiveModIterator); + closeModIterator(sharedModIterator); + } + + /* Best-effort close: a failure is logged and not rethrown, so close() always attempts both iterators. NOTE(review): logged at INFO; WARN may suit a failed stream close better. */ private void closeModIterator(Iterator<ModEntry> modIterator) { + if (modIterator instanceof AutoCloseable) { + try { + ((AutoCloseable) modIterator).close(); + } catch (Exception e) { + LOGGER.info(StorageEngineMessages.CANNOT_CLOSE_MOD_FILE_INPUT_STREAM, getTsFile(), e); + } + } + } } public void upgradeModFile(ExecutorService upgradeModFileThreadPool) throws IOException {
