This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch IOTDB-3164 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 264584757c6f9d504e24468f0f2fae50a2758165 Author: Liu Xuxin <[email protected]> AuthorDate: Tue Jul 12 17:07:49 2022 +0800 finish trigger function for WriteMemoryController --- .../db/engine/storagegroup/TsFileProcessor.java | 20 +++++----- .../db/rescon/memory/WriteMemoryController.java | 44 +++++++++++++++++++++- 2 files changed, 52 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index f906c47a4d..84c284ee05 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -785,18 +785,16 @@ public class TsFileProcessor { memTableIncrement += textDataIncrement; storageGroupInfo.addStorageGroupMemCost(memTableIncrement); tsFileProcessorInfo.addTSPMemCost(chunkMetadataIncrement); - if (storageGroupInfo.needToReportToSystem()) { - WriteMemoryController controller = WriteMemoryController.getInstance(); - try { - if (!controller.tryAllocateMemory(memTableIncrement, this)) { - StorageEngine.blockInsertionIfReject(this); - } - } catch (WriteProcessRejectException e) { - storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement); - tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement); - SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo); - throw e; + WriteMemoryController controller = WriteMemoryController.getInstance(); + try { + if (!controller.tryAllocateMemory(memTableIncrement, storageGroupInfo, this)) { + StorageEngine.blockInsertionIfReject(this); } + } catch (WriteProcessRejectException e) { + storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement); + tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement); + SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo); + throw e; } workMemTable.addTVListRamCost(memTableIncrement); workMemTable.addTextDataSize(textDataIncrement); diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java index 8dc235fdb5..6732a9e244 100644 --- a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java +++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java @@ -24,11 +24,16 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.storagegroup.StorageGroupInfo; import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Map; +import java.util.PriorityQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; public class WriteMemoryController extends MemoryController<TsFileProcessor> { + private static final Logger logger = LoggerFactory.getLogger(WriteMemoryController.class); private static volatile WriteMemoryController INSTANCE; private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); private static final long memorySizeForWrite = config.getAllocateMemoryForWrite(); @@ -49,8 +54,14 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> { public boolean tryAllocateMemory(long size, StorageGroupInfo info, TsFileProcessor processor) { boolean success = super.tryAllocateMemory(size, processor); if (memoryUsage.get() > REJECT_THRESHOLD) { + logger.info( + "Change system to reject status. Triggered by: logical SG ({}), mem cost delta ({}), totalSgMemCost ({}).", + info.getDataRegion().getLogicalStorageGroupName(), + size, + memoryUsage.get()); rejected = true; } + reportedStorageGroupMemCostMap.put(info, info.getMemCost()); return success; } @@ -69,5 +80,36 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> { return INSTANCE; } - protected void chooseMemtableToFlush(TsFileProcessor processor) {} + protected void chooseMemtableToFlush(TsFileProcessor currentTsFileProcessor) { + // If invoke flush by replaying logs, do not flush now! + if (reportedStorageGroupMemCostMap.size() == 0) { + return; + } + PriorityQueue<TsFileProcessor> allTsFileProcessors = + new PriorityQueue<>( + (o1, o2) -> Long.compare(o2.getWorkMemTableRamCost(), o1.getWorkMemTableRamCost())); + for (StorageGroupInfo storageGroupInfo : reportedStorageGroupMemCostMap.keySet()) { + allTsFileProcessors.addAll(storageGroupInfo.getAllReportedTsp()); + } + boolean isCurrentTsFileProcessorSelected = false; + long memCost = 0; + long activeMemSize = memoryUsage.get(); + while (activeMemSize - memCost > FLUSH_THRESHOLD) { + if (allTsFileProcessors.isEmpty() + || allTsFileProcessors.peek().getWorkMemTableRamCost() == 0) { + return; + } + TsFileProcessor selectedTsFileProcessor = allTsFileProcessors.peek(); + if (selectedTsFileProcessor == null) { + break; + } + memCost += selectedTsFileProcessor.getWorkMemTableRamCost(); + selectedTsFileProcessor.setWorkMemTableShouldFlush(); + flushTaskSubmitThreadPool.submit(selectedTsFileProcessor::submitAFlushTask); + if (selectedTsFileProcessor == currentTsFileProcessor) { + isCurrentTsFileProcessorSelected = true; + } + allTsFileProcessors.poll(); + } + } }
