This is an automated email from the ASF dual-hosted git repository.
shuwenwei pushed a commit to branch addMemoryControlForModEntriesInQuery
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to
refs/heads/addMemoryControlForModEntriesInQuery by this push:
new 98a11c7ad91 modify compaction and region scan
98a11c7ad91 is described below
commit 98a11c7ad91f29150bfed58924edc7e29070b158
Author: shuwenwei <[email protected]>
AuthorDate: Mon Aug 11 18:41:16 2025 +0800
modify compaction and region scan
---
.../execution/fragment/QueryContext.java | 35 +++++++++++++++++-----
.../selector/estimator/CompactionTaskInfo.java | 16 +++++++++-
.../filescan/impl/ClosedFileScanHandleImpl.java | 16 ++++++++--
3 files changed, 55 insertions(+), 12 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
index bcd062768f2..52233a52789 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
@@ -105,11 +105,11 @@ public class QueryContext {
private PatternTreeMap<ModEntry, ModsSerializer>
getAllModifications(TsFileResource resource) {
if (!(this instanceof FragmentInstanceContext)) {
return fileModCache.computeIfAbsent(
- resource.getTsFileID(), k -> loadAllModifications(resource));
+ resource.getTsFileID(), k -> loadAllModificationsFromDisk(resource));
}
FragmentInstanceContext fragmentInstanceContext =
(FragmentInstanceContext) this;
if (fragmentInstanceContext.getSourcePaths().size() == 1) {
- return loadAllModifications(resource);
+ return loadAllModificationsFromDisk(resource);
}
AtomicReference<PatternTreeMap<ModEntry, ModsSerializer>> atomicReference =
@@ -118,7 +118,8 @@ public class QueryContext {
fileModCache.computeIfAbsent(
resource.getTsFileID(),
k -> {
- PatternTreeMap<ModEntry, ModsSerializer> allMods = loadAllModifications(resource);
+ PatternTreeMap<ModEntry, ModsSerializer> allMods =
+ loadAllModificationsFromDisk(resource);
atomicReference.set(allMods);
if (cachedModEntriesSize.get() >=
config.getModsCacheSizeLimitPerFI()) {
return null;
@@ -142,7 +143,8 @@ public class QueryContext {
return cachedResult == null ? atomicReference.get() : cachedResult;
}
- private PatternTreeMap<ModEntry, ModsSerializer> loadAllModifications(TsFileResource resource) {
+ public PatternTreeMap<ModEntry, ModsSerializer> loadAllModificationsFromDisk(
+ TsFileResource resource) {
PatternTreeMap<ModEntry, ModsSerializer> modifications =
PatternTreeMapFactory.getModsPatternTreeMap();
for (ModEntry modification : resource.getAllModEntries()) {
@@ -158,8 +160,17 @@ public class QueryContext {
return Collections.emptyList();
}
- List<ModEntry> modEntries =
- getAllModifications(tsFileResource).getOverlapped(deviceID, measurement);
+ return getPathModifications(getAllModifications(tsFileResource), deviceID, measurement);
+ }
+
+ public List<ModEntry> getPathModifications(
+ PatternTreeMap<ModEntry, ModsSerializer> fileModEntries,
+ IDeviceID deviceID,
+ String measurement) {
+ if (fileModEntries == null) {
+ return Collections.emptyList();
+ }
+ List<ModEntry> modEntries = fileModEntries.getOverlapped(deviceID, measurement);
if (deviceID.isTableModel()) {
// the pattern tree has false-positive for table model deletion, so we do a further
// filtering
@@ -179,8 +190,16 @@ public class QueryContext {
if (!checkIfModificationExists(tsFileResource)) {
return Collections.emptyList();
}
- List<ModEntry> modEntries =
- getAllModifications(tsFileResource).getOverlapped(new PartialPath(deviceID));
+ return getPathModifications(getAllModifications(tsFileResource), deviceID);
+ }
+
+ public List<ModEntry> getPathModifications(
+ PatternTreeMap<ModEntry, ModsSerializer> fileModEntries, IDeviceID deviceID)
+ throws IllegalPathException {
+ if (fileModEntries == null) {
+ return Collections.emptyList();
+ }
+ List<ModEntry> modEntries = fileModEntries.getOverlapped(new PartialPath(deviceID));
if (deviceID.isTableModel()) {
// the pattern tree has false-positive for table model deletion, so we do a further
// filtering
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionTaskInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionTaskInfo.java
index e8b90f31a80..f0152f254b0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionTaskInfo.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionTaskInfo.java
@@ -19,7 +19,12 @@
package
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator;
+import org.apache.iotdb.commons.path.PatternTreeMap;
+import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
+
+import org.apache.tsfile.utils.RamUsageEstimator;
import java.util.List;
@@ -38,7 +43,7 @@ public class CompactionTaskInfo {
this.fileInfoList = fileInfoList;
this.resources = resources;
for (TsFileResource resource : resources) {
- this.modificationFileSize += resource.getTotalModSizeInByte();
+ this.modificationFileSize += getMemCostForCachedModEntries(resource);
this.totalFileSize += resource.getTsFileSize();
}
for (FileInfo fileInfo : fileInfoList) {
@@ -88,4 +93,13 @@ public class CompactionTaskInfo {
public List<TsFileResource> getResources() {
return resources;
}
+
+ private long getMemCostForCachedModEntries(TsFileResource tsFileResource) {
+ PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> modifications =
+ PatternTreeMapFactory.getModsPatternTreeMap();
+ for (ModEntry modification : tsFileResource.getAllModEntries()) {
+ modifications.append(modification.keyOfPatternTree(), modification);
+ }
+ return RamUsageEstimator.sizeOfObject(modifications);
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java
index 3471cd4539b..6ebb6f7073b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/read/filescan/impl/ClosedFileScanHandleImpl.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.storageengine.dataregion.read.filescan.impl;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.path.PatternTreeMap;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
import
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
@@ -35,6 +36,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDevice
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
import
org.apache.iotdb.db.storageengine.dataregion.utils.TsFileDeviceStartEndTimeIterator;
import org.apache.iotdb.db.utils.ModificationUtils;
+import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory;
import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
@@ -60,6 +62,7 @@ public class ClosedFileScanHandleImpl implements
IFileScanHandle {
private final TsFileResource tsFileResource;
private final QueryContext queryContext;
+ private PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer> curFileModEntries = null;
// Used to cache the modifications of each timeseries
private final Map<IDeviceID, Map<String, List<TimeRange>>>
deviceToModifications;
@@ -80,7 +83,11 @@ public class ClosedFileScanHandleImpl implements
IFileScanHandle {
@Override
public boolean isDeviceTimeDeleted(IDeviceID deviceID, long timestamp)
throws IllegalPathException {
- List<ModEntry> modifications = queryContext.getPathModifications(tsFileResource, deviceID);
+ curFileModEntries =
+ curFileModEntries != null
+ ? curFileModEntries
+ : queryContext.loadAllModificationsFromDisk(tsFileResource);
+ List<ModEntry> modifications = queryContext.getPathModifications(curFileModEntries, deviceID);
List<TimeRange> timeRangeList =
modifications.stream().map(ModEntry::getTimeRange).collect(Collectors.toList());
return ModificationUtils.isPointDeletedWithoutOrderedRange(timestamp,
timeRangeList);
@@ -89,14 +96,17 @@ public class ClosedFileScanHandleImpl implements
IFileScanHandle {
@Override
public boolean isTimeSeriesTimeDeleted(IDeviceID deviceID, String
timeSeriesName, long timestamp)
throws IllegalPathException {
-
+ curFileModEntries =
+ curFileModEntries != null
+ ? curFileModEntries
+ : queryContext.loadAllModificationsFromDisk(tsFileResource);
Map<String, List<TimeRange>> modificationTimeRange =
deviceToModifications.get(deviceID);
if (modificationTimeRange != null &&
modificationTimeRange.containsKey(timeSeriesName)) {
return ModificationUtils.isPointDeleted(timestamp,
modificationTimeRange.get(timeSeriesName));
}
List<ModEntry> modifications =
- queryContext.getPathModifications(tsFileResource, deviceID, timeSeriesName);
+ queryContext.getPathModifications(curFileModEntries, deviceID, timeSeriesName);
List<TimeRange> timeRangeList =
modifications.stream().map(ModEntry::getTimeRange).collect(Collectors.toList());
TimeRange.sortAndMerge(timeRangeList);