This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/1.3.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fc8b0e60e033538289336c51b2184b498510eb30 Author: shuwenwei <[email protected]> AuthorDate: Fri Aug 15 15:39:11 2025 +0800 [to dev/1.3] Add memory control for mod entries in query --- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 ++++ .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 20 ++++++ .../execution/MemoryEstimationHelper.java | 42 ++++++++++++ .../fragment/FragmentInstanceContext.java | 77 ++++++++++++++++++++++ .../execution/fragment/QueryContext.java | 63 ++++++++++-------- .../dataregion/modification/Deletion.java | 30 +++++++-- .../dataregion/modification/Modification.java | 4 +- .../filescan/impl/ClosedFileScanHandleImpl.java | 16 ++++- .../conf/iotdb-system.properties.template | 7 ++ .../apache/iotdb/commons/path/PathPatternNode.java | 19 +++++- .../apache/iotdb/commons/path/PatternTreeMap.java | 12 +++- pom.xml | 2 +- 12 files changed, 264 insertions(+), 39 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 890af218a59..9e3896f2171 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -454,6 +454,9 @@ public class IoTDBConfig { /** The buffer for sort operation */ private long sortBufferSize = 1024 * 1024L; + /** Mods cache size limit per fi */ + private long modsCacheSizeLimitPerFI = 32 * 1024 * 1024; + /** * The strategy of inner space compaction task. There are just one inner space compaction strategy * SIZE_TIRED_COMPACTION: @@ -4242,6 +4245,14 @@ public class IoTDBConfig { return sortBufferSize; } + public void setModsCacheSizeLimitPerFI(long modsCacheSizeLimitPerFI) { + this.modsCacheSizeLimitPerFI = modsCacheSizeLimitPerFI; + } + + public long getModsCacheSizeLimitPerFI() { + return modsCacheSizeLimitPerFI; + } + public void setSortTmpDir(String sortTmpDir) { this.sortTmpDir = sortTmpDir; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 7fbd9ce2ed7..3b7f313610a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -90,6 +90,7 @@ import java.util.Optional; import java.util.Properties; import java.util.ServiceLoader; import java.util.Set; +import java.util.function.LongConsumer; import java.util.regex.Pattern; public class IoTDBDescriptor { @@ -1086,6 +1087,9 @@ public class IoTDBDescriptor { .getProperty("sort_buffer_size_in_bytes", Long.toString(conf.getSortBufferSize())) .trim())); + loadFixedSizeLimitForQuery( + properties, "mods_cache_size_limit_per_fi_in_bytes", conf::setModsCacheSizeLimitPerFI); + // tmp filePath for sort operator conf.setSortTmpDir(properties.getProperty("sort_tmp_dir", conf.getSortTmpDir())); @@ -1109,6 +1113,19 @@ public class IoTDBDescriptor { loadTrustedUriPattern(properties); } + private void loadFixedSizeLimitForQuery( + TrimProperties properties, String name, LongConsumer setFunction) { + long defaultValue = + Math.min( + 32 * 1024 * 1024L, + conf.getAllocateMemoryForOperators() / conf.getQueryThreadCount() / 2); + long size = Long.parseLong(properties.getProperty(name, Long.toString(defaultValue))); + if (size <= 0) { + size = defaultValue; + } + setFunction.accept(size); + } + private void reloadConsensusProps(TrimProperties properties) throws IOException { loadIoTConsensusProps(properties); loadPipeConsensusProps(properties); @@ -2042,6 +2059,9 @@ public class IoTDBDescriptor { properties.getProperty( "tvlist_sort_threshold", ConfigurationFileUtils.getConfigurationDefaultValue("tvlist_sort_threshold")))); + + loadFixedSizeLimitForQuery( + properties, "mods_cache_size_limit_per_fi_in_bytes", conf::setModsCacheSizeLimitPerFI); } catch (Exception e) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java index a18e2dbc58b..a10fdb405a7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/MemoryEstimationHelper.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.tsfile.read.common.TimeRange; import org.apache.tsfile.utils.Accountable; import org.apache.tsfile.utils.RamUsageEstimator; @@ -41,6 +42,13 @@ public class MemoryEstimationHelper { private static final long MEASUREMENT_PATH_INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(AlignedPath.class); + private static final long ARRAY_LIST_INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(ArrayList.class); + private static final long INTEGER_INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(Integer.class); + public static final long TIME_RANGE_INSTANCE_SIZE = + RamUsageEstimator.shallowSizeOfInstance(TimeRange.class); + private MemoryEstimationHelper() { // hide the constructor } @@ -86,4 +94,38 @@ public class MemoryEstimationHelper { } return totalSize; } + + public static long getEstimatedSizeOfMeasurementPathNodes( + @Nullable final PartialPath partialPath) { + if (partialPath == null) { + return 0; + } + long totalSize = MEASUREMENT_PATH_INSTANCE_SIZE; + String[] nodes = partialPath.getNodes(); + if (nodes != null && nodes.length > 0) { + totalSize += Arrays.stream(nodes).mapToLong(RamUsageEstimator::sizeOf).sum(); + } + return totalSize; + } + + // This method should only be called if the content in the current PartialPath comes from other + // structures whose memory cost have already been calculated. + public static long getEstimatedSizeOfCopiedPartialPath(@Nullable final PartialPath partialPath) { + if (partialPath == null) { + return 0; + } + return PARTIAL_PATH_INSTANCE_SIZE + RamUsageEstimator.shallowSizeOf(partialPath.getNodes()); + } + + public static long getEstimatedSizeOfIntegerArrayList(List<Integer> integerArrayList) { + if (integerArrayList == null) { + return 0L; + } + long size = ARRAY_LIST_INSTANCE_SIZE; + size += + (long) RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + + (long) integerArrayList.size() * (long) RamUsageEstimator.NUM_BYTES_OBJECT_REF; + size += INTEGER_INSTANCE_SIZE * integerArrayList.size(); + return RamUsageEstimator.alignObjectSize(size); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index 84797cbfad9..cb2a170d095 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -23,7 +23,10 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.queryengine.common.DeviceContext; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; @@ -37,17 +40,21 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.TimePredicate; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.db.storageengine.dataregion.IDataRegionForQuery; +import org.apache.iotdb.db.storageengine.dataregion.modification.Modification; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.read.IQueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceForRegionScan; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSourceType; import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.iotdb.db.utils.datastructure.TVList; import org.apache.iotdb.mpp.rpc.thrift.TFetchFragmentInstanceStatisticsResp; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.filter.basic.Filter; +import org.apache.tsfile.utils.RamUsageEstimator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +74,7 @@ import static org.apache.iotdb.db.storageengine.dataregion.VirtualDataRegion.EMP public class FragmentInstanceContext extends QueryContext { private static final Logger LOGGER = LoggerFactory.getLogger(FragmentInstanceContext.class); + private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); private static final long END_TIME_INITIAL_VALUE = -1L; private final FragmentInstanceId id; @@ -79,6 +87,8 @@ public class FragmentInstanceContext extends QueryContext { // it will only be used once, after sharedQueryDataSource being inited, it will be set to null protected List<PartialPath> sourcePaths; + + private boolean singleSourcePath = false; // Used for region scan. private Map<IDeviceID, DeviceContext> devicePathsToContext; @@ -302,6 +312,64 @@ public class FragmentInstanceContext extends QueryContext { lastExecutionStartTime.set(now); } + @Override + protected boolean checkIfModificationExists(TsFileResource tsFileResource) { + if (isSingleSourcePath()) { + return tsFileResource.getModFile().exists(); + } + if (nonExistentModFiles.contains(tsFileResource.getTsFileID())) { + return false; + } + + ModificationFile modFile = tsFileResource.getModFile(); + if (!modFile.exists()) { + if (nonExistentModFiles.add(tsFileResource.getTsFileID()) + && memoryReservationManager != null) { + memoryReservationManager.reserveMemoryCumulatively(RamUsageEstimator.NUM_BYTES_OBJECT_REF); + } + return false; + } + return true; + } + + @Override + protected PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> getAllModifications( + TsFileResource resource) { + if (isSingleSourcePath() || memoryReservationManager == null) { + return loadAllModificationsFromDisk(resource); + } + + AtomicReference<PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer>> + atomicReference = new AtomicReference<>(); + PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> cachedResult = + fileModCache.computeIfAbsent( + resource.getTsFileID(), + k -> { + PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> allMods = + loadAllModificationsFromDisk(resource); + atomicReference.set(allMods); + if (cachedModEntriesSize.get() >= config.getModsCacheSizeLimitPerFI()) { + return null; + } + long memCost = + RamUsageEstimator.sizeOfObject(allMods) + + RamUsageEstimator.SHALLOW_SIZE_OF_CONCURRENT_HASHMAP_ENTRY; + long alreadyUsedMemoryForCachedModEntries = cachedModEntriesSize.get(); + while (alreadyUsedMemoryForCachedModEntries + memCost + < config.getModsCacheSizeLimitPerFI()) { + if (cachedModEntriesSize.compareAndSet( + alreadyUsedMemoryForCachedModEntries, + alreadyUsedMemoryForCachedModEntries + memCost)) { + memoryReservationManager.reserveMemoryCumulatively(memCost); + return allMods; + } + alreadyUsedMemoryForCachedModEntries = cachedModEntriesSize.get(); + } + return null; + }); + return cachedResult == null ? atomicReference.get() : cachedResult; + } + // the state change listener is added here in a separate initialize() method // instead of the constructor to prevent leaking the "this" reference to // another thread, which will cause unsafe publication of this instance. @@ -445,6 +513,9 @@ public class FragmentInstanceContext extends QueryContext { public void setSourcePaths(List<PartialPath> sourcePaths) { this.sourcePaths = sourcePaths; + if (sourcePaths != null && sourcePaths.size() == 1) { + singleSourcePath = true; + } } public void setDevicePathsToContext(Map<IDeviceID, DeviceContext> devicePathsToContext) { @@ -744,6 +815,8 @@ public class FragmentInstanceContext extends QueryContext { // release TVList/AlignedTVList owned by current query releaseTVListOwnedByQuery(); + fileModCache = null; + nonExistentModFiles = null; dataRegion = null; globalTimeFilter = null; sharedQueryDataSource = null; @@ -913,4 +986,8 @@ public class FragmentInstanceContext extends QueryContext { public boolean ignoreNotExistsDevice() { return ignoreNotExistsDevice; } + + public boolean isSingleSourcePath() { + return singleSourcePath; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java index f0a210e1edf..103267dfec5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java @@ -41,6 +41,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.atomic.AtomicLong; /** QueryContext contains the shared information with in a query. */ public class QueryContext { @@ -48,13 +49,15 @@ public class QueryContext { private QueryStatistics queryStatistics = new QueryStatistics(); /** - * The key is the path of a ModificationFile and the value is all Modifications in this file. We - * use this field because each call of Modification.getModifications() return a copy of the - * Modifications, and we do not want it to create multiple copies within a query. + * The key is TsFileID and the value is all Modifications in this file. We use this field because + * each call of Modification.getModifications() return a copy of the Modifications, and we do not + * want it to create multiple copies within a query. */ - private final Map<String, PatternTreeMap<Modification, ModsSerializer>> fileModCache = + protected Map<TsFileID, PatternTreeMap<Modification, ModsSerializer>> fileModCache = new ConcurrentHashMap<>(); + protected AtomicLong cachedModEntriesSize = new AtomicLong(0); + protected long queryId; private boolean debug; @@ -64,7 +67,7 @@ public class QueryContext { private volatile boolean isInterrupted = false; - private final Set<TsFileID> nonExistentModFiles = new CopyOnWriteArraySet<>(); + protected Set<TsFileID> nonExistentModFiles = new CopyOnWriteArraySet<>(); // referenced TVLists for the query protected final Set<TVList> tvListSet = new HashSet<>(); @@ -96,18 +99,21 @@ public class QueryContext { return true; } - private PatternTreeMap<Modification, ModsSerializer> getAllModifications( - ModificationFile modFile) { + protected PatternTreeMap<Modification, ModsSerializer> getAllModifications( + TsFileResource resource) { return fileModCache.computeIfAbsent( - modFile.getFilePath(), - k -> { - PatternTreeMap<Modification, ModsSerializer> modifications = - PatternTreeMapFactory.getModsPatternTreeMap(); - for (Modification modification : modFile.getModificationsIter()) { - modifications.append(modification.getPath(), modification); - } - return modifications; - }); + resource.getTsFileID(), k -> loadAllModificationsFromDisk(resource)); + } + + public PatternTreeMap<Modification, ModsSerializer> loadAllModificationsFromDisk( + TsFileResource resource) { + PatternTreeMap<Modification, ModsSerializer> modifications = + PatternTreeMapFactory.getModsPatternTreeMap(); + Iterable<Modification> modEntryIterator = resource.getModFile().getModificationsIter(); + for (Modification modification : modEntryIterator) { + modifications.append(modification.getPath(), modification); + } + return modifications; } public List<Modification> getPathModifications( @@ -119,20 +125,16 @@ public class QueryContext { } return ModificationFile.sortAndMerge( - getAllModifications(tsFileResource.getModFile()) - .getOverlapped(new PartialPath(deviceID, measurement))); + getAllModifications(tsFileResource).getOverlapped(new PartialPath(deviceID, measurement))); } - public List<Modification> getPathModifications(TsFileResource tsFileResource, IDeviceID deviceID) + public List<Modification> getPathModifications( + PatternTreeMap<Modification, ModsSerializer> fileMods, IDeviceID deviceID) throws IllegalPathException { - // if the mods file does not exist, do not add it to the cache - if (!checkIfModificationExists(tsFileResource)) { + if (fileMods == null) { return Collections.emptyList(); } - - return ModificationFile.sortAndMerge( - getAllModifications(tsFileResource.getModFile()) - .getDeviceOverlapped(new PartialPath(deviceID))); + return ModificationFile.sortAndMerge(fileMods.getDeviceOverlapped(new PartialPath(deviceID))); } /** @@ -145,8 +147,15 @@ public class QueryContext { return Collections.emptyList(); } - return ModificationFile.sortAndMerge( - getAllModifications(tsFileResource.getModFile()).getOverlapped(path)); + return getPathModifications(getAllModifications(tsFileResource), path); + } + + public List<Modification> getPathModifications( + PatternTreeMap<Modification, ModsSerializer> fileMods, PartialPath path) { + if (fileMods == null) { + return Collections.emptyList(); + } + return ModificationFile.sortAndMerge(fileMods.getOverlapped(path)); } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java index 5ab469fde10..07f8aecbfc0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Deletion.java @@ -21,9 +21,12 @@ package org.apache.iotdb.db.storageengine.dataregion.modification; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; -import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache; +import org.apache.iotdb.commons.utils.PathUtils; +import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; +import org.apache.tsfile.common.constant.TsFileConstant; import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataInputStream; @@ -34,6 +37,8 @@ import java.util.Objects; /** Deletion is a delete operation on a timeseries. */ public class Deletion extends Modification implements Cloneable { + private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(Deletion.class); + /** data within the interval [startTime, endTime] are to be deleted. */ private TimeRange timeRange; @@ -117,10 +122,18 @@ public class Deletion extends Modification implements Cloneable { long startTime = stream.readLong(); long endTime = stream.readLong(); return new Deletion( - DataNodeDevicePathCache.getInstance().getPartialPath(ReadWriteIOUtils.readString(stream)), - 0, - startTime, - endTime); + getMeasurementPath(ReadWriteIOUtils.readString(stream)), 0, startTime, endTime); + } + + private static PartialPath getMeasurementPath(String path) throws IllegalPathException { + // In this place, we can be sure that the path pattern here has been checked by antlr before, so + // when conditions permit, a lighter split method can be used here. + if (path.contains(TsFileConstant.BACK_QUOTE_STRING)) { + return new PartialPath(PathUtils.splitPathToDetachedNodes(path)); + } else { + String[] nodes = path.split(TsFileConstant.PATH_SEPARATER_NO_REGEX); + return new PartialPath(nodes); + } } public long getSerializedSize() { @@ -164,4 +177,11 @@ public class Deletion extends Modification implements Cloneable { public Deletion clone() { return new Deletion(getPath(), getFileOffset(), getStartTime(), getEndTime()); } + + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE + + MemoryEstimationHelper.TIME_RANGE_INSTANCE_SIZE + + MemoryEstimationHelper.getEstimatedSizeOfMeasurementPathNodes(path); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Modification.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Modification.java index 4ca1f11b6d5..988c1be9093 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Modification.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/modification/Modification.java @@ -21,10 +21,12 @@ package org.apache.iotdb.db.storageengine.dataregion.modification; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.tsfile.utils.Accountable; + import java.util.Objects; /** Modification represents an UPDATE or DELETE operation on a certain timeseries. */ -public abstract class Modification { +public abstract class Modification implements Accountable { protected Type type; protected PartialPath path; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java index 1bbc6dc47c4..3f690f2bae9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java @@ -20,6 +20,8 @@ package org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion; import org.apache.iotdb.db.storageengine.dataregion.modification.Modification; @@ -35,6 +37,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeI import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex; import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileDeviceStartEndTimeIterator; import org.apache.iotdb.db.utils.ModificationUtils; +import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.tsfile.file.metadata.AlignedChunkMetadata; import org.apache.tsfile.file.metadata.IChunkMetadata; @@ -60,6 +63,7 @@ public class ClosedFileScanHandleImpl implements IFileScanHandle { private final TsFileResource tsFileResource; private final QueryContext queryContext; + private PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> curFileMods = null; // Used to cache the modifications of each timeseries private final Map<IDeviceID, Map<String, List<TimeRange>>> deviceToModifications; @@ -80,7 +84,11 @@ public class ClosedFileScanHandleImpl implements IFileScanHandle { @Override public boolean isDeviceTimeDeleted(IDeviceID deviceID, long timestamp) throws IllegalPathException { - List<Modification> modifications = queryContext.getPathModifications(tsFileResource, deviceID); + curFileMods = + curFileMods != null + ? curFileMods + : queryContext.loadAllModificationsFromDisk(tsFileResource); + List<Modification> modifications = queryContext.getPathModifications(curFileMods, deviceID); List<TimeRange> timeRangeList = modifications.stream() .filter(Deletion.class::isInstance) @@ -99,8 +107,12 @@ public class ClosedFileScanHandleImpl implements IFileScanHandle { return ModificationUtils.isPointDeleted(timestamp, modificationTimeRange.get(timeSeriesName)); } + curFileMods = + curFileMods != null + ? curFileMods + : queryContext.loadAllModificationsFromDisk(tsFileResource); List<Modification> modifications = - queryContext.getPathModifications(tsFileResource, deviceID, timeSeriesName); + queryContext.getPathModifications(curFileMods, new PartialPath(deviceID, timeSeriesName)); List<TimeRange> timeRangeList = modifications.stream() .filter(Deletion.class::isInstance) diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index c9b47d56ac3..ab33ece3cfa 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -1065,6 +1065,13 @@ batch_size=100000 # Datatype: long sort_buffer_size_in_bytes=1048576 +# The maximum mod entries size that each FragmentInstance can cache. +# if mods_cache_size_limit_per_fi_in_bytes <= 0, default value will be used, default value = min(32MB, memory for query operators / query_thread_count / 2) +# if mods_cache_size_limit_per_fi_in_bytes > 0, the specified value will be used. +# effectiveMode: hot_reload +# Datatype: long +mods_cache_size_limit_per_fi_in_bytes=0 + # The threshold of operator count in the result set of EXPLAIN ANALYZE, if the number of operator in the result set is larger than this threshold, operator will be merged. # effectiveMode: hot_reload # Datatype: int diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PathPatternNode.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PathPatternNode.java index 80e48f2e287..89ae7444d19 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PathPatternNode.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PathPatternNode.java @@ -19,7 +19,9 @@ package org.apache.iotdb.commons.path; +import org.apache.tsfile.utils.Accountable; import org.apache.tsfile.utils.PublicBAOS; +import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; @@ -40,8 +42,10 @@ import java.util.function.Supplier; import static org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD; import static org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD; -public class PathPatternNode<V, VSerializer extends PathPatternNode.Serializer<V>> { - +public class PathPatternNode<V, VSerializer extends PathPatternNode.Serializer<V>> + implements Accountable { + private static final long SHALLOW_SIZE = + RamUsageEstimator.shallowSizeOfInstance(PathPatternNode.class); private final String name; private final Map<String, PathPatternNode<V, VSerializer>> children; private Set<V> valueSet; @@ -256,6 +260,17 @@ public class PathPatternNode<V, VSerializer extends PathPatternNode.Serializer<V return node; } + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE + + RamUsageEstimator.sizeOf(name) + + RamUsageEstimator.sizeOfHashSet(valueSet) + + RamUsageEstimator.sizeOfMapWithKnownShallowSize( + children, + RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP, + RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP_ENTRY); + } + /** * Interface to support serialize and deserialize valueSet. * diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PatternTreeMap.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PatternTreeMap.java index dc35cb5a4c4..4726283b93c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PatternTreeMap.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/path/PatternTreeMap.java @@ -20,6 +20,9 @@ package org.apache.iotdb.commons.path; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.tsfile.utils.Accountable; +import org.apache.tsfile.utils.RamUsageEstimator; + import javax.annotation.concurrent.NotThreadSafe; import java.util.ArrayList; @@ -30,7 +33,9 @@ import java.util.function.BiConsumer; import java.util.function.Supplier; @NotThreadSafe -public class PatternTreeMap<V, VSerializer extends PathPatternNode.Serializer<V>> { +public class PatternTreeMap<V, VSerializer extends PathPatternNode.Serializer<V>> + implements Accountable { + private final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(PatternTreeMap.class); private final PathPatternNode<V, VSerializer> root; private final Supplier<? extends Set<V>> supplier; private final BiConsumer<V, Set<V>> appendFunction; @@ -246,4 +251,9 @@ public class PatternTreeMap<V, VSerializer extends PathPatternNode.Serializer<V> searchDeviceOverlapped(child, deviceNodes, pos + 1, resultSet); } } + + @Override + public long ramBytesUsed() { + return SHALLOW_SIZE + RamUsageEstimator.sizeOfObject(root); + } } diff --git a/pom.xml b/pom.xml index fd9e0faf138..20ec64f00ad 100644 --- a/pom.xml +++ b/pom.xml @@ -175,7 +175,7 @@ <thrift.version>0.14.1</thrift.version> <xz.version>1.9</xz.version> <zstd-jni.version>1.5.6-3</zstd-jni.version> - <tsfile.version>1.1.2-250703-SNAPSHOT</tsfile.version> + <tsfile.version>1.1.2-250814-SNAPSHOT</tsfile.version> </properties> <!-- if we claim dependencies in dependencyManagement, then we do not claim
