This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch IOTDB-3164 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e7131f848b674402d04d4e567f2b0dfc3223e8bc Author: Liu Xuxin <[email protected]> AuthorDate: Tue Jul 12 17:21:54 2022 +0800 WriteMemoryController Ready for test --- .../iotdb/db/engine/storagegroup/StorageGroupInfo.java | 8 ++------ .../iotdb/db/engine/storagegroup/TsFileProcessor.java | 17 ++++++++--------- .../java/org/apache/iotdb/db/rescon/SystemInfo.java | 4 ---- .../iotdb/db/rescon/memory/WriteMemoryController.java | 8 ++++---- 4 files changed, 14 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java index 8577c427e7..33b310c5d6 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.engine.storagegroup; import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.rescon.SystemInfo; +import org.apache.iotdb.db.rescon.memory.WriteMemoryController; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; @@ -75,10 +75,6 @@ public class StorageGroupInfo { return reportedTsps; } - public boolean needToReportToSystem() { - return memoryCost.get() - lastReportedSize.get() > storageGroupSizeReportThreshold; - } - public void setLastReportedSize(long size) { lastReportedSize.set(size); } @@ -91,6 +87,6 @@ public class StorageGroupInfo { */ public void closeTsFileProcessorAndReportToSystem(TsFileProcessor tsFileProcessor) { reportedTsps.remove(tsFileProcessor); - SystemInfo.getInstance().resetStorageGroupStatus(this); + WriteMemoryController.getInstance().resetStorageGroupInfo(this); } } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index 84c284ee05..702e336dc3 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -55,7 +55,6 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.rescon.MemTableManager; import org.apache.iotdb.db.rescon.PrimitiveArrayManager; -import org.apache.iotdb.db.rescon.SystemInfo; import org.apache.iotdb.db.rescon.memory.WriteMemoryController; import org.apache.iotdb.db.sync.sender.manager.TsFileSyncManager; import org.apache.iotdb.db.utils.MemUtils; @@ -786,14 +785,16 @@ public class TsFileProcessor { storageGroupInfo.addStorageGroupMemCost(memTableIncrement); tsFileProcessorInfo.addTSPMemCost(chunkMetadataIncrement); WriteMemoryController controller = WriteMemoryController.getInstance(); + boolean allocateMemory = false; try { - if (!controller.tryAllocateMemory(memTableIncrement, storageGroupInfo, this)) { + allocateMemory = controller.tryAllocateMemory(memTableIncrement, storageGroupInfo, this); + if (!allocateMemory) { StorageEngine.blockInsertionIfReject(this); } } catch (WriteProcessRejectException e) { storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement); tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement); - SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo); + controller.resetStorageGroupInfo(storageGroupInfo); throw e; } workMemTable.addTVListRamCost(memTableIncrement); @@ -808,7 +809,8 @@ public class TsFileProcessor { memTableIncrement += textDataIncrement; storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement); tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement); - SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo); + WriteMemoryController.getInstance().releaseMemory(memTableIncrement); + WriteMemoryController.getInstance().resetStorageGroupInfo(storageGroupInfo); workMemTable.releaseTVListRamCost(memTableIncrement); workMemTable.releaseTextDataSize(textDataIncrement); } @@ -1111,9 +1113,6 @@ public class TsFileProcessor { flushListener.onMemTableFlushStarted(tobeFlushed); } - if (enableMemControl) { - SystemInfo.getInstance().addFlushingMemTableCost(tobeFlushed.getTVListsRamCost()); - } flushingMemTables.addLast(tobeFlushed); if (logger.isDebugEnabled()) { logger.debug( @@ -1169,8 +1168,8 @@ public class TsFileProcessor { flushingMemTables.size()); } // report to System - SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo); - SystemInfo.getInstance().resetFlushingMemTableCost(memTable.getTVListsRamCost()); + WriteMemoryController.getInstance().resetStorageGroupInfo(storageGroupInfo); + WriteMemoryController.getInstance().releaseMemory(memTable.getTVListsRamCost()); } if (logger.isDebugEnabled()) { logger.debug( diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java index 09e432bec2..cb65841996 100644 --- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java @@ -162,10 +162,6 @@ public class SystemInfo { } } - public synchronized void addFlushingMemTableCost(long flushingMemTableCost) { - this.flushingMemTablesCost += flushingMemTableCost; - } - public synchronized void resetFlushingMemTableCost(long flushingMemTableCost) { this.flushingMemTablesCost -= flushingMemTableCost; } diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java index 6732a9e244..84dfabf760 100644 --- a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java +++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java @@ -91,7 +91,6 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> { for (StorageGroupInfo storageGroupInfo : reportedStorageGroupMemCostMap.keySet()) { allTsFileProcessors.addAll(storageGroupInfo.getAllReportedTsp()); } - boolean isCurrentTsFileProcessorSelected = false; long memCost = 0; long activeMemSize = memoryUsage.get(); while (activeMemSize - memCost > FLUSH_THRESHOLD) { @@ -106,10 +105,11 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> { memCost += selectedTsFileProcessor.getWorkMemTableRamCost(); selectedTsFileProcessor.setWorkMemTableShouldFlush(); flushTaskSubmitThreadPool.submit(selectedTsFileProcessor::submitAFlushTask); - if (selectedTsFileProcessor == currentTsFileProcessor) { - isCurrentTsFileProcessorSelected = true; - } allTsFileProcessors.poll(); } } + + public void resetStorageGroupInfo(StorageGroupInfo info) { + reportedStorageGroupMemCostMap.put(info, info.getMemCost()); + } }
