This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-3164
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/IOTDB-3164 by this push:
     new 2ab1dddf96 finish for test
2ab1dddf96 is described below

commit 2ab1dddf9637244a58a7b00b7712848bcebd4e45
Author: Liu Xuxin <[email protected]>
AuthorDate: Thu Jul 14 17:44:33 2022 +0800

    finish for test
---
 .../db/engine/storagegroup/StorageGroupInfo.java   | 12 ++++-
 .../db/engine/storagegroup/TsFileProcessor.java    | 10 ++--
 .../iotdb/db/rescon/memory/MemoryController.java   |  7 ++-
 .../db/rescon/memory/WriteMemoryController.java    | 59 ++++++++++++----------
 4 files changed, 47 insertions(+), 41 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
index 33b310c5d6..e3f3769dfa 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.db.engine.storagegroup;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.rescon.memory.WriteMemoryController;
 
 import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -45,6 +44,8 @@ public class StorageGroupInfo {
   /** A set of all unclosed TsFileProcessors in this SG */
   private List<TsFileProcessor> reportedTsps = new CopyOnWriteArrayList<>();
 
+  private volatile boolean recorded = false;
+
   public StorageGroupInfo(DataRegion dataRegion) {
     this.dataRegion = dataRegion;
     memoryCost = new AtomicLong();
@@ -87,6 +88,13 @@ public class StorageGroupInfo {
    */
   public void closeTsFileProcessorAndReportToSystem(TsFileProcessor 
tsFileProcessor) {
     reportedTsps.remove(tsFileProcessor);
-    WriteMemoryController.getInstance().resetStorageGroupInfo(this);
+  }
+
+  public boolean isRecorded() {
+    return recorded;
+  }
+
+  public void setRecorded(boolean recorded) {
+    this.recorded = recorded;
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index ce96d157bb..adaee20f82 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -809,7 +809,6 @@ public class TsFileProcessor {
     storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
     tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement);
     WriteMemoryController.getInstance().releaseMemory(memTableIncrement);
-    
WriteMemoryController.getInstance().resetStorageGroupInfo(storageGroupInfo);
     workMemTable.releaseTVListRamCost(memTableIncrement);
     workMemTable.releaseTextDataSize(textDataIncrement);
   }
@@ -1167,12 +1166,9 @@ public class TsFileProcessor {
               flushingMemTables.size());
         }
         // report to System
-        
WriteMemoryController.getInstance().resetStorageGroupInfo(storageGroupInfo);
-        logger.error(
-            "Memory usage for {} is {}",
-            storageGroupName,
-            
WriteMemoryController.getInstance().getMemoryUsageForSg(storageGroupName));
-        
WriteMemoryController.getInstance().releaseMemory(memTable.getTVListsRamCost());
+        WriteMemoryController.getInstance()
+            .releaseFlushingMemory(
+                memTable.getTVListsRamCost(), storageGroupName, 
memTable.getMemTableId());
         logger.error("Release size {} for {}", memTable.getTVListsRamCost(), 
storageGroupName);
       }
       if (logger.isDebugEnabled()) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java 
b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
index 9e39fad30e..80fe1063a7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
@@ -70,7 +70,7 @@ public class MemoryController<T> {
       }
 
       if (memoryUsage.compareAndSet(current, newUsage)) {
-        checkTrigger(current, newUsage, triggerParam);
+        checkTrigger(newUsage, triggerParam);
         return true;
       }
     }
@@ -137,10 +137,9 @@ public class MemoryController<T> {
     }
   }
 
