jt2594838 commented on code in PR #15257:
URL: https://github.com/apache/iotdb/pull/15257#discussion_r2048714132
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionEstimateUtils.java:
##########
@@ -81,6 +84,22 @@ public static FileInfo
calculateFileInfo(TsFileSequenceReader reader) throws IOE
for (Map.Entry<String, List<ChunkMetadata>>
measurementChunkMetadataList :
measurementChunkMetadataListMap.entrySet()) {
int currentChunkMetadataListSize =
measurementChunkMetadataList.getValue().size();
+ long chunkMetadataMemCost = 0;
+ for (ChunkMetadata chunkMetadata :
measurementChunkMetadataList.getValue()) {
+ if (chunkMetadata != null) {
+ chunkMetadataMemCost =
+ ChunkMetadata.calculateRamSize(
+ chunkMetadata.getMeasurementUid(),
chunkMetadata.getDataType());
+ break;
+ }
+ }
+ long currentSeriesRamSize = chunkMetadataMemCost *
currentChunkMetadataListSize;
Review Comment:
If any chunkMetadata is null (not sure when that can happen), should
currentChunkMetadataListSize be reduced?
For example, if measurementChunkMetadataList.getValue().size() = 5, but
there are only 3 non-null chunks, then shouldn't the chunk count used for the memory estimate be 3 instead of 5?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/CompactionEstimateUtils.java:
##########
@@ -55,11 +55,13 @@ public class CompactionEstimateUtils {
*
* @throws IOException if io errors occurred
*/
- public static FileInfo calculateFileInfo(TsFileSequenceReader reader) throws
IOException {
+ static FileInfo calculateFileInfo(TsFileSequenceReader reader) throws
IOException {
Review Comment:
If this process is costly, we may consider serializing the result
to a file, as we have done to accelerate restarts.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/NewSizeTieredCompactionSelector.java:
##########
@@ -242,7 +306,8 @@ private void endCurrentTaskSelection() {
long currentFileSize = resource.getTsFileSize();
if (totalFileSize + currentFileSize > singleFileSizeThreshold
|| totalFileNum + 1 > totalFileNumUpperBound
- ||
!isFileLevelSatisfied(resource.getTsFileID().getInnerCompactionCount())) {
+ ||
!isFileLevelSatisfied(resource.getTsFileID().getInnerCompactionCount())
+ || estimateCompactionTaskMemoryDuringSelection) {
Review Comment:
It would be better to add a comment explaining why
`estimateCompactionTaskMemoryDuringSelection` affects this condition (and
the one below).
It took me a while to think it through.
##########
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java:
##########
@@ -112,6 +113,11 @@ public class AbstractCompactionTest {
20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37,
38, 39
};
+ static {
+ AbstractCompactionEstimator.setGlobalCompactionRoughFileInfoCacheSize(100);
+ AbstractCompactionEstimator.setGlobalCompactionFileInfoCacheSize(100);
+ }
Review Comment:
It would be better to put them into setUpClass and tearDownClass to avoid unexpected
results caused by different test execution orders.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java:
##########
@@ -99,31 +131,37 @@ private FileInfo getFileInfoFromCache(TsFileResource
resource) throws IOExceptio
if (fileInfoCache.containsKey(resource)) {
return fileInfoCache.get(resource);
}
- File file = new File(resource.getTsFilePath());
+ TsFileID tsFileID = resource.getTsFileID();
synchronized (globalFileInfoCacheForFailedCompaction) {
- if (globalFileInfoCacheForFailedCompaction.containsKey(file)) {
- FileInfo fileInfo = globalFileInfoCacheForFailedCompaction.get(file);
+ FileInfo fileInfo = globalFileInfoCacheForFailedCompaction.get(tsFileID);
+ if (fileInfo != null) {
fileInfoCache.put(resource, fileInfo);
return fileInfo;
}
}
try (TsFileSequenceReader reader = getReader(resource.getTsFilePath())) {
FileInfo fileInfo = CompactionEstimateUtils.calculateFileInfo(reader);
fileInfoCache.put(resource, fileInfo);
- synchronized (globalFileInfoCacheForFailedCompaction) {
- globalFileInfoCacheForFailedCompaction.put(file, fileInfo);
+ if (isCacheMemoryCostAllocated) {
+ synchronized (globalFileInfoCacheForFailedCompaction) {
+ globalFileInfoCacheForFailedCompaction.put(tsFileID, fileInfo);
+ }
+ synchronized (globalRoughInfoCacheForCompaction) {
+ globalRoughInfoCacheForCompaction.put(tsFileID,
fileInfo.getSimpleFileInfo());
+ }
Review Comment:
The map is already a synchronized map; is there any need to synchronize on it again? The
same applies to other places.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java:
##########
@@ -54,14 +58,42 @@
*/
public abstract class AbstractCompactionEstimator {
- private static final Map<File, FileInfo>
globalFileInfoCacheForFailedCompaction =
- Collections.synchronizedMap(
- new LRUMap<>(
-
IoTDBDescriptor.getInstance().getConfig().getGlobalCompactionFileInfoCacheSize()));
+ /** The size of global compaction estimation file info cahce. */
+ private static int globalCompactionFileInfoCacheSize = 1000;
+
+ /** The size of global compaction estimation rough file info cahce. */
+ private static int globalCompactionRoughFileInfoCacheSize = 100000;
+
+ private static final double maxRatioToAllocateFileInfoCache = 0.1;
+ private static boolean isCacheMemoryCostAllocated;
+ private static Map<TsFileID, FileInfo>
globalFileInfoCacheForFailedCompaction;
+ private static Map<TsFileID, FileInfo.RoughFileInfo>
globalRoughInfoCacheForCompaction;
+
+ protected IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
+ public static long allocateMemoryCostForFileInfoCache(long
compactionMemorySize) {
+ long fixedMemoryCost =
+ globalCompactionFileInfoCacheSize *
FileInfo.MEMORY_COST_OF_FILE_INFO_ENTRY_IN_CACHE
+ + globalCompactionRoughFileInfoCacheSize
+ * FileInfo.MEMORY_COST_OF_ROUGH_FILE_INFO_ENTRY_IN_CACHE;
+ isCacheMemoryCostAllocated =
+ compactionMemorySize * maxRatioToAllocateFileInfoCache >
fixedMemoryCost;
Review Comment:
Is it possible to calculate globalCompactionFileInfoCacheSize and
globalCompactionRoughFileInfoCacheSize based on compactionMemorySize?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java:
##########
@@ -99,31 +131,37 @@ private FileInfo getFileInfoFromCache(TsFileResource
resource) throws IOExceptio
if (fileInfoCache.containsKey(resource)) {
return fileInfoCache.get(resource);
}
- File file = new File(resource.getTsFilePath());
+ TsFileID tsFileID = resource.getTsFileID();
synchronized (globalFileInfoCacheForFailedCompaction) {
- if (globalFileInfoCacheForFailedCompaction.containsKey(file)) {
- FileInfo fileInfo = globalFileInfoCacheForFailedCompaction.get(file);
+ FileInfo fileInfo = globalFileInfoCacheForFailedCompaction.get(tsFileID);
+ if (fileInfo != null) {
fileInfoCache.put(resource, fileInfo);
return fileInfo;
}
Review Comment:
Is there any memory constraint on the exclusive cache?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java:
##########
@@ -99,31 +131,37 @@ private FileInfo getFileInfoFromCache(TsFileResource
resource) throws IOExceptio
if (fileInfoCache.containsKey(resource)) {
return fileInfoCache.get(resource);
}
- File file = new File(resource.getTsFilePath());
+ TsFileID tsFileID = resource.getTsFileID();
synchronized (globalFileInfoCacheForFailedCompaction) {
- if (globalFileInfoCacheForFailedCompaction.containsKey(file)) {
- FileInfo fileInfo = globalFileInfoCacheForFailedCompaction.get(file);
+ FileInfo fileInfo = globalFileInfoCacheForFailedCompaction.get(tsFileID);
+ if (fileInfo != null) {
fileInfoCache.put(resource, fileInfo);
return fileInfo;
}
}
try (TsFileSequenceReader reader = getReader(resource.getTsFilePath())) {
FileInfo fileInfo = CompactionEstimateUtils.calculateFileInfo(reader);
fileInfoCache.put(resource, fileInfo);
- synchronized (globalFileInfoCacheForFailedCompaction) {
- globalFileInfoCacheForFailedCompaction.put(file, fileInfo);
+ if (isCacheMemoryCostAllocated) {
Review Comment:
This variable name is not straightforward enough; maybe we can call it
"globalFileInfoCacheEnabled"?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/impl/NewSizeTieredCompactionSelector.java:
##########
@@ -170,8 +186,30 @@ private class InnerSpaceCompactionTaskSelection {
int lastSelectedFileIndex = -1;
int nextTaskStartIndex = -1;
+ boolean estimateCompactionTaskMemoryDuringSelection;
+ boolean reachMemoryLimit = false;
+ ICompactionPerformer performer;
+ AbstractInnerSpaceEstimator estimator;
+ long memoryCost;
+
private InnerSpaceCompactionTaskSelection(long level) {
this.level = level;
+ resetMemoryEstimationFields();
+ }
+
+ private void resetMemoryEstimationFields() {
+ estimateCompactionTaskMemoryDuringSelection = true;
+ reachMemoryLimit = false;
+ performer =
+ sequence ? context.getSeqCompactionPerformer() :
context.getUnseqCompactionPerformer();
+ if (performer instanceof ReadChunkCompactionPerformer) {
+ estimator = new ReadChunkInnerCompactionEstimator();
+ } else if (performer instanceof FastCompactionPerformer) {
+ estimator = new FastCompactionInnerCompactionEstimator();
+ } else {
+ estimateCompactionTaskMemoryDuringSelection = false;
+ }
Review Comment:
It would be better to have a method like getEstimator() in
CompactionPerformer, so that if a new performer is implemented, the developer
will not forget to add the associated estimator.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java:
##########
@@ -184,12 +229,47 @@ public void cleanup() {
fileInfoCache.clear();
}
+ public boolean hasCachedRoughFileInfo(TsFileResource resource) {
+ return getRoughFileInfo(resource) != null;
+ }
+
+ public FileInfo.RoughFileInfo getRoughFileInfo(TsFileResource resource) {
+ FileInfo.RoughFileInfo roughFileInfo = roughInfoMap.get(resource);
+ if (roughFileInfo != null) {
+ return roughFileInfo;
+ }
+ synchronized (globalRoughInfoCacheForCompaction) {
+ roughFileInfo =
globalRoughInfoCacheForCompaction.get(resource.getTsFileID());
+ }
+ if (roughFileInfo != null) {
+ roughInfoMap.put(resource, roughFileInfo);
+ }
+ return roughFileInfo;
+ }
+
public static void removeFileInfoFromGlobalFileInfoCache(TsFileResource
resource) {
if (resource == null || resource.getTsFile() == null) {
return;
}
- synchronized (globalFileInfoCacheForFailedCompaction) {
- globalFileInfoCacheForFailedCompaction.remove(resource.getTsFile());
+ if (isCacheMemoryCostAllocated) {
+ synchronized (globalFileInfoCacheForFailedCompaction) {
+ globalFileInfoCacheForFailedCompaction.remove(resource.getTsFileID());
+ }
+ synchronized (globalRoughInfoCacheForCompaction) {
+ globalRoughInfoCacheForCompaction.remove(resource.getTsFileID());
+ }
}
}
+
+ public static void setGlobalCompactionRoughFileInfoCacheSize(
+ int globalCompactionRoughFileInfoCacheSize) {
+ AbstractCompactionEstimator.globalCompactionRoughFileInfoCacheSize =
+ globalCompactionRoughFileInfoCacheSize;
+ }
Review Comment:
Should this be annotated with @TestOnly too?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]