This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-3164
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 264584757c6f9d504e24468f0f2fae50a2758165
Author: Liu Xuxin <[email protected]>
AuthorDate: Tue Jul 12 17:07:49 2022 +0800

    finish trigger function for WriteMemoryController
---
 .../db/engine/storagegroup/TsFileProcessor.java    | 20 +++++-----
 .../db/rescon/memory/WriteMemoryController.java    | 44 +++++++++++++++++++++-
 2 files changed, 52 insertions(+), 12 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index f906c47a4d..84c284ee05 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -785,18 +785,16 @@ public class TsFileProcessor {
     memTableIncrement += textDataIncrement;
     storageGroupInfo.addStorageGroupMemCost(memTableIncrement);
     tsFileProcessorInfo.addTSPMemCost(chunkMetadataIncrement);
-    if (storageGroupInfo.needToReportToSystem()) {
-      WriteMemoryController controller = WriteMemoryController.getInstance();
-      try {
-        if (!controller.tryAllocateMemory(memTableIncrement, this)) {
-          StorageEngine.blockInsertionIfReject(this);
-        }
-      } catch (WriteProcessRejectException e) {
-        storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
-        tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement);
-        SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
-        throw e;
+    WriteMemoryController controller = WriteMemoryController.getInstance();
+    try {
+      if (!controller.tryAllocateMemory(memTableIncrement, storageGroupInfo, 
this)) {
+        StorageEngine.blockInsertionIfReject(this);
       }
+    } catch (WriteProcessRejectException e) {
+      storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
+      tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement);
+      SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
+      throw e;
     }
     workMemTable.addTVListRamCost(memTableIncrement);
     workMemTable.addTextDataSize(textDataIncrement);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
 
b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
index 8dc235fdb5..6732a9e244 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
@@ -24,11 +24,16 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupInfo;
 import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 
 public class WriteMemoryController extends MemoryController<TsFileProcessor> {
+  private static final Logger logger = 
LoggerFactory.getLogger(WriteMemoryController.class);
   private static volatile WriteMemoryController INSTANCE;
   private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
   private static final long memorySizeForWrite = 
config.getAllocateMemoryForWrite();
@@ -49,8 +54,14 @@ public class WriteMemoryController extends 
MemoryController<TsFileProcessor> {
   public boolean tryAllocateMemory(long size, StorageGroupInfo info, 
TsFileProcessor processor) {
     boolean success = super.tryAllocateMemory(size, processor);
     if (memoryUsage.get() > REJECT_THRESHOLD) {
+      logger.info(
+          "Change system to reject status. Triggered by: logical SG ({}), mem 
cost delta ({}), totalSgMemCost ({}).",
+          info.getDataRegion().getLogicalStorageGroupName(),
+          size,
+          memoryUsage.get());
       rejected = true;
     }
+    reportedStorageGroupMemCostMap.put(info, info.getMemCost());
     return success;
   }
 
@@ -69,5 +80,36 @@ public class WriteMemoryController extends 
MemoryController<TsFileProcessor> {
     return INSTANCE;
   }
 
-  protected void chooseMemtableToFlush(TsFileProcessor processor) {}
+  protected void chooseMemtableToFlush(TsFileProcessor currentTsFileProcessor) 
{
+    // If invoke flush by replaying logs, do not flush now!
+    if (reportedStorageGroupMemCostMap.size() == 0) {
+      return;
+    }
+    PriorityQueue<TsFileProcessor> allTsFileProcessors =
+        new PriorityQueue<>(
+            (o1, o2) -> Long.compare(o2.getWorkMemTableRamCost(), 
o1.getWorkMemTableRamCost()));
+    for (StorageGroupInfo storageGroupInfo : 
reportedStorageGroupMemCostMap.keySet()) {
+      allTsFileProcessors.addAll(storageGroupInfo.getAllReportedTsp());
+    }
+    boolean isCurrentTsFileProcessorSelected = false;
+    long memCost = 0;
+    long activeMemSize = memoryUsage.get();
+    while (activeMemSize - memCost > FLUSH_THRESHOLD) {
+      if (allTsFileProcessors.isEmpty()
+          || allTsFileProcessors.peek().getWorkMemTableRamCost() == 0) {
+        return;
+      }
+      TsFileProcessor selectedTsFileProcessor = allTsFileProcessors.peek();
+      if (selectedTsFileProcessor == null) {
+        break;
+      }
+      memCost += selectedTsFileProcessor.getWorkMemTableRamCost();
+      selectedTsFileProcessor.setWorkMemTableShouldFlush();
+      
flushTaskSubmitThreadPool.submit(selectedTsFileProcessor::submitAFlushTask);
+      if (selectedTsFileProcessor == currentTsFileProcessor) {
+        isCurrentTsFileProcessorSelected = true;
+      }
+      allTsFileProcessors.poll();
+    }
+  }
 }

Reply via email to