-  private void checkTrigger(long prevUsage, long newUsage, T triggerParam) {
-    if (newUsage >= triggerThreshold && prevUsage < triggerThreshold && 
trigger != null) {
+  private void checkTrigger(long usage, T triggerParam) {
+    if (usage >= triggerThreshold && trigger != null) {
       if (triggerRunning.compareAndSet(false, true)) {
-        log.info("Start to execute memory controller trigger");
         try {
           trigger.run(triggerParam);
         } finally {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
 
b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
index d9dcb48c5c..a487ffcc43 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
@@ -27,9 +27,9 @@ import 
org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
 import java.util.PriorityQueue;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -42,8 +42,7 @@ public class WriteMemoryController extends 
MemoryController<TsFileProcessor> {
   private static final double REJECT_THRESHOLD = memorySizeForWrite * 
config.getRejectProportion();
   private volatile boolean rejected = false;
   private AtomicLong flushingMemory = new AtomicLong(0);
-  private Map<StorageGroupInfo, Long> reportedStorageGroupMemCostMap = new 
ConcurrentHashMap<>();
-  private Map<String, AtomicLong> memoryUsageForEachSg = new 
ConcurrentHashMap<>();
+  private Set<StorageGroupInfo> infoSet = new CopyOnWriteArraySet<>();
   private ExecutorService flushTaskSubmitThreadPool =
       IoTDBThreadPoolFactory.newSingleThreadExecutor("FlushTask-Submit-Pool");
 
@@ -63,23 +62,21 @@ public class WriteMemoryController extends 
MemoryController<TsFileProcessor> {
           memoryUsage.get());
       rejected = true;
     }
-    if (success) {
-      reportedStorageGroupMemCostMap.put(info, info.getMemCost());
-      memoryUsageForEachSg
-          .computeIfAbsent(
-              info.getDataRegion().getLogicalStorageGroupName()
-                  + "-"
-                  + info.getDataRegion().getDataRegionId(),
-              x -> new AtomicLong(0))
-          .addAndGet(size);
+    if (!info.isRecorded()) {
+      info.setRecorded(true);
+      infoSet.add(info);
     }
     return success;
   }
 
-  @Override
-  public void releaseMemory(long size) {
+  public void releaseFlushingMemory(long size, String storageGroup, long 
memTableId) {
+    this.flushingMemory.addAndGet(-size);
+    this.releaseMemory(size, storageGroup, memTableId);
+  }
+
+  public void releaseMemory(long size, String storageGroup, long memTableId) {
     super.releaseMemory(size);
-    if (memoryUsage.get() < REJECT_THRESHOLD) {
+    if (rejected && memoryUsage.get() < REJECT_THRESHOLD) {
       rejected = false;
     }
   }
@@ -101,17 +98,21 @@ public class WriteMemoryController extends 
MemoryController<TsFileProcessor> {
 
   protected void chooseMemtableToFlush(TsFileProcessor currentTsFileProcessor) 
{
     // If invoke flush by replaying logs, do not flush now!
-    if (reportedStorageGroupMemCostMap.size() == 0) {
+    if (infoSet.size() == 0) {
+      return;
+    }
+    long memCost = 0;
+    long activeMemSize = memoryUsage.get() - flushingMemory.get();
+    if (activeMemSize - memCost < FLUSH_THRESHOLD) {
       return;
     }
     PriorityQueue<TsFileProcessor> allTsFileProcessors =
         new PriorityQueue<>(
             (o1, o2) -> Long.compare(o2.getWorkMemTableRamCost(), 
o1.getWorkMemTableRamCost()));
-    for (StorageGroupInfo storageGroupInfo : 
reportedStorageGroupMemCostMap.keySet()) {
+    for (StorageGroupInfo storageGroupInfo : infoSet) {
       allTsFileProcessors.addAll(storageGroupInfo.getAllReportedTsp());
     }
-    long memCost = 0;
-    long activeMemSize = memoryUsage.get() - flushingMemory.get();
+    long selectedCount = 0;
     while (activeMemSize - memCost > FLUSH_THRESHOLD) {
       if (allTsFileProcessors.isEmpty()
           || allTsFileProcessors.peek().getWorkMemTableRamCost() == 0) {
@@ -121,18 +122,20 @@ public class WriteMemoryController extends 
MemoryController<TsFileProcessor> {
       if (selectedTsFileProcessor == null) {
         break;
       }
+      if (selectedTsFileProcessor.shouldFlush()) {
+        continue;
+      }
       memCost += selectedTsFileProcessor.getWorkMemTableRamCost();
       selectedTsFileProcessor.setWorkMemTableShouldFlush();
+      
flushingMemory.addAndGet(selectedTsFileProcessor.getWorkMemTableRamCost());
       
flushTaskSubmitThreadPool.submit(selectedTsFileProcessor::submitAFlushTask);
+      selectedCount++;
       allTsFileProcessors.poll();
     }
-  }
-
-  public void resetStorageGroupInfo(StorageGroupInfo info) {
-    reportedStorageGroupMemCostMap.put(info, info.getMemCost());
-  }
-
-  public long getMemoryUsageForSg(String sgName) {
-    return memoryUsageForEachSg.get(sgName).get();
+    logger.info(
+        "Select {} memtable to flush, flushing memory is {}, remaining memory 
is {}",
+        selectedCount,
+        flushingMemory.get(),
+        memoryUsage.get() - flushingMemory.get());
   }
 }

Reply via email to