This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch fixEstimatedSizeForStringType-1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4d718d1c6451fab2e6c1710939e4dd0b2c003619 Author: shuwenwei <[email protected]> AuthorDate: Mon Apr 28 09:45:10 2025 +0800 estimating inner compaction task memory during selection (#15257) * estimating inner compaction task during selection * use tsfile id * rename * spotless * modify CompactionEstimateUtils * fix bug * fix compile * fix ut * fix bug * fix ut * fix review * modify CompactionEstimateUtils * modify ut * fix ut * add log * add log --- .../performer/ICrossCompactionPerformer.java | 6 + ...rformer.java => IInnerCompactionPerformer.java} | 11 +- .../execute/performer/ISeqCompactionPerformer.java | 2 +- .../performer/IUnseqCompactionPerformer.java | 2 +- .../performer/impl/FastCompactionPerformer.java | 21 ++++ .../impl/ReadChunkCompactionPerformer.java | 8 ++ .../impl/ReadPointCompactionPerformer.java | 8 ++ .../execute/task/AbstractCompactionTask.java | 10 ++ .../execute/task/InnerSpaceCompactionTask.java | 18 +-- .../schedule/CompactionScheduleContext.java | 24 ++++ .../compaction/schedule/CompactionScheduler.java | 5 +- .../estimator/AbstractCompactionEstimator.java | 124 ++++++++++++++++++--- .../estimator/AbstractCrossSpaceEstimator.java | 6 +- .../estimator/AbstractInnerSpaceEstimator.java | 5 +- .../estimator/CompactionEstimateUtils.java | 91 +++++++++++++-- .../selector/estimator/CompactionTaskInfo.java | 6 - ...taInfo.java => CompactionTaskMetadataInfo.java} | 12 +- .../FastCompactionInnerCompactionEstimator.java | 44 ++++++-- .../FastCrossSpaceCompactionEstimator.java | 29 +++-- .../compaction/selector/estimator/FileInfo.java | 39 ++++++- .../ReadChunkInnerCompactionEstimator.java | 25 +++-- .../RepairUnsortedFileCompactionEstimator.java | 21 +++- .../impl/NewSizeTieredCompactionSelector.java | 76 ++++++++++++- .../impl/RewriteCrossSpaceCompactionSelector.java | 2 +- .../utils/CompactionTaskMemCostEstimatorTest.java | 87 +++++++++------ 25 files changed, 552 insertions(+), 130 deletions(-) diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ICrossCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ICrossCompactionPerformer.java index 9aad2d0b5e4..d9b9f9fbf09 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ICrossCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ICrossCompactionPerformer.java @@ -19,11 +19,17 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractCrossSpaceEstimator; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import java.util.List; +import java.util.Optional; public interface ICrossCompactionPerformer extends ICompactionPerformer { @Override void setSourceFiles(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles); + + default Optional<AbstractCrossSpaceEstimator> getCrossSpaceEstimator() { + return Optional.empty(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ICrossCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/IInnerCompactionPerformer.java similarity index 74% copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ICrossCompactionPerformer.java copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/IInnerCompactionPerformer.java index 9aad2d0b5e4..04118a1475b 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ICrossCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/IInnerCompactionPerformer.java @@ -19,11 +19,12 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator; -import java.util.List; +import java.util.Optional; -public interface ICrossCompactionPerformer extends ICompactionPerformer { - @Override - void setSourceFiles(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles); +public interface IInnerCompactionPerformer extends ICompactionPerformer { + default Optional<AbstractInnerSpaceEstimator> getInnerSpaceEstimator() { + return Optional.empty(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ISeqCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ISeqCompactionPerformer.java index 30d7264fdcd..3416d587d05 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ISeqCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/ISeqCompactionPerformer.java @@ -23,7 +23,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import java.util.List; -public interface ISeqCompactionPerformer extends ICompactionPerformer { +public interface ISeqCompactionPerformer extends IInnerCompactionPerformer { @Override void setSourceFiles(List<TsFileResource> seqFiles); } diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/IUnseqCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/IUnseqCompactionPerformer.java index 9c9e11538b1..686ec14fe72 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/IUnseqCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/IUnseqCompactionPerformer.java @@ -23,7 +23,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import java.util.List; -public interface IUnseqCompactionPerformer extends ICompactionPerformer { +public interface IUnseqCompactionPerformer extends IInnerCompactionPerformer { @Override void setSourceFiles(List<TsFileResource> unseqFiles); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java index fc8392572d2..b8347c5b91e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/FastCompactionPerformer.java @@ -39,6 +39,10 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.wri import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.FastCrossCompactionWriter; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.FastInnerCompactionWriter; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; 
+import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractCrossSpaceEstimator; +import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator; +import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCompactionInnerCompactionEstimator; +import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCrossSpaceCompactionEstimator; import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion; import org.apache.iotdb.db.storageengine.dataregion.modification.Modification; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -61,6 +65,7 @@ import java.util.Comparator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -364,4 +369,20 @@ public class FastCompactionPerformer modificationCache.put(resource.getTsFile().getName(), modifications); } } + + public String getDatabaseName() { + return !seqFiles.isEmpty() + ? 
seqFiles.get(0).getDatabaseName() + : unseqFiles.get(0).getDatabaseName(); + } + + @Override + public Optional<AbstractInnerSpaceEstimator> getInnerSpaceEstimator() { + return Optional.of(new FastCompactionInnerCompactionEstimator()); + } + + @Override + public Optional<AbstractCrossSpaceEstimator> getCrossSpaceEstimator() { + return Optional.of(new FastCrossSpaceCompactionEstimator()); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java index 670f91961c6..f790d4e606c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadChunkCompactionPerformer.java @@ -30,6 +30,8 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.exe import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.readchunk.SingleSeriesCompactionExecutor; import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType; +import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator; +import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.ReadChunkInnerCompactionEstimator; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; @@ -45,6 +47,7 @@ import java.io.IOException; import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.Optional; public 
class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { private List<TsFileResource> seqFiles; @@ -286,4 +289,9 @@ public class ReadChunkCompactionPerformer implements ISeqCompactionPerformer { public void setSourceFiles(List<TsFileResource> seqFiles) { this.seqFiles = seqFiles; } + + @Override + public Optional<AbstractInnerSpaceEstimator> getInnerSpaceEstimator() { + return Optional.of(new ReadChunkInnerCompactionEstimator()); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java index 064cdecf841..eb868a3a714 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java @@ -38,6 +38,8 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.wri import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.ReadPointCrossCompactionWriter; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.ReadPointInnerCompactionWriter; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; +import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator; +import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.RepairUnsortedFileCompactionEstimator; import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; import org.apache.iotdb.db.storageengine.dataregion.read.control.QueryResourceManager; import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -61,6 +63,7 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.stream.Collectors; @@ -323,4 +326,9 @@ public class ReadPointCompactionPerformer public void setSourceFiles(List<TsFileResource> unseqFiles) { this.unseqFiles = unseqFiles; } + + @Override + public Optional<AbstractInnerSpaceEstimator> getInnerSpaceEstimator() { + return Optional.of(new RepairUnsortedFileCompactionEstimator()); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java index f58f2d0ef3e..8d7bbd2d14d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java @@ -74,6 +74,7 @@ public abstract class AbstractCompactionTask { protected CompactionTaskSummary summary; protected long serialId; protected CompactionTaskStage taskStage; + protected long roughMemoryCost = -1L; protected long memoryCost = 0L; protected boolean recoverMemoryStatus; @@ -256,6 +257,15 @@ public abstract class AbstractCompactionTask { } } + @TestOnly + public long getRoughMemoryCost() { + return roughMemoryCost; + } + + public void setRoughMemoryCost(long memoryCost) { + this.roughMemoryCost = memoryCost; + } + public abstract long getEstimatedMemoryCost(); public abstract int getProcessedFileNum(); diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java index 25674209023..a9dfce2bad3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java @@ -28,8 +28,8 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.constant.Compacti import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionSourceFileDeletedException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.IInnerCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; -import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogAnalyzer; @@ -39,8 +39,6 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator; import 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.CompactionEstimateUtils; -import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCompactionInnerCompactionEstimator; -import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.ReadChunkInnerCompactionEstimator; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -674,20 +672,14 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { @Override public long getEstimatedMemoryCost() { if (innerSpaceEstimator == null) { - if (this.performer instanceof ReadChunkCompactionPerformer) { - innerSpaceEstimator = new ReadChunkInnerCompactionEstimator(); - } else if (this.performer instanceof FastCompactionPerformer) { - innerSpaceEstimator = new FastCompactionInnerCompactionEstimator(); - } + innerSpaceEstimator = + ((IInnerCompactionPerformer) this.performer).getInnerSpaceEstimator().orElse(null); } if (innerSpaceEstimator != null && memoryCost == 0L) { try { - long roughEstimatedMemoryCost = - innerSpaceEstimator.roughEstimateInnerCompactionMemory( - filesView.sourceFilesInCompactionPerformer); memoryCost = - CompactionEstimateUtils.shouldUseRoughEstimatedResult(roughEstimatedMemoryCost) - ? roughEstimatedMemoryCost + CompactionEstimateUtils.shouldUseRoughEstimatedResult(roughMemoryCost) + ? 
roughMemoryCost : innerSpaceEstimator.estimateInnerCompactionMemory( filesView.sourceFilesInCompactionPerformer); } catch (CompactionSourceFileDeletedException e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java index d1b9f134527..6f215ab579a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java @@ -19,8 +19,12 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.schedule; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.service.metrics.CompactionMetrics; import org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICrossCompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ISeqCompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.IUnseqCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.SettleCompactionTask; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -169,4 +173,24 @@ public class CompactionScheduleContext { public int getPartiallyDirtyFileNum() { return partiallyDirtyFileNum; } + + public ISeqCompactionPerformer getSeqCompactionPerformer() { + return IoTDBDescriptor.getInstance() + .getConfig() + .getInnerSeqCompactionPerformer() + .createInstance(); + } + + public IUnseqCompactionPerformer 
getUnseqCompactionPerformer() { + IUnseqCompactionPerformer unseqCompactionPerformer = + IoTDBDescriptor.getInstance() + .getConfig() + .getInnerUnseqCompactionPerformer() + .createInstance(); + return unseqCompactionPerformer; + } + + public ICrossCompactionPerformer getCrossCompactionPerformer() { + return IoTDBDescriptor.getInstance().getConfig().getCrossCompactionPerformer().createInstance(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java index f2c8e789ab1..4f762ad4306 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java @@ -301,10 +301,7 @@ public class CompactionScheduler { tsFileManager, taskList.get(i).getSeqFiles(), taskList.get(i).getUnseqFiles(), - IoTDBDescriptor.getInstance() - .getConfig() - .getCrossCompactionPerformer() - .createInstance(), + context.getCrossCompactionPerformer(), memoryCost.get(i), tsFileManager.getNextCompactionTaskId()); task.setCompactionConfigVersion(compactionConfigVersionWhenSelectTask); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java index fe73090b13c..8e46c0f80fd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java +++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java @@ -19,10 +19,13 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.BatchCompactionPlan; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex; @@ -36,7 +39,8 @@ import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.read.TsFileSequenceReader; -import java.io.File; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -55,14 +59,42 @@ import java.util.stream.Collectors; @SuppressWarnings("OptionalGetWithoutIsPresent") public abstract class AbstractCompactionEstimator { - private static final Map<File, FileInfo> globalFileInfoCacheForFailedCompaction = - Collections.synchronizedMap( - new LRUMap<>( - IoTDBDescriptor.getInstance().getConfig().getGlobalCompactionFileInfoCacheSize())); + /** The size of global compaction estimation file info cache. */ + private static int globalCompactionFileInfoCacheSize = 1000; + + /** The size of global compaction estimation rough file info cache.
*/ + private static int globalCompactionRoughFileInfoCacheSize = 100000; + + private static final double maxRatioToAllocateFileInfoCache = 0.1; + private static boolean globalFileInfoCacheEnabled; + private static Map<TsFileID, FileInfo> globalFileInfoCacheForFailedCompaction; + private static Map<TsFileID, FileInfo.RoughFileInfo> globalRoughInfoCacheForCompaction; + + protected IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + + public static long allocateMemoryCostForFileInfoCache(long compactionMemorySize) { + long fixedMemoryCost = + globalCompactionFileInfoCacheSize * FileInfo.MEMORY_COST_OF_FILE_INFO_ENTRY_IN_CACHE + + globalCompactionRoughFileInfoCacheSize + * FileInfo.MEMORY_COST_OF_ROUGH_FILE_INFO_ENTRY_IN_CACHE; + globalFileInfoCacheEnabled = + compactionMemorySize * maxRatioToAllocateFileInfoCache > fixedMemoryCost; + if (globalFileInfoCacheEnabled) { + globalRoughInfoCacheForCompaction = + Collections.synchronizedMap(new LRUMap<>(globalCompactionFileInfoCacheSize)); + globalFileInfoCacheForFailedCompaction = + Collections.synchronizedMap(new LRUMap<>(globalCompactionRoughFileInfoCacheSize)); + } else { + globalRoughInfoCacheForCompaction = Collections.emptyMap(); + globalFileInfoCacheForFailedCompaction = Collections.emptyMap(); + } + return globalFileInfoCacheEnabled ? 
fixedMemoryCost : 0; + } + protected Map<TsFileResource, FileInfo> fileInfoCache = new HashMap<>(); + protected Map<TsFileResource, FileInfo.RoughFileInfo> roughInfoMap = new HashMap<>(); protected Map<TsFileResource, DeviceTimeIndex> deviceTimeIndexCache = new HashMap<>(); - protected IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); protected TSFileConfig tsFileConfig = TSFileDescriptor.getInstance().getConfig(); protected long fixedMemoryBudget = (long) @@ -100,10 +132,10 @@ public abstract class AbstractCompactionEstimator { if (fileInfoCache.containsKey(resource)) { return fileInfoCache.get(resource); } - File file = new File(resource.getTsFilePath()); + TsFileID tsFileID = resource.getTsFileID(); synchronized (globalFileInfoCacheForFailedCompaction) { - if (globalFileInfoCacheForFailedCompaction.containsKey(file)) { - FileInfo fileInfo = globalFileInfoCacheForFailedCompaction.get(file); + FileInfo fileInfo = globalFileInfoCacheForFailedCompaction.get(tsFileID); + if (fileInfo != null) { fileInfoCache.put(resource, fileInfo); return fileInfo; } @@ -111,19 +143,26 @@ public abstract class AbstractCompactionEstimator { try (TsFileSequenceReader reader = getReader(resource.getTsFilePath())) { FileInfo fileInfo = CompactionEstimateUtils.calculateFileInfo(reader); fileInfoCache.put(resource, fileInfo); - synchronized (globalFileInfoCacheForFailedCompaction) { - globalFileInfoCacheForFailedCompaction.put(file, fileInfo); + if (globalFileInfoCacheEnabled) { + synchronized (globalFileInfoCacheForFailedCompaction) { + globalFileInfoCacheForFailedCompaction.put(tsFileID, fileInfo); + } + synchronized (globalRoughInfoCacheForCompaction) { + globalRoughInfoCacheForCompaction.put(tsFileID, fileInfo.getSimpleFileInfo()); + } } return fileInfo; } } - protected int calculatingMaxOverlapFileNumInSubCompactionTask(List<TsFileResource> resources) + @SuppressWarnings("OptionalGetWithoutIsPresent") + protected int calculatingMaxOverlapFileNumInSubCompactionTask( + 
@Nullable CompactionScheduleContext context, List<TsFileResource> resources) throws IOException { Set<IDeviceID> devices = new HashSet<>(); List<DeviceTimeIndex> resourceDevices = new ArrayList<>(resources.size()); for (TsFileResource resource : resources) { - DeviceTimeIndex deviceTimeIndex = getDeviceTimeIndexFromCache(resource); + DeviceTimeIndex deviceTimeIndex = getDeviceTimeIndexFromCache(context, resource); devices.addAll(deviceTimeIndex.getDevices()); resourceDevices.add(deviceTimeIndex); } @@ -166,10 +205,18 @@ public abstract class AbstractCompactionEstimator { return maxOverlapFileNumInSubCompactionTask; } - private DeviceTimeIndex getDeviceTimeIndexFromCache(TsFileResource resource) throws IOException { + private DeviceTimeIndex getDeviceTimeIndexFromCache( + @Nullable CompactionScheduleContext context, TsFileResource resource) throws IOException { if (deviceTimeIndexCache.containsKey(resource)) { return deviceTimeIndexCache.get(resource); } + if (context != null) { + DeviceTimeIndex timeIndex = context.getResourceDeviceInfo(resource); + if (timeIndex != null) { + deviceTimeIndexCache.put(resource, timeIndex); + return timeIndex; + } + } ITimeIndex timeIndex = resource.getTimeIndex(); if (timeIndex instanceof FileTimeIndex) { timeIndex = CompactionUtils.buildDeviceTimeIndex(resource); @@ -183,12 +230,57 @@ public abstract class AbstractCompactionEstimator { fileInfoCache.clear(); } + public boolean hasCachedRoughFileInfo(TsFileResource resource) { + return getRoughFileInfo(resource) != null; + } + + public FileInfo.RoughFileInfo getRoughFileInfo(TsFileResource resource) { + FileInfo.RoughFileInfo roughFileInfo = roughInfoMap.get(resource); + if (roughFileInfo != null) { + return roughFileInfo; + } + synchronized (globalRoughInfoCacheForCompaction) { + roughFileInfo = globalRoughInfoCacheForCompaction.get(resource.getTsFileID()); + } + if (roughFileInfo != null) { + roughInfoMap.put(resource, roughFileInfo); + } + return roughFileInfo; + } + public 
static void removeFileInfoFromGlobalFileInfoCache(TsFileResource resource) { if (resource == null || resource.getTsFile() == null) { return; } - synchronized (globalFileInfoCacheForFailedCompaction) { - globalFileInfoCacheForFailedCompaction.remove(resource.getTsFile()); + if (globalFileInfoCacheEnabled) { + synchronized (globalFileInfoCacheForFailedCompaction) { + globalFileInfoCacheForFailedCompaction.remove(resource.getTsFileID()); + } + synchronized (globalRoughInfoCacheForCompaction) { + globalRoughInfoCacheForCompaction.remove(resource.getTsFileID()); + } } } + + @TestOnly + public static void enableFileInfoCacheForTest( + int globalCompactionFileInfoCacheSize, int globalCompactionRoughFileInfoCacheSize) { + globalFileInfoCacheEnabled = true; + globalRoughInfoCacheForCompaction = + Collections.synchronizedMap(new LRUMap<>(globalCompactionFileInfoCacheSize)); + globalFileInfoCacheForFailedCompaction = + Collections.synchronizedMap(new LRUMap<>(globalCompactionRoughFileInfoCacheSize)); + } + + @TestOnly + public static void disableFileInfoCacheForTest() { + globalFileInfoCacheEnabled = false; + globalRoughInfoCacheForCompaction = Collections.emptyMap(); + globalFileInfoCacheForFailedCompaction = Collections.emptyMap(); + } + + @TestOnly + public static boolean isGlobalFileInfoCacheEnabled() { + return globalFileInfoCacheEnabled; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java index 3b6db3127b1..76d52c6ed75 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCrossSpaceEstimator.java @@ 
-20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator; import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -59,5 +60,8 @@ public abstract class AbstractCrossSpaceEstimator extends AbstractCompactionEsti } public abstract long roughEstimateCrossCompactionMemory( - List<TsFileResource> seqResources, List<TsFileResource> unseqResources) throws IOException; + CompactionScheduleContext context, + List<TsFileResource> seqResources, + List<TsFileResource> unseqResources) + throws IOException; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java index 21288883ae8..40912afdffe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractInnerSpaceEstimator.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimat import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ 
-57,6 +58,6 @@ public abstract class AbstractInnerSpaceEstimator extends AbstractCompactionEsti return cost; } - public abstract long roughEstimateInnerCompactionMemory(List<TsFileResource> resources) - throws IOException; + public abstract long roughEstimateInnerCompactionMemory( + CompactionScheduleContext context, List<TsFileResource> resources) throws IOException; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionEstimateUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionEstimateUtils.java index c332f72dc54..419d8a6ed7c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionEstimateUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionEstimateUtils.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator; +import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionSourceFileDeletedException; import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader; @@ -26,12 +27,17 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.ChunkMetadata; import org.apache.tsfile.file.metadata.IDeviceID; import org.apache.tsfile.file.metadata.MetadataIndexNode; +import org.apache.tsfile.file.metadata.statistics.BinaryStatistics; import org.apache.tsfile.read.TsFileDeviceIterator; import 
org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.utils.RamUsageEstimator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; @@ -41,6 +47,9 @@ import java.util.Map; public class CompactionEstimateUtils { + protected static final Logger LOGGER = + LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME); + /** * Get the details of the tsfile, the returned array contains the following elements in sequence: * @@ -55,12 +64,15 @@ public class CompactionEstimateUtils { * * @throws IOException if io errors occurred */ - public static FileInfo calculateFileInfo(TsFileSequenceReader reader) throws IOException { + static FileInfo calculateFileInfo(TsFileSequenceReader reader) throws IOException { int totalChunkNum = 0; int maxChunkNum = 0; int maxAlignedSeriesNumInDevice = -1; int maxDeviceChunkNum = 0; + long maxMemCostToReadAlignedSeriesMetadata = 0; + long maxMemCostToReadNonAlignedSeriesMetadata = 0; TsFileDeviceIterator deviceIterator = reader.getAllDevicesIteratorWithIsAligned(); + long totalMetadataSize = 0; while (deviceIterator.hasNext()) { int deviceChunkNum = 0; int alignedSeriesNumInDevice = 0; @@ -68,6 +80,7 @@ public class CompactionEstimateUtils { Pair<IDeviceID, Boolean> deviceWithIsAlignedPair = deviceIterator.next(); IDeviceID device = deviceWithIsAlignedPair.left; boolean isAlignedDevice = deviceWithIsAlignedPair.right; + long memCostToReadMetadata = 0; Iterator<Map<String, List<ChunkMetadata>>> measurementChunkMetadataListMapIterator = reader.getMeasurementChunkMetadataListMapIterator(device); @@ -81,9 +94,45 @@ public class CompactionEstimateUtils { for (Map.Entry<String, List<ChunkMetadata>> measurementChunkMetadataList : measurementChunkMetadataListMap.entrySet()) { int currentChunkMetadataListSize = measurementChunkMetadataList.getValue().size(); + long measurementNameRamSize = + 
RamUsageEstimator.sizeOf(measurementChunkMetadataList.getKey()); + long chunkMetadataMemCost = 0; + long currentSeriesRamSize = measurementNameRamSize; + for (ChunkMetadata chunkMetadata : measurementChunkMetadataList.getValue()) { + // chunkMetadata should not be a null value + if (chunkMetadata != null) { + TSDataType dataType = chunkMetadata.getDataType(); + chunkMetadataMemCost = + chunkMetadataMemCost != 0 + ? chunkMetadataMemCost + : (ChunkMetadata.calculateRamSize(chunkMetadata.getMeasurementUid(), dataType) + - measurementNameRamSize); + if (dataType == TSDataType.TEXT) { + // add ram size for first value and last value + currentSeriesRamSize += + chunkMetadata.getStatistics().getRetainedSizeInBytes() + - BinaryStatistics.INSTANCE_SIZE; + } else { + break; + } + } else { + LOGGER.warn( + "{} has null chunk metadata, file is {}", + device.toString() + "." + measurementChunkMetadataList.getKey(), + reader.getFileName()); + } + } + currentSeriesRamSize += chunkMetadataMemCost * currentChunkMetadataListSize; + if (isAlignedDevice) { + memCostToReadMetadata += currentSeriesRamSize; + } else { + maxMemCostToReadNonAlignedSeriesMetadata = + Math.max(maxMemCostToReadNonAlignedSeriesMetadata, currentSeriesRamSize); + } deviceChunkNum += currentChunkMetadataListSize; totalChunkNum += currentChunkMetadataListSize; maxChunkNum = Math.max(maxChunkNum, currentChunkMetadataListSize); + totalMetadataSize += currentSeriesRamSize; } } if (isAlignedDevice) { @@ -91,21 +140,24 @@ public class CompactionEstimateUtils { Math.max(maxAlignedSeriesNumInDevice, alignedSeriesNumInDevice); } maxDeviceChunkNum = Math.max(maxDeviceChunkNum, deviceChunkNum); + maxMemCostToReadAlignedSeriesMetadata = + Math.max(maxMemCostToReadAlignedSeriesMetadata, memCostToReadMetadata); } - long averageChunkMetadataSize = - totalChunkNum == 0 ? 0 : reader.getAllMetadataSize() / totalChunkNum; + long averageChunkMetadataSize = totalChunkNum == 0 ? 
0 : totalMetadataSize / totalChunkNum; return new FileInfo( totalChunkNum, maxChunkNum, maxAlignedSeriesNumInDevice, maxDeviceChunkNum, - averageChunkMetadataSize); + averageChunkMetadataSize, + maxMemCostToReadAlignedSeriesMetadata, + maxMemCostToReadNonAlignedSeriesMetadata); } - static MetadataInfo collectMetadataInfo(List<TsFileResource> resources, CompactionType taskType) - throws IOException { + static CompactionTaskMetadataInfo collectMetadataInfoFromDisk( + List<TsFileResource> resources, CompactionType taskType) throws IOException { CompactionEstimateUtils.addReadLock(resources); - MetadataInfo metadataInfo = new MetadataInfo(); + CompactionTaskMetadataInfo metadataInfo = new CompactionTaskMetadataInfo(); long cost = 0L; Map<IDeviceID, Long> deviceMetadataSizeMap = new HashMap<>(); try { @@ -129,8 +181,31 @@ public class CompactionEstimateUtils { } } + static CompactionTaskMetadataInfo collectMetadataInfoFromCachedFileInfo( + List<TsFileResource> resources, + Map<TsFileResource, FileInfo.RoughFileInfo> cachedFileInfo, + boolean hasConcurrentSubTask) { + CompactionTaskMetadataInfo metadataInfo = new CompactionTaskMetadataInfo(); + for (TsFileResource resource : resources) { + metadataInfo.metadataMemCost += resource.getModFile().getSize(); + long maxMemToReadAlignedSeries = cachedFileInfo.get(resource).maxMemToReadAlignedSeries; + long maxMemToReadNonAlignedSeries = cachedFileInfo.get(resource).maxMemToReadNonAlignedSeries; + metadataInfo.metadataMemCost += + Math.max( + maxMemToReadAlignedSeries, + maxMemToReadNonAlignedSeries + * (hasConcurrentSubTask + ? 
IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum() + : 1)); + if (maxMemToReadAlignedSeries > 0) { + metadataInfo.hasAlignedSeries = true; + } + } + return metadataInfo; + } + static Map<IDeviceID, Long> getDeviceMetadataSizeMapAndCollectMetadataInfo( - CompactionTsFileReader reader, MetadataInfo metadataInfo) throws IOException { + CompactionTsFileReader reader, CompactionTaskMetadataInfo metadataInfo) throws IOException { Map<IDeviceID, Long> deviceMetadataSizeMap = new HashMap<>(); TsFileDeviceIterator deviceIterator = reader.getAllDevicesIteratorWithIsAligned(); while (deviceIterator.hasNext()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionTaskInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionTaskInfo.java index 32803fb944f..7328d05ea54 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionTaskInfo.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionTaskInfo.java @@ -34,7 +34,6 @@ public class CompactionTaskInfo { private long modificationFileSize = 0; private long totalFileSize = 0; private long totalChunkNum = 0; - private long totalChunkMetadataSize = 0; protected CompactionTaskInfo(List<TsFileResource> resources, List<FileInfo> fileInfoList) { this.fileInfoList = fileInfoList; @@ -55,7 +54,6 @@ public class CompactionTaskInfo { Math.max(maxChunkMetadataNumInDevice, fileInfo.maxDeviceChunkNum); maxChunkMetadataSize = Math.max(maxChunkMetadataSize, fileInfo.averageChunkMetadataSize); totalChunkNum += fileInfo.totalChunkNum; - totalChunkMetadataSize += fileInfo.totalChunkNum * fileInfo.averageChunkMetadataSize; } } @@ -94,8 +92,4 @@ public class CompactionTaskInfo { public List<TsFileResource> getResources() { return 
resources; } - - public long getTotalChunkMetadataSize() { - return totalChunkMetadataSize; - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/MetadataInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionTaskMetadataInfo.java similarity index 81% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/MetadataInfo.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionTaskMetadataInfo.java index a6474a599f4..ac72978b7dc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/MetadataInfo.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionTaskMetadataInfo.java @@ -21,11 +21,15 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimat import org.apache.iotdb.db.conf.IoTDBDescriptor; -class MetadataInfo { +class CompactionTaskMetadataInfo { public long metadataMemCost; public boolean hasAlignedSeries; - public int getMaxConcurrentSeriesNum() { + public int getMaxConcurrentSeriesNum(boolean hasConcurrentSubTask) { + int subTaskNum = + hasConcurrentSubTask + ? IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum() + : 1; if (!hasAlignedSeries) { return IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum(); } @@ -35,8 +39,6 @@ class MetadataInfo { compactionMaxAlignedSeriesNumInOneBatch <= 0 ? 
Integer.MAX_VALUE : compactionMaxAlignedSeriesNumInOneBatch; - return Math.max( - compactionMaxAlignedSeriesNumInOneBatch, - IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum()); + return Math.max(compactionMaxAlignedSeriesNumInOneBatch, subTaskNum); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCompactionInnerCompactionEstimator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCompactionInnerCompactionEstimator.java index d7b3933b38d..d69c0b38f4b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCompactionInnerCompactionEstimator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCompactionInnerCompactionEstimator.java @@ -19,9 +19,11 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator; -import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.List; @@ -31,9 +33,19 @@ public class FastCompactionInnerCompactionEstimator extends AbstractInnerSpaceEs public long calculatingMetadataMemoryCost(CompactionTaskInfo taskInfo) { long cost = 0; // add ChunkMetadata size of MultiTsFileDeviceIterator + long maxAlignedSeriesMemCost = + taskInfo.getFileInfoList().stream() + .mapToLong(fileInfo -> fileInfo.maxMemToReadAlignedSeries) + .sum(); + long maxNonAlignedSeriesMemCost = + taskInfo.getFileInfoList().stream() + .mapToLong( + fileInfo -> + fileInfo.maxMemToReadNonAlignedSeries * config.getSubCompactionTaskNum()) + .sum(); cost += 
Math.min( - taskInfo.getTotalChunkMetadataSize(), + Math.max(maxAlignedSeriesMemCost, maxNonAlignedSeriesMemCost), taskInfo.getFileInfoList().size() * taskInfo.getMaxChunkMetadataNumInDevice() * taskInfo.getMaxChunkMetadataSize()); @@ -71,7 +83,7 @@ public class FastCompactionInnerCompactionEstimator extends AbstractInnerSpaceEs long maxConcurrentChunkSizeFromSourceFile = (averageChunkSize + tsFileConfig.getPageSizeInByte()) * maxConcurrentSeriesNum - * calculatingMaxOverlapFileNumInSubCompactionTask(taskInfo.getResources()); + * calculatingMaxOverlapFileNumInSubCompactionTask(null, taskInfo.getResources()); return targetChunkWriterSize + maxConcurrentChunkSizeFromSourceFile @@ -79,25 +91,33 @@ public class FastCompactionInnerCompactionEstimator extends AbstractInnerSpaceEs } @Override - public long roughEstimateInnerCompactionMemory(List<TsFileResource> resources) + public long roughEstimateInnerCompactionMemory( + @Nullable CompactionScheduleContext context, List<TsFileResource> resources) throws IOException { if (config.getCompactionMaxAlignedSeriesNumInOneBatch() <= 0) { return -1L; } - MetadataInfo metadataInfo = - CompactionEstimateUtils.collectMetadataInfo( - resources, - resources.get(0).isSeq() - ? 
CompactionType.INNER_SEQ_COMPACTION - : CompactionType.INNER_UNSEQ_COMPACTION); - int maxConcurrentSeriesNum = metadataInfo.getMaxConcurrentSeriesNum(); + CompactionTaskMetadataInfo metadataInfo = + CompactionEstimateUtils.collectMetadataInfoFromCachedFileInfo( + resources, roughInfoMap, true); + int maxConcurrentSeriesNum = metadataInfo.getMaxConcurrentSeriesNum(true); long maxChunkSize = config.getTargetChunkSize(); long maxPageSize = tsFileConfig.getPageSizeInByte(); - int maxOverlapFileNum = calculatingMaxOverlapFileNumInSubCompactionTask(resources); + int maxOverlapFileNum = calculatingMaxOverlapFileNumInSubCompactionTask(context, resources); // source files (chunk + uncompressed page) * overlap file num // target file (chunk + unsealed page writer) return (maxOverlapFileNum + 1) * maxConcurrentSeriesNum * (maxChunkSize + maxPageSize) + fixedMemoryBudget + metadataInfo.metadataMemCost; } + + @Override + protected int calculatingMaxOverlapFileNumInSubCompactionTask( + @Nullable CompactionScheduleContext context, List<TsFileResource> resources) + throws IOException { + if (resources.get(0).isSeq()) { + return 1; + } + return super.calculatingMaxOverlapFileNumInSubCompactionTask(context, resources); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCrossSpaceCompactionEstimator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCrossSpaceCompactionEstimator.java index 97f3aef7a8a..aaa9099bd7f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCrossSpaceCompactionEstimator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FastCrossSpaceCompactionEstimator.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator; +import 
org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -32,9 +33,19 @@ public class FastCrossSpaceCompactionEstimator extends AbstractCrossSpaceEstimat protected long calculatingMetadataMemoryCost(CompactionTaskInfo taskInfo) { long cost = 0; // add ChunkMetadata size of MultiTsFileDeviceIterator + long maxAlignedSeriesMemCost = + taskInfo.getFileInfoList().stream() + .mapToLong(fileInfo -> fileInfo.maxMemToReadAlignedSeries) + .sum(); + long maxNonAlignedSeriesMemCost = + taskInfo.getFileInfoList().stream() + .mapToLong( + fileInfo -> + fileInfo.maxMemToReadNonAlignedSeries * config.getSubCompactionTaskNum()) + .sum(); cost += Math.min( - taskInfo.getTotalChunkMetadataSize(), + Math.max(maxAlignedSeriesMemCost, maxNonAlignedSeriesMemCost), taskInfo.getFileInfoList().size() * taskInfo.getMaxChunkMetadataNumInDevice() * taskInfo.getMaxChunkMetadataSize()); @@ -73,7 +84,7 @@ public class FastCrossSpaceCompactionEstimator extends AbstractCrossSpaceEstimat long maxConcurrentChunkSizeFromSourceFile = (averageChunkSize + tsFileConfig.getPageSizeInByte()) * maxConcurrentSeriesNum - * calculatingMaxOverlapFileNumInSubCompactionTask(taskInfo.getResources()); + * calculatingMaxOverlapFileNumInSubCompactionTask(null, taskInfo.getResources()); return targetChunkWriterSize + maxConcurrentChunkSizeFromSourceFile @@ -82,7 +93,10 @@ public class FastCrossSpaceCompactionEstimator extends AbstractCrossSpaceEstimat @Override public long roughEstimateCrossCompactionMemory( - List<TsFileResource> seqResources, List<TsFileResource> unseqResources) throws IOException { + CompactionScheduleContext context, + List<TsFileResource> seqResources, + List<TsFileResource> unseqResources) + throws IOException { if (config.getCompactionMaxAlignedSeriesNumInOneBatch() <= 0) { return -1L; } 
@@ -90,13 +104,14 @@ public class FastCrossSpaceCompactionEstimator extends AbstractCrossSpaceEstimat sourceFiles.addAll(seqResources); sourceFiles.addAll(unseqResources); - MetadataInfo metadataInfo = - CompactionEstimateUtils.collectMetadataInfo(sourceFiles, CompactionType.CROSS_COMPACTION); + CompactionTaskMetadataInfo metadataInfo = + CompactionEstimateUtils.collectMetadataInfoFromDisk( + sourceFiles, CompactionType.CROSS_COMPACTION); - int maxConcurrentSeriesNum = metadataInfo.getMaxConcurrentSeriesNum(); + int maxConcurrentSeriesNum = metadataInfo.getMaxConcurrentSeriesNum(true); long maxChunkSize = config.getTargetChunkSize(); long maxPageSize = tsFileConfig.getPageSizeInByte(); - int maxOverlapFileNum = calculatingMaxOverlapFileNumInSubCompactionTask(sourceFiles); + int maxOverlapFileNum = calculatingMaxOverlapFileNumInSubCompactionTask(context, sourceFiles); // source files (chunk + uncompressed page) * overlap file num // target files (chunk + unsealed page writer) return (maxOverlapFileNum + 1) * maxConcurrentSeriesNum * (maxChunkSize + maxPageSize) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FileInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FileInfo.java index 09282cc0cdc..b3282732898 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FileInfo.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/FileInfo.java @@ -19,7 +19,23 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; + +import org.apache.tsfile.utils.RamUsageEstimator; + +import java.util.Map; + public class FileInfo { + public static final long MEMORY_COST_OF_FILE_INFO_ENTRY_IN_CACHE = + 
RamUsageEstimator.shallowSizeOfInstance(FileInfo.class) + + RamUsageEstimator.shallowSizeOfInstance(TsFileID.class) + + RamUsageEstimator.shallowSizeOfInstance(Map.Entry.class) + + RamUsageEstimator.NUM_BYTES_OBJECT_REF * 2L; + public static final long MEMORY_COST_OF_ROUGH_FILE_INFO_ENTRY_IN_CACHE = + RamUsageEstimator.shallowSizeOfInstance(RoughFileInfo.class) + + RamUsageEstimator.shallowSizeOfInstance(TsFileID.class) + + RamUsageEstimator.shallowSizeOfInstance(Map.Entry.class) + + RamUsageEstimator.NUM_BYTES_OBJECT_REF * 2L; // total chunk num in this tsfile int totalChunkNum = 0; // max chunk num of one timeseries in this tsfile @@ -34,16 +50,37 @@ public class FileInfo { long averageChunkMetadataSize = 0; + long maxMemToReadAlignedSeries; + long maxMemToReadNonAlignedSeries; + public FileInfo( int totalChunkNum, int maxSeriesChunkNum, int maxAlignedSeriesNumInDevice, int maxDeviceChunkNum, - long averageChunkMetadataSize) { + long averageChunkMetadataSize, + long maxMemToReadAlignedSeries, + long maxMemToReadNonAlignedSeries) { this.totalChunkNum = totalChunkNum; this.maxSeriesChunkNum = maxSeriesChunkNum; this.maxAlignedSeriesNumInDevice = maxAlignedSeriesNumInDevice; this.maxDeviceChunkNum = maxDeviceChunkNum; this.averageChunkMetadataSize = averageChunkMetadataSize; + this.maxMemToReadAlignedSeries = maxMemToReadAlignedSeries; + this.maxMemToReadNonAlignedSeries = maxMemToReadNonAlignedSeries; + } + + public RoughFileInfo getSimpleFileInfo() { + return new RoughFileInfo(maxMemToReadAlignedSeries, maxMemToReadNonAlignedSeries); + } + + public static class RoughFileInfo { + long maxMemToReadAlignedSeries; + long maxMemToReadNonAlignedSeries; + + public RoughFileInfo(long maxMemToReadAlignedSeries, long maxMemToReadNonAlignedSeries) { + this.maxMemToReadAlignedSeries = maxMemToReadAlignedSeries; + this.maxMemToReadNonAlignedSeries = maxMemToReadNonAlignedSeries; + } } } diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/ReadChunkInnerCompactionEstimator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/ReadChunkInnerCompactionEstimator.java index 4e3ddad6969..96b0d882fec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/ReadChunkInnerCompactionEstimator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/ReadChunkInnerCompactionEstimator.java @@ -19,10 +19,9 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator; -import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; -import java.io.IOException; import java.util.List; public class ReadChunkInnerCompactionEstimator extends AbstractInnerSpaceEstimator { @@ -31,9 +30,17 @@ public class ReadChunkInnerCompactionEstimator extends AbstractInnerSpaceEstimat public long calculatingMetadataMemoryCost(CompactionTaskInfo taskInfo) { long cost = 0; // add ChunkMetadata size of MultiTsFileDeviceIterator + long maxAlignedSeriesMemCost = + taskInfo.getFileInfoList().stream() + .mapToLong(fileInfo -> fileInfo.maxMemToReadAlignedSeries) + .sum(); + long maxNonAlignedSeriesMemCost = + taskInfo.getFileInfoList().stream() + .mapToLong(fileInfo -> fileInfo.maxMemToReadNonAlignedSeries) + .sum(); cost += Math.min( - taskInfo.getTotalChunkMetadataSize(), + Math.max(maxAlignedSeriesMemCost, maxNonAlignedSeriesMemCost), taskInfo.getFileInfoList().size() * taskInfo.getMaxChunkMetadataNumInDevice() * taskInfo.getMaxChunkMetadataSize()); @@ -73,14 +80,16 @@ public class ReadChunkInnerCompactionEstimator 
extends AbstractInnerSpaceEstimat } @Override - public long roughEstimateInnerCompactionMemory(List<TsFileResource> resources) - throws IOException { + public long roughEstimateInnerCompactionMemory( + CompactionScheduleContext context, List<TsFileResource> resources) { if (config.getCompactionMaxAlignedSeriesNumInOneBatch() <= 0) { return -1L; } - MetadataInfo metadataInfo = - CompactionEstimateUtils.collectMetadataInfo(resources, CompactionType.INNER_SEQ_COMPACTION); - int maxConcurrentSeriesNum = metadataInfo.getMaxConcurrentSeriesNum(); + CompactionTaskMetadataInfo metadataInfo = + CompactionEstimateUtils.collectMetadataInfoFromCachedFileInfo( + resources, roughInfoMap, false); + + int maxConcurrentSeriesNum = metadataInfo.getMaxConcurrentSeriesNum(false); long maxChunkSize = config.getTargetChunkSize(); long maxPageSize = tsFileConfig.getPageSizeInByte(); // source files (chunk + uncompressed page) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/RepairUnsortedFileCompactionEstimator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/RepairUnsortedFileCompactionEstimator.java index 3b20b57b025..eca5bb3dcb0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/RepairUnsortedFileCompactionEstimator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/RepairUnsortedFileCompactionEstimator.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; @@ -31,10 +32,22 
@@ public class RepairUnsortedFileCompactionEstimator extends AbstractInnerSpaceEst protected long calculatingMetadataMemoryCost(CompactionTaskInfo taskInfo) { long cost = 0; // add ChunkMetadata size of MultiTsFileDeviceIterator + long maxAlignedSeriesMemCost = + taskInfo.getFileInfoList().stream() + .mapToLong(fileInfo -> fileInfo.maxMemToReadAlignedSeries) + .sum(); + long maxNonAlignedSeriesMemCost = + taskInfo.getFileInfoList().stream() + .mapToLong( + fileInfo -> + fileInfo.maxMemToReadNonAlignedSeries * config.getSubCompactionTaskNum()) + .sum(); cost += Math.min( - taskInfo.getTotalChunkMetadataSize(), - taskInfo.getMaxChunkMetadataNumInDevice() * taskInfo.getMaxChunkMetadataSize()); + Math.max(maxAlignedSeriesMemCost, maxNonAlignedSeriesMemCost), + taskInfo.getFileInfoList().size() + * taskInfo.getMaxChunkMetadataNumInDevice() + * taskInfo.getMaxChunkMetadataSize()); // add ChunkMetadata size of targetFileWriter long sizeForFileWriter = @@ -72,8 +85,8 @@ public class RepairUnsortedFileCompactionEstimator extends AbstractInnerSpaceEst } @Override - public long roughEstimateInnerCompactionMemory(List<TsFileResource> resources) - throws IOException { + public long roughEstimateInnerCompactionMemory( + CompactionScheduleContext context, List<TsFileResource> resources) throws IOException { throw new RuntimeException("unimplemented"); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/NewSizeTieredCompactionSelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/NewSizeTieredCompactionSelector.java index b800a1a9815..ee24ea60748 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/NewSizeTieredCompactionSelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/NewSizeTieredCompactionSelector.java @@ 
-19,17 +19,23 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl; +import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.enums.Tag; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.IInnerCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext; +import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractInnerSpaceEstimator; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.TsFileResourceCandidate; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.metrics.utils.SystemMetric; import org.apache.tsfile.file.metadata.IDeviceID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; @@ -41,6 +47,8 @@ import java.util.stream.Stream; public class NewSizeTieredCompactionSelector extends SizeTieredCompactionSelector { + private static final Logger LOGGER = + LoggerFactory.getLogger(IoTDBConstant.COMPACTION_LOGGER_NAME); private List<TsFileResourceCandidate> tsFileResourceCandidateList = new ArrayList<>(); private final long totalFileSizeThreshold; // the total file num in one task can not exceed this value @@ -150,6 +158,10 @@ public class NewSizeTieredCompactionSelector extends SizeTieredCompactionSelecto levelTaskSelection.endCurrentTaskSelection(); break; } + if (!levelTaskSelection.canSelectMoreFilesInMemoryBudget(currentFile)) { + levelTaskSelection.endCurrentTaskSelection(); + break; + } 
levelTaskSelection.addSelectedResource(currentFile, idx); } levelTaskSelection.endCurrentTaskSelection(); @@ -172,8 +184,27 @@ public class NewSizeTieredCompactionSelector extends SizeTieredCompactionSelecto int lastSelectedFileIndex = -1; int nextTaskStartIndex = -1; + boolean estimateCompactionTaskMemoryDuringSelection; + boolean reachMemoryLimit = false; + IInnerCompactionPerformer performer; + AbstractInnerSpaceEstimator estimator; + long memoryCost; + private InnerSpaceCompactionTaskSelection(long level) { this.level = level; + resetMemoryEstimationFields(); + } + + private void resetMemoryEstimationFields() { + estimateCompactionTaskMemoryDuringSelection = true; + reachMemoryLimit = false; + performer = + sequence ? context.getSeqCompactionPerformer() : context.getUnseqCompactionPerformer(); + estimator = performer.getInnerSpaceEstimator().orElse(null); + if (estimator == null) { + estimateCompactionTaskMemoryDuringSelection = false; + } + memoryCost = 0; } private boolean haveOverlappedDevices(TsFileResourceCandidate resourceCandidate) @@ -214,6 +245,31 @@ public class NewSizeTieredCompactionSelector extends SizeTieredCompactionSelecto return currentSelectedResources.isEmpty(); } + private boolean canSelectMoreFilesInMemoryBudget(TsFileResourceCandidate currentFile) + throws IOException { + // can not get enough information to estimate memory cost + if (!estimateCompactionTaskMemoryDuringSelection) { + return true; + } + if (!estimator.hasCachedRoughFileInfo(currentFile.resource)) { + estimateCompactionTaskMemoryDuringSelection = false; + return true; + } + memoryCost = + estimator.roughEstimateInnerCompactionMemory( + context, + Stream.concat(currentSelectedResources.stream(), Stream.of(currentFile.resource)) + .collect(Collectors.toList())); + if (memoryCost < 0) { + return false; + } + if (memoryCost > SystemInfo.getInstance().getMemorySizeForCompaction()) { + reachMemoryLimit = true; + return false; + } + return true; + } + private void reset() { 
currentSelectedResources = new ArrayList<>(); currentSkippedResources = new ArrayList<>(); @@ -221,6 +277,7 @@ public class NewSizeTieredCompactionSelector extends SizeTieredCompactionSelecto lastContinuousSkippedResources = new ArrayList<>(); currentSelectedFileTotalSize = 0; currentSkippedFileTotalSize = 0; + resetMemoryEstimationFields(); } private boolean isTaskTooLarge(TsFileResourceCandidate currentFile) { @@ -244,7 +301,11 @@ public class NewSizeTieredCompactionSelector extends SizeTieredCompactionSelecto long currentFileSize = resource.getTsFileSize(); if (totalFileSize + currentFileSize > singleFileSizeThreshold || totalFileNum + 1 > totalFileNumUpperBound - || !isFileLevelSatisfied(resource.getTsFileID().getInnerCompactionCount())) { + || !isFileLevelSatisfied(resource.getTsFileID().getInnerCompactionCount()) + // if estimateCompactionTaskMemoryDuringSelection is true, we have used the + // selected files for memory estimation. To ensure consistent results, we + // will not add other files for merging. + || estimateCompactionTaskMemoryDuringSelection) { break; } currentSkippedResources.add(resource); @@ -259,7 +320,12 @@ public class NewSizeTieredCompactionSelector extends SizeTieredCompactionSelecto } boolean canCompactAllFiles = - totalFileSize <= singleFileSizeThreshold && totalFileNum <= totalFileNumUpperBound; + totalFileSize <= singleFileSizeThreshold + && totalFileNum <= totalFileNumUpperBound + // if estimateCompactionTaskMemoryDuringSelection is true, we have used the + // selected files for memory estimation. To ensure consistent results, we + // will not add other files for merging. 
+ && !estimateCompactionTaskMemoryDuringSelection; if (canCompactAllFiles) { currentSelectedResources = Stream.concat(currentSelectedResources.stream(), currentSkippedResources.stream()) @@ -273,10 +339,14 @@ public class NewSizeTieredCompactionSelector extends SizeTieredCompactionSelecto boolean isSatisfied = (currentSelectedResources.size() >= totalFileNumLowerBound || !isActiveTimePartition - || currentSelectedFileTotalSize >= singleFileSizeThreshold) + || currentSelectedFileTotalSize >= singleFileSizeThreshold + || reachMemoryLimit) && currentSelectedResources.size() > 1; if (isSatisfied) { InnerSpaceCompactionTask task = createInnerSpaceCompactionTask(); + if (estimateCompactionTaskMemoryDuringSelection) { + task.setRoughMemoryCost(memoryCost); + } selectedTaskList.add(task); } } finally { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java index 804d441d2f2..ffad8a487da 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/RewriteCrossSpaceCompactionSelector.java @@ -237,7 +237,7 @@ public class RewriteCrossSpaceCompactionSelector implements ICrossSpaceSelector long roughEstimatedMemoryCost = compactionEstimator.roughEstimateCrossCompactionMemory( - newSelectedSeqResources, newSelectedUnseqResources); + context, newSelectedSeqResources, newSelectedUnseqResources); long memoryCost = CompactionEstimateUtils.shouldUseRoughEstimatedResult(roughEstimatedMemoryCost) ? 
roughEstimatedMemoryCost diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionTaskMemCostEstimatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionTaskMemCostEstimatorTest.java index b208163a8ca..e96ea2653d8 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionTaskMemCostEstimatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/utils/CompactionTaskMemCostEstimatorTest.java @@ -23,9 +23,13 @@ import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.InnerSpaceCompactionTask; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext; +import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.AbstractCompactionEstimator; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCompactionInnerCompactionEstimator; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.FastCrossSpaceCompactionEstimator; import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator.ReadChunkInnerCompactionEstimator; +import org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl.NewSizeTieredCompactionSelector; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.tsfile.exception.write.WriteProcessException; @@ -43,12 +47,13 @@ import java.util.List; public class CompactionTaskMemCostEstimatorTest extends AbstractCompactionTest { - int compactionBatchSize = - 
IoTDBDescriptor.getInstance().getConfig().getCompactionMaxAlignedSeriesNumInOneBatch(); + int compactionBatchSize; @Before public void setUp() throws IOException, WriteProcessException, MetadataException, InterruptedException { + compactionBatchSize = + IoTDBDescriptor.getInstance().getConfig().getCompactionMaxAlignedSeriesNumInOneBatch(); super.setUp(); } @@ -116,39 +121,57 @@ public class CompactionTaskMemCostEstimatorTest extends AbstractCompactionTest { } @Test - public void testEstimateWithNegativeBatchSize() throws IOException { - TsFileResource resource = createEmptyFileAndResource(true); - try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) { - writer.startChunkGroup("d1"); - List<String> measurements = new ArrayList<>(); + public void testRoughEstimate() throws IOException { + boolean cacheEnabled = AbstractCompactionEstimator.isGlobalFileInfoCacheEnabled(); + if (!cacheEnabled) { + AbstractCompactionEstimator.enableFileInfoCacheForTest(100, 100); + } + try { for (int i = 0; i < 10; i++) { - measurements.add("s" + i); + TsFileResource resource = createEmptyFileAndResource(false); + try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) { + writer.startChunkGroup("d1"); + List<String> measurements = new ArrayList<>(); + for (int j = 0; j < 10; j++) { + measurements.add("s" + j); + } + writer.generateSimpleAlignedSeriesToCurrentDevice( + measurements, + new TimeRange[] {new TimeRange(0, 10000)}, + TSEncoding.PLAIN, + CompressionType.UNCOMPRESSED); + writer.endChunkGroup(); + + writer.startChunkGroup("d2"); + for (int j = 0; j < 10; j++) { + writer.generateSimpleNonAlignedSeriesToCurrentDevice( + "s" + j, + new TimeRange[] {new TimeRange(0, 10000)}, + TSEncoding.PLAIN, + CompressionType.UNCOMPRESSED); + } + writer.endChunkGroup(); + writer.endFile(); + } + seqResources.add(resource); } - writer.generateSimpleAlignedSeriesToCurrentDevice( - measurements, - new TimeRange[] {new TimeRange(0, 10000)}, - 
TSEncoding.PLAIN, - CompressionType.UNCOMPRESSED); - writer.endChunkGroup(); - - writer.startChunkGroup("d2"); - for (int i = 0; i < 10; i++) { - writer.generateSimpleNonAlignedSeriesToCurrentDevice( - "s" + i, - new TimeRange[] {new TimeRange(0, 10000)}, - TSEncoding.PLAIN, - CompressionType.UNCOMPRESSED); + NewSizeTieredCompactionSelector selector = + new NewSizeTieredCompactionSelector( + COMPACTION_TEST_SG, "0", 0, true, tsFileManager, new CompactionScheduleContext()); + List<InnerSpaceCompactionTask> innerSpaceCompactionTasks = + selector.selectInnerSpaceTask(seqResources); + Assert.assertEquals(1, innerSpaceCompactionTasks.size()); + Assert.assertEquals(-1, innerSpaceCompactionTasks.get(0).getRoughMemoryCost()); + long estimatedMemoryCost = innerSpaceCompactionTasks.get(0).getEstimatedMemoryCost(); + Assert.assertTrue(estimatedMemoryCost > 0); + + innerSpaceCompactionTasks = selector.selectInnerSpaceTask(seqResources); + Assert.assertEquals(1, innerSpaceCompactionTasks.size()); + Assert.assertTrue(innerSpaceCompactionTasks.get(0).getRoughMemoryCost() > 0); + } finally { + if (!cacheEnabled) { + AbstractCompactionEstimator.disableFileInfoCacheForTest(); } - writer.endChunkGroup(); - writer.endFile(); } - seqResources.add(resource); - IoTDBDescriptor.getInstance().getConfig().setCompactionMaxAlignedSeriesNumInOneBatch(-1); - ReadChunkInnerCompactionEstimator estimator = new ReadChunkInnerCompactionEstimator(); - long v1 = estimator.roughEstimateInnerCompactionMemory(seqResources); - Assert.assertTrue(v1 < 0); - IoTDBDescriptor.getInstance().getConfig().setCompactionMaxAlignedSeriesNumInOneBatch(10); - long v2 = estimator.roughEstimateInnerCompactionMemory(seqResources); - Assert.assertTrue(v2 > 0); } }
