This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/1.3.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit aa82f6912dd4f90e9c42d7285ff807241267f7df Author: shuwenwei <[email protected]> AuthorDate: Wed Aug 20 17:05:03 2025 +0800 [to dev/1.3] Using PatternTreeMap to cache mod entries in ReadChunkCompactionPerformer (#16183) * Using PatternTreeMap to cache mod entries in ReadChunkCompactionPerformer * modify SettleSelector * fix * fix --- .../execution/fragment/QueryContext.java | 5 ++- .../performer/impl/FastCompactionPerformer.java | 5 +-- .../compaction/execute/utils/CompactionUtils.java | 28 +++++++++++++++ .../execute/utils/MultiTsFileDeviceIterator.java | 41 +++++++++------------- .../fast/FastAlignedSeriesCompactionExecutor.java | 5 +-- .../FastNonAlignedSeriesCompactionExecutor.java | 5 +-- .../executor/fast/SeriesCompactionExecutor.java | 14 +++----- .../selector/impl/SettleSelectorImpl.java | 25 +++++++------ 8 files changed, 71 insertions(+), 57 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java index 103267dfec5..8d62e207766 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java @@ -134,7 +134,10 @@ public class QueryContext { if (fileMods == null) { return Collections.emptyList(); } - return ModificationFile.sortAndMerge(fileMods.getDeviceOverlapped(new PartialPath(deviceID))); + + return ModificationFile.sortAndMerge( + fileMods.getOverlapped( + new PartialPath(deviceID).concatAsMeasurementPath(AlignedPath.VECTOR_PLACEHOLDER))); } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java index fc8392572d2..cab0289a28c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java @@ -357,10 +357,7 @@ public class FastCompactionPerformer } // read mods PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> modifications = - PatternTreeMapFactory.getModsPatternTreeMap(); - for (Modification modification : resource.getModFile().getModificationsIter()) { - modifications.append(modification.getPath(), modification); - } + CompactionUtils.buildModEntryPatternTreeMap(resource); modificationCache.put(resource.getTsFile().getName(), modifications); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java index d0f79cb992b..1ace2bde3a0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java @@ -21,6 +21,9 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.enums.Tag; import org.apache.iotdb.db.service.metrics.CompactionMetrics; @@ -31,6 +34,7 @@ import org.apache.iotdb.db.storageengine.dataregion.modification.Modification; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex; +import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.metrics.utils.SystemMetric; @@ -45,6 +49,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -357,6 +362,29 @@ public class CompactionUtils { } } + public static PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> + buildModEntryPatternTreeMap(TsFileResource resource) { + PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> patternTreeMap = + PatternTreeMapFactory.getModsPatternTreeMap(); + Iterable<Modification> iterator = resource.getModFile().getModificationsIter(); + for (Modification modification : iterator) { + patternTreeMap.append(modification.getPath(), modification); + } + return patternTreeMap; + } + + public static List<Modification> getMatchedModifications( + PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> patternTreeMap, + IDeviceID deviceID, + String measurement) + throws IllegalPathException { + if (patternTreeMap == null) { + return Collections.emptyList(); + } + PartialPath path = CompactionPathUtils.getPath(deviceID, measurement); + return ModificationFile.sortAndMerge(patternTreeMap.getOverlapped(path)); + } + public static boolean isDiskHasSpace() { return isDiskHasSpace(0d); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java index a53d8a40bdb..1f66db15151 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/MultiTsFileDeviceIterator.java @@ -22,16 +22,17 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeTTLCache; import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion; import org.apache.iotdb.db.storageengine.dataregion.modification.Modification; -import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.utils.ModificationUtils; +import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.AlignedChunkMetadata; @@ -69,7 +70,9 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { private List<TsFileResource> tsFileResourcesSortedByAsc; private Map<TsFileResource, TsFileSequenceReader> readerMap = new HashMap<>(); private final Map<TsFileResource, TsFileDeviceIterator> deviceIteratorMap = new HashMap<>(); - private final Map<TsFileResource, List<Modification>> modificationCache = new HashMap<>(); + private final Map< + TsFileResource, PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer>> + modificationCache = new HashMap<>(); private Pair<IDeviceID, Boolean> currentDevice = null; private long ttlForCurrentDevice; private long timeLowerBoundForCurrentDevice; @@ -432,10 +435,9 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { timeLowerBoundForCurrentDevice); } - List<Modification> modifications = + PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> modifications = modificationCache.computeIfAbsent( - tsFileResource, - r -> new LinkedList<>(ModificationFile.getNormalMods(r).getModifications())); + tsFileResource, CompactionUtils::buildModEntryPatternTreeMap); // construct the input params List<List<Modification>> for QueryUtils.modifyAlignedChunkMetaData AlignedChunkMetadata alignedChunkMetadata = alignedChunkMetadataList.get(0); @@ -446,15 +448,9 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { modificationForCurDevice.add(Collections.emptyList()); continue; } - List<Modification> modificationList = new ArrayList<>(); - PartialPath path = - CompactionPathUtils.getPath( - currentDevice.getLeft(), valueChunkMetadata.getMeasurementUid()); - for (Modification modification : modifications) { - if (modification.getPath().matchFullPath(path)) { - modificationList.add(modification); - } - } + List<Modification> modificationList = + CompactionUtils.getMatchedModifications( + modifications, device, valueChunkMetadata.getMeasurementUid()); if (ttlDeletion != null) { modificationList.add(ttlDeletion); } @@ -662,17 +658,14 @@ public class MultiTsFileDeviceIterator implements AutoCloseable { chunkMetadataListMap.get(currentCompactingSeries); chunkMetadataListMap.remove(currentCompactingSeries); - List<Modification> modificationsInThisResource = - modificationCache.computeIfAbsent( - resource, - r -> new LinkedList<>(ModificationFile.getNormalMods(r).getModifications())); - LinkedList<Modification> modificationForCurrentSeries = new LinkedList<>(); + PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> + modificationsInThisResource = + modificationCache.computeIfAbsent( + resource, CompactionUtils::buildModEntryPatternTreeMap); // collect the modifications for current series - for (Modification modification : modificationsInThisResource) { - if (modification.getPath().matchFullPath(path)) { - modificationForCurrentSeries.add(modification); - } - } + List<Modification> modificationForCurrentSeries = + CompactionUtils.getMatchedModifications( + modificationsInThisResource, device, currentCompactingSeries); // add ttl deletion for current series if (ttlDeletion != null) { modificationForCurrentSeries.add(ttlDeletion); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java index fa5f53f4209..edf127b20de 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastAlignedSeriesCompactionExecutor.java @@ -23,7 +23,6 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.AlignedSeriesBatchCompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.AlignedPageElement; @@ -242,9 +241,7 @@ public class FastAlignedSeriesCompactionExecutor extends SeriesCompactionExecuto valueModifications.add(null); } else { valueModifications.add( - getModificationsFromCache( - resource, - CompactionPathUtils.getPath(deviceId, x.getMeasurementUid()))); + getModificationsFromCache(resource, deviceId, x.getMeasurementUid())); } } catch (IllegalPathException e) { throw new RuntimeException(e); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastNonAlignedSeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastNonAlignedSeriesCompactionExecutor.java index ab7d509c0ec..860ca3927d6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastNonAlignedSeriesCompactionExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/FastNonAlignedSeriesCompactionExecutor.java @@ -23,7 +23,6 @@ import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionPathUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.ChunkMetadataElement; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.FileElement; @@ -147,9 +146,7 @@ public class FastNonAlignedSeriesCompactionExecutor extends SeriesCompactionExec ModificationUtils.modifyChunkMetaData( iChunkMetadataList, getModificationsFromCache( - resource, - CompactionPathUtils.getPath( - deviceId, iChunkMetadataList.get(0).getMeasurementUid()))); + resource, deviceId, iChunkMetadataList.get(0).getMeasurementUid())); if (iChunkMetadataList.isEmpty()) { // all chunks has been deleted in this file, just remove it removeFile(fileElement); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java index a44453645c0..9bf2e43d4cb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/fast/SeriesCompactionExecutor.java @@ -20,10 +20,10 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast; import org.apache.iotdb.commons.exception.IllegalPathException; -import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.ChunkMetadataElement; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.FileElement; @@ -31,7 +31,6 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.exe import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.reader.PointPriorityReader; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.AbstractCompactionWriter; import org.apache.iotdb.db.storageengine.dataregion.modification.Modification; -import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; @@ -46,7 +45,6 @@ import org.apache.tsfile.read.common.TimeRange; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -474,17 +472,13 @@ public abstract class SeriesCompactionExecutor { /** * Get the modifications of a timeseries in the ModificationFile of a TsFile. Create ttl * modification from ttl cache. - * - * @param path name of the time series */ protected List<Modification> getModificationsFromCache( - TsFileResource tsFileResource, PartialPath path) { + TsFileResource tsFileResource, IDeviceID deviceId, String measurement) + throws IllegalPathException { PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> allModifications = modificationCacheMap.get(tsFileResource.getTsFile().getName()); - if (allModifications == null) { - return Collections.emptyList(); - } - return ModificationFile.sortAndMerge(allModifications.getOverlapped(path)); + return CompactionUtils.getMatchedModifications(allModifications, deviceId, measurement); } @SuppressWarnings("squid:S3776") diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SettleSelectorImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SettleSelectorImpl.java index 9a5dff0afea..de81d4f4f7a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SettleSelectorImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/SettleSelectorImpl.java @@ -20,7 +20,8 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.IllegalPathException; -import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.path.AlignedPath; +import org.apache.iotdb.commons.path.PatternTreeMap; import org.apache.iotdb.commons.utils.CommonDateTimeUtils; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -38,6 +39,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex; +import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.PlainDeviceID; @@ -46,7 +48,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -213,9 +214,9 @@ public class SettleSelectorImpl implements ISettleSelector { * * @return dirty status means the status of current resource. */ + @SuppressWarnings("OptionalGetWithoutIsPresent") // iterating the index, must present private FileDirtyInfo selectFileBaseOnDirtyData(TsFileResource resource) throws IOException, IllegalPathException { - ModificationFile modFile = resource.getModFile(); ITimeIndex timeIndex = resource.getTimeIndex(); if (timeIndex instanceof FileTimeIndex) { timeIndex = CompactionUtils.buildDeviceTimeIndex(resource); @@ -224,7 +225,8 @@ public class SettleSelectorImpl implements ISettleSelector { boolean hasExpiredTooLong = false; long currentTime = CommonDateTimeUtils.currentTime(); - Collection<Modification> modifications = modFile.getModifications(); + PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> modifications = + CompactionUtils.buildModEntryPatternTreeMap(resource); for (IDeviceID device : ((DeviceTimeIndex) timeIndex).getDevices()) { // check expired device by ttl // TODO: remove deviceId conversion @@ -279,13 +281,16 @@ public class SettleSelectorImpl implements ISettleSelector { /** Check whether the device is completely deleted by mods or not. */ private boolean isDeviceDeletedByMods( - Collection<Modification> modifications, IDeviceID device, long startTime, long endTime) + PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer> modifications, + IDeviceID device, + long startTime, + long endTime) throws IllegalPathException { - for (Modification modification : modifications) { - PartialPath path = modification.getPath(); - if (path.endWithMultiLevelWildcard() - && path.getDevicePath().matchFullPath(new PartialPath(device)) - && ((Deletion) modification).getTimeRange().contains(startTime, endTime)) { + List<Modification> deviceModifications = + CompactionUtils.getMatchedModifications( + modifications, device, AlignedPath.VECTOR_PLACEHOLDER); + for (Modification modification : deviceModifications) { + if (((Deletion) modification).getTimeRange().contains(startTime, endTime)) { return true; } }
