This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.11 by this push:
     new 0b7cd50  [IOTDB-1306][To rel/0.11] New memory control strategy (#3061)
0b7cd50 is described below

commit 0b7cd500a18df5f3b0b0ba1264c79bb316fec994
Author: Haonan <[email protected]>
AuthorDate: Thu Apr 29 13:53:11 2021 +0800

    [IOTDB-1306][To rel/0.11] New memory control strategy (#3061)
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  38 +++-
 .../iotdb/db/engine/memtable/AbstractMemTable.java |  12 ++
 .../apache/iotdb/db/engine/memtable/IMemTable.java |   4 +
 .../db/engine/storagegroup/StorageGroupInfo.java   |   2 +-
 .../engine/storagegroup/StorageGroupProcessor.java |  31 +--
 .../db/engine/storagegroup/TsFileProcessor.java    |  44 ++--
 .../db/engine/storagegroup/TsFileResource.java     |  22 --
 .../org/apache/iotdb/db/rescon/SystemInfo.java     | 223 ++++++++++++---------
 .../engine/storagegroup/TsFileProcessorTest.java   |  16 +-
 9 files changed, 206 insertions(+), 186 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 5f08ea4..f7d0f63 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
@@ -81,7 +82,9 @@ import org.apache.iotdb.db.service.ServiceType;
 import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.db.utils.UpgradeUtils;
+import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
 import org.apache.iotdb.tsfile.utils.Pair;
@@ -153,8 +156,9 @@ public class StorageEngine implements IService {
    * whether enable data partition if disabled, all data belongs to partition 0
    */
   @ServerConfigConsistent
-  private static boolean enablePartition =
-      IoTDBDescriptor.getInstance().getConfig().isEnablePartition();
+  private static boolean enablePartition = config.isEnablePartition();
+
+  private final boolean enableMemControl = config.isEnableMemControl();
 
   private StorageEngine() {
     logger = LoggerFactory.getLogger(StorageEngine.class);
@@ -373,6 +377,13 @@ public class StorageEngine implements IService {
    */
   public void insert(InsertRowPlan insertRowPlan) throws StorageEngineException {
 
+    if (enableMemControl) {
+      try {
+        blockInsertionIfReject(null);
+      } catch (WriteProcessException e) {
+        throw new StorageEngineException(e);
+      }
+    }
     StorageGroupProcessor storageGroupProcessor = getProcessor(insertRowPlan.getDeviceId());
 
     // TODO monitor: update statistics
@@ -385,6 +396,13 @@ public class StorageEngine implements IService {
 
   public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
       throws StorageEngineException {
+    if (enableMemControl) {
+      try {
+        blockInsertionIfReject(null);
+      } catch (WriteProcessException e) {
+        throw new StorageEngineException(e);
+      }
+    }
     StorageGroupProcessor storageGroupProcessor = getProcessor(
         insertRowsOfOneDevicePlan.getDeviceId());
 
@@ -406,6 +424,15 @@ public class StorageEngine implements IService {
    */
   public void insertTablet(InsertTabletPlan insertTabletPlan)
       throws StorageEngineException, BatchInsertionException {
+    if (enableMemControl) {
+      try {
+        blockInsertionIfReject(null);
+      } catch (WriteProcessRejectException e) {
+        TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
+        Arrays.fill(results, RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT));
+        throw new BatchInsertionException(results);
+      }
+    }
     StorageGroupProcessor storageGroupProcessor;
     try {
       storageGroupProcessor = getProcessor(insertTabletPlan.getDeviceId());
@@ -849,14 +876,17 @@ public class StorageEngine implements IService {
   /**
    * block insertion if the insertion is rejected by memory control
    */
-  public static void blockInsertionIfReject() throws WriteProcessRejectException {
+  public static void blockInsertionIfReject(TsFileProcessor tsfileProcessor) throws WriteProcessRejectException {
     long startTime = System.currentTimeMillis();
     while (SystemInfo.getInstance().isRejected()) {
+      if (tsfileProcessor != null && tsfileProcessor.shouldFlush()) {
+        break;
+      }
       try {
         TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
         if (System.currentTimeMillis() - startTime > config.getMaxWaitingTimeWhenInsertBlocked()) {
           throw new WriteProcessRejectException(
-              "System rejected over " + config.getMaxWaitingTimeWhenInsertBlocked() +
+              "System rejected over " + (System.currentTimeMillis() - startTime) +
                   "ms");
         }
       } catch (InterruptedException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index 5960d43..5e7daaa 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -71,6 +71,8 @@ public abstract class AbstractMemTable implements IMemTable {
    */
   protected boolean disableMemControl = true;
 
+  private volatile boolean shouldFlush = false;
+
   private int seriesNumber = 0;
 
   private long totalPointsNum = 0;
@@ -323,6 +325,16 @@ public abstract class AbstractMemTable implements IMemTable {
   }
 
   @Override
+  public void setShouldFlush() {
+    shouldFlush = true;
+  }
+
+  @Override
+  public boolean shouldFlush() {
+    return shouldFlush;
+  }
+
+  @Override
   public void release() {
     for (Entry<String, Map<String, IWritableMemChunk>> entry : memTableMap.entrySet()) {
       for (Entry<String, IWritableMemChunk> subEntry : entry.getValue().entrySet()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index 81435c9..17f2f5b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -130,6 +130,10 @@ public interface IMemTable {
 
   void setVersion(long version);
 
+  void setShouldFlush();
+
+  boolean shouldFlush();
+
   void release();
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
index a31d41a..25105b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
@@ -101,6 +101,6 @@ public class StorageGroupInfo {
    */
   public void closeTsFileProcessorAndReportToSystem(TsFileProcessor tsFileProcessor) {
     reportedTsps.remove(tsFileProcessor);
-    SystemInfo.getInstance().resetStorageGroupStatus(this, true);
+    SystemInfo.getInstance().resetStorageGroupStatus(this);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 9c9db9f..4fd2d0f 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -636,8 +636,6 @@ public class StorageGroupProcessor {
             TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo);
             tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
             this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
-            tsFileProcessorInfo.addTSPMemCost(
-                tsFileProcessor.getTsFileResource().calculateRamSize());
           }
           workSequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
         } else {
@@ -655,8 +653,6 @@ public class StorageGroupProcessor {
             TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo);
             tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
             this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
-            tsFileProcessorInfo.addTSPMemCost(
-                tsFileProcessor.getTsFileResource().calculateRamSize());
           }
           workUnsequenceTsFileProcessors.put(timePartitionId, tsFileProcessor);
         }
@@ -700,9 +696,6 @@ public class StorageGroupProcessor {
     if (!isAlive(insertRowPlan.getTime())) {
       throw new OutOfTTLException(insertRowPlan.getTime(), (System.currentTimeMillis() - dataTTL));
     }
-    if (enableMemControl) {
-      StorageEngine.blockInsertionIfReject();
-    }
     writeLock();
     try {
       // init map
@@ -739,15 +732,6 @@ public class StorageGroupProcessor {
    */
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   public void insertTablet(InsertTabletPlan insertTabletPlan) throws BatchInsertionException {
-    if (enableMemControl) {
-      try {
-        StorageEngine.blockInsertionIfReject();
-      } catch (WriteProcessRejectException e) {
-        TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
-        Arrays.fill(results, RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT));
-        throw new BatchInsertionException(results);
-      }
-    }
     writeLock();
     try {
       TSStatus[] results = new TSStatus[insertTabletPlan.getRowCount()];
@@ -1007,11 +991,11 @@ public class StorageGroupProcessor {
     }
   }
 
-  public void asyncFlushMemTableInTsFileProcessor(TsFileProcessor tsFileProcessor) {
+  public void submitAFlushTaskIfShouldFlush(TsFileProcessor tsFileProcessor) {
     writeLock();
     try {
-      if (!closingSequenceTsFileProcessor.contains(tsFileProcessor)
-          && !closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
+      // check memtable size and may asyncTryToFlush the work memtable
+      if (tsFileProcessor.shouldFlush()) {
         fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
       }
     } finally {
@@ -1125,7 +1109,6 @@ public class StorageGroupProcessor {
         TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo);
         tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
         this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
-        tsFileProcessorInfo.addTSPMemCost(tsFileProcessor.getTsFileResource().calculateRamSize());
       }
     } else {
       tsFileProcessor =
@@ -1142,7 +1125,6 @@ public class StorageGroupProcessor {
         TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(storageGroupInfo);
         tsFileProcessor.setTsFileProcessorInfo(tsFileProcessorInfo);
         this.storageGroupInfo.initTsFileProcessorInfo(tsFileProcessor);
-        tsFileProcessorInfo.addTSPMemCost(tsFileProcessor.getTsFileResource().calculateRamSize());
       }
     }
     tsFileProcessor.addCloseFileListeners(customCloseFileListeners);
@@ -1199,7 +1181,8 @@ public class StorageGroupProcessor {
     // for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
     // for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
     if (closingSequenceTsFileProcessor.contains(tsFileProcessor)
-        || closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
+        || closingUnSequenceTsFileProcessor.contains(tsFileProcessor)
+        || tsFileProcessor.alreadyMarkedClosing()) {
       return;
     }
     logger.info(
@@ -2690,10 +2673,6 @@ public class StorageGroupProcessor {
 
   public void insert(InsertRowsOfOneDevicePlan insertRowsOfOneDevicePlan)
       throws WriteProcessException {
-
-    if (enableMemControl) {
-      StorageEngine.blockInsertionIfReject();
-    }
     writeLock();
     try {
       boolean isSequence = false;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 20b924f..422d78d 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -117,7 +117,6 @@ public class TsFileProcessor {
   private WriteLogNode logNode;
   private final boolean sequence;
   private long totalMemTableSize;
-  private boolean shouldFlush = false;
 
   private static final String FLUSH_QUERY_WRITE_LOCKED = "{}: {} get flushQueryLock write lock";
   private static final String FLUSH_QUERY_WRITE_RELEASE = "{}: {} get flushQueryLock write lock released";
@@ -277,8 +276,6 @@ public class TsFileProcessor {
     long textDataIncrement = 0L;
     long chunkMetadataIncrement = 0L;
     String deviceId = insertRowPlan.getDeviceId().getFullPath();
-    long unsealedResourceIncrement = 
-        tsFileResource.estimateRamIncrement(deviceId);
     for (int i = 0; i < insertRowPlan.getDataTypes().length; i++) {
       // skip failed Measurements
       if (insertRowPlan.getDataTypes()[i] == null) {
@@ -302,8 +299,7 @@ public class TsFileProcessor {
         textDataIncrement += MemUtils.getBinarySize((Binary) insertRowPlan.getValues()[i]);
       }
     }
-    updateMemoryInfo(memTableIncrement, unsealedResourceIncrement, 
-        chunkMetadataIncrement, textDataIncrement);
+    updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement);
   }
 
   private void checkMemCostAndAddToTspInfo(InsertTabletPlan insertTabletPlan, int start, int end)
@@ -315,7 +311,6 @@ public class TsFileProcessor {
     long textDataIncrement = 0L;
     long chunkMetadataIncrement = 0L;
     String deviceId = insertTabletPlan.getDeviceId().getFullPath();
-    long unsealedResourceIncrement = tsFileResource.estimateRamIncrement(deviceId);
 
     for (int i = 0; i < insertTabletPlan.getDataTypes().length; i++) {
       // skip failed Measurements
@@ -352,23 +347,23 @@ public class TsFileProcessor {
         textDataIncrement += MemUtils.getBinaryColumnSize(column, start, end);
       }
     }
-    updateMemoryInfo(memTableIncrement, unsealedResourceIncrement, 
-        chunkMetadataIncrement, textDataIncrement);
+    updateMemoryInfo(memTableIncrement, chunkMetadataIncrement, textDataIncrement);
   }
 
-  private void updateMemoryInfo(long memTableIncrement, long unsealedResourceIncrement,
+  private void updateMemoryInfo(long memTableIncrement,
       long chunkMetadataIncrement, long textDataIncrement) throws WriteProcessException {
     memTableIncrement += textDataIncrement;
     storageGroupInfo.addStorageGroupMemCost(memTableIncrement);
-    tsFileProcessorInfo.addTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement);
+    tsFileProcessorInfo.addTSPMemCost(chunkMetadataIncrement);
     if (storageGroupInfo.needToReportToSystem()) {
-      SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo);
       try {
-        StorageEngine.blockInsertionIfReject();
+        if (!SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo, this)) {
+          StorageEngine.blockInsertionIfReject(this);
+        }
       } catch (WriteProcessRejectException e) {
         storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
-        tsFileProcessorInfo.releaseTSPMemCost(unsealedResourceIncrement + chunkMetadataIncrement);
-        SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, false);
+        tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement);
+        SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
         throw e;
       }
     }
@@ -420,7 +415,7 @@ public class TsFileProcessor {
     if (workMemTable == null) {
       return false;
     }
-    if (shouldFlush) {
+    if (workMemTable.shouldFlush()) {
       logger.info("The memtable size {} of tsfile {} reaches the mem control threshold",
           workMemTable.memSize(), tsFileResource.getTsFile().getAbsolutePath());
       return true;
@@ -636,6 +631,9 @@ public class TsFileProcessor {
       flushListener.onFlushStart(tobeFlushed);
     }
 
+    if (enableMemControl) {
+      SystemInfo.getInstance().addFlushingMemTableCost(tobeFlushed.getTVListsRamCost());
+    }
     flushingMemTables.addLast(tobeFlushed);
     if (logger.isDebugEnabled()) {
       logger.debug(
@@ -650,7 +648,6 @@ public class TsFileProcessor {
       totalMemTableSize += tobeFlushed.memSize();
     }
     workMemTable = null;
-    shouldFlush = false;
     FlushManager.getInstance().registerTsFileProcessor(this);
   }
 
@@ -687,7 +684,8 @@ public class TsFileProcessor {
           tsFileResource.getTsFile().getName(), flushingMemTables.size());
         }
         // report to System
-        SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo, true);
+        SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo);
+        SystemInfo.getInstance().resetFlushingMemTableCost(memTable.getTVListsRamCost());
       }
       if (logger.isDebugEnabled()) {
         logger.debug("{}: {} flush finished, remove a memtable from flushing list, "
@@ -982,12 +980,12 @@ public class TsFileProcessor {
     return sequence;
   }
 
-  public void startAsyncFlush() {
-    storageGroupInfo.getStorageGroupProcessor().asyncFlushMemTableInTsFileProcessor(this);
+  public void submitAFlushTask() {
+    this.storageGroupInfo.getStorageGroupProcessor().submitAFlushTaskIfShouldFlush(this);
   }
 
-  public void setFlush() {
-    shouldFlush = true;
+  public void setWorkMemTableShouldFlush() {
+    workMemTable.setShouldFlush();
   }
 
   public void addFlushListener(FlushListener listener) {
@@ -1005,4 +1003,8 @@ public class TsFileProcessor {
   public void addCloseFileListeners(Collection<CloseFileListener> listeners) {
     closeFileListeners.addAll(listeners);
   }
+
+  public boolean alreadyMarkedClosing() {
+    return shouldClose;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 2433259..6e79298 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -798,28 +798,6 @@ public class TsFileResource {
         + RamUsageEstimator.sizeOf(endTimes);
   }
 
-  /**
-   * Calculate the resource ram increment when insert data in TsFileProcessor
-   *
-   * @return ramIncrement
-   */
-  public long estimateRamIncrement(String deviceToBeChecked) {
-    long ramIncrement = 0L;
-    if (!containsDevice(deviceToBeChecked)) {
-      // 80 is the Map.Entry header ram size
-      if (deviceToIndex.isEmpty()) {
-        ramIncrement += 80;
-      }
-      // Map.Entry ram size
-      ramIncrement += RamUsageEstimator.sizeOf(deviceToBeChecked) + 16;
-      // if needs to extend the startTimes and endTimes arrays
-      if (deviceToIndex.size() >= startTimes.length) {
-        ramIncrement += startTimes.length * Long.BYTES;
-      }
-    }
-    return ramIncrement;
-  }
-
   public void delete() throws IOException {
     if (file.exists()) {
       Files.delete(file.toPath());
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index dc314e1..2960d50 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -19,17 +19,16 @@
 
 package org.apache.iotdb.db.rescon;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
-
+import java.util.concurrent.ExecutorService;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.flush.FlushManager;
 import org.apache.iotdb.db.engine.storagegroup.StorageGroupInfo;
 import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
+import org.apache.iotdb.db.exception.WriteProcessRejectException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,10 +37,14 @@ public class SystemInfo {
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private static final Logger logger = LoggerFactory.getLogger(SystemInfo.class);
 
-  private long totalSgMemCost = 0L;
+  private long totalStorageGroupMemCost = 0L;
+  private ExecutorService flushTaskSubmitThreadPool =
+      IoTDBThreadPoolFactory.newSingleThreadExecutor("FlushTask-Submit-Pool");
   private volatile boolean rejected = false;
 
-  private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new HashMap<>();
+  private Map<StorageGroupInfo, Long> reportedStorageGroupMemCostMap = new HashMap<>();
+
+  private long flushingMemTablesCost = 0L;
 
   private static final double FLUSH_THERSHOLD =
       config.getAllocateMemoryForWrite() * config.getFlushProportion();
@@ -55,24 +58,41 @@ public class SystemInfo {
    *
    * @param storageGroupInfo storage group
    */
-  public synchronized void reportStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
-    long delta = storageGroupInfo.getMemCost() -
-        reportedSgMemCostMap.getOrDefault(storageGroupInfo, 0L);
-    totalSgMemCost += delta;
+  public synchronized boolean reportStorageGroupStatus(
+      StorageGroupInfo storageGroupInfo, TsFileProcessor tsFileProcessor)
+      throws WriteProcessRejectException {
+    long delta = storageGroupInfo.getMemCost()
+        - reportedStorageGroupMemCostMap.getOrDefault(storageGroupInfo, 0L);
+    totalStorageGroupMemCost += delta;
     if (logger.isDebugEnabled()) {
       logger.debug("Report Storage Group Status to the system. "
-          + "After adding {}, current sg mem cost is {}.", delta, totalSgMemCost);
+          + "After adding {}, current sg mem cost is {}.", delta, totalStorageGroupMemCost);
     }
-    reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost());
+    reportedStorageGroupMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost());
     storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
-    if (totalSgMemCost >= FLUSH_THERSHOLD) {
+    if (totalStorageGroupMemCost < FLUSH_THERSHOLD) {
+      return true;
+    } else if (totalStorageGroupMemCost >= FLUSH_THERSHOLD
+        && totalStorageGroupMemCost < REJECT_THERSHOLD) {
       logger.debug("The total storage group mem costs are too large, call for flushing. "
-          + "Current sg cost is {}", totalSgMemCost);
-      chooseTSPToMarkFlush();
-    }
-    if (totalSgMemCost >= REJECT_THERSHOLD) {
-      logger.info("Change system to reject status...");
+          + "Current sg cost is {}", totalStorageGroupMemCost);
+      chooseMemTablesToMarkFlush(tsFileProcessor);
+      return true;
+    } else {
       rejected = true;
+      if (chooseMemTablesToMarkFlush(tsFileProcessor)) {
+        if (totalStorageGroupMemCost < config.getAllocateMemoryForWrite()) {
+          return true;
+        } else {
+          throw new WriteProcessRejectException(
+              "Total Storage Group MemCost "
+                  + totalStorageGroupMemCost
+                  + " is over than memorySizeForWriting "
+                  + config.getAllocateMemoryForWrite());
+        }
+      } else {
+        return false;
+      }
     }
   }
 
@@ -82,103 +102,106 @@ public class SystemInfo {
    *
    * @param storageGroupInfo storage group
    */
-  public void resetStorageGroupStatus(
-      StorageGroupInfo storageGroupInfo, boolean shouldInvokeFlush) {
-    boolean needForceAsyncFlush = false;
-    synchronized (this) {
-      if (reportedSgMemCostMap.containsKey(storageGroupInfo)) {
-        this.totalSgMemCost -=
-            (reportedSgMemCostMap.get(storageGroupInfo) - storageGroupInfo.getMemCost());
-        storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
-        reportedSgMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost());
-      }
-
-      if (totalSgMemCost >= FLUSH_THERSHOLD && totalSgMemCost < REJECT_THERSHOLD) {
-        logger.debug("Some sg memory released but still exceeding flush proportion, call flush.");
-        if (rejected) {
-          logger.info("Some sg memory released, set system to normal status.");
-        }
-        logCurrentTotalSGMemory();
-        rejected = false;
-        needForceAsyncFlush = true;
-      } else if (totalSgMemCost >= REJECT_THERSHOLD) {
-        logger.warn("Some sg memory released, but system is still in reject status.");
-        logCurrentTotalSGMemory();
-        rejected = true;
-        needForceAsyncFlush = true;
+  /**
+   * Report resetting the mem cost of sg to system. It will be called after flushing, closing and
+   * failed to insert
+   *
+   * @param storageGroupInfo storage group
+   */
+  public synchronized void resetStorageGroupStatus(StorageGroupInfo storageGroupInfo) {
+    long delta = 0;
+
+    if (reportedStorageGroupMemCostMap.containsKey(storageGroupInfo)) {
+      delta = reportedStorageGroupMemCostMap.get(storageGroupInfo) - storageGroupInfo.getMemCost();
+      this.totalStorageGroupMemCost -= delta;
+      storageGroupInfo.setLastReportedSize(storageGroupInfo.getMemCost());
+      reportedStorageGroupMemCostMap.put(storageGroupInfo, storageGroupInfo.getMemCost());
+    }
 
-      } else {
-        logger.debug("Some sg memory released, system is in normal status.");
-        logCurrentTotalSGMemory();
-        rejected = false;
+    if (totalStorageGroupMemCost >= FLUSH_THERSHOLD
+        && totalStorageGroupMemCost < REJECT_THERSHOLD) {
+      logger.debug(
+          "SG ({}) released memory (delta: {}) but still exceeding flush proportion (totalSgMemCost: {}), call flush.",
+          storageGroupInfo.getStorageGroupProcessor().getStorageGroupName(),
+          delta,
+          totalStorageGroupMemCost);
+      if (rejected) {
+        logger.info(
+            "SG ({}) released memory (delta: {}), set system to normal status (totalSgMemCost: {}).",
+            storageGroupInfo.getStorageGroupProcessor().getStorageGroupName(),
+            delta,
+            totalStorageGroupMemCost);
       }
-    }
-    if (shouldInvokeFlush && needForceAsyncFlush) {
-      forceAsyncFlush();
+      logCurrentTotalSGMemory();
+      rejected = false;
+    } else if (totalStorageGroupMemCost >= REJECT_THERSHOLD) {
+      logger.warn(
+          "SG ({}) released memory (delta: {}), but system is still in reject status (totalSgMemCost: {}).",
+          storageGroupInfo.getStorageGroupProcessor().getStorageGroupName(),
+          delta,
+          totalStorageGroupMemCost);
+      logCurrentTotalSGMemory();
+      rejected = true;
+    } else {
+      logger.debug(
+          "SG ({}) released memory (delta: {}), system is in normal status (totalSgMemCost: {}).",
+          storageGroupInfo.getStorageGroupProcessor().getStorageGroupName(),
+          delta,
+          totalStorageGroupMemCost);
+      logCurrentTotalSGMemory();
+      rejected = false;
     }
   }
 
   private void logCurrentTotalSGMemory() {
-    logger.debug("Current Sg cost is {}", totalSgMemCost);
+    logger.debug("Current Sg cost is {}", totalStorageGroupMemCost);
   }
 
-  /**
-   * Order all tsfileProcessors in system by memory cost of actual data points in memtable.
-   * Mark the top K TSPs as to be flushed,
-   * so that after flushing the K TSPs, the memory cost should be less than FLUSH_THRESHOLD
-   */
-  private void chooseTSPToMarkFlush() {
-    if (FlushManager.getInstance().getNumberOfWorkingTasks() > 0) {
-      return;
-    }
-    // If invoke flush by replaying logs, do not flush now!
-    if (reportedSgMemCostMap.size() == 0) {
-      return;
-    }
-    // get the tsFile processors which has the max work MemTable size
-    List<TsFileProcessor> processors = getTsFileProcessorsToFlush();
-    for (TsFileProcessor processor : processors) {
-      if (processor != null) {
-        processor.setFlush();
-      }
-    }
+  public synchronized void addFlushingMemTableCost(long flushingMemTableCost) {
+    this.flushingMemTablesCost += flushingMemTableCost;
+  }
+
+  public synchronized void resetFlushingMemTableCost(long flushingMemTableCost) {
+    this.flushingMemTablesCost -= flushingMemTableCost;
   }
 
   /**
-   * Be Careful!! This method can only be called by flush thread!
+   * Order all tsfileProcessors in system by memory cost of actual data points in memtable. Mark the
+   * top K TSPs as to be flushed, so that after flushing the K TSPs, the memory cost should be less
+   * than FLUSH_THRESHOLD
    */
-  private void forceAsyncFlush() {
-    if (FlushManager.getInstance().getNumberOfWorkingTasks() > 1) {
-      return;
-    }
-    List<TsFileProcessor> processors = getTsFileProcessorsToFlush();
-    if (logger.isDebugEnabled()) {
-      logger.debug("[mem control] get {} tsp to flush", processors.size());
-    }
-    for (TsFileProcessor processor : processors) {
-      if (processor != null) {
-        processor.startAsyncFlush();
-      }
+  private boolean chooseMemTablesToMarkFlush(TsFileProcessor currentTsFileProcessor) {
+    if (reportedStorageGroupMemCostMap.size() == 0) {
+      return false;
     }
-  }
-
-  private List<TsFileProcessor> getTsFileProcessorsToFlush() {
-    PriorityQueue<TsFileProcessor> tsps = new PriorityQueue<>(
-        (o1, o2) -> Long.compare(o2.getWorkMemTableRamCost(), o1.getWorkMemTableRamCost()));
-    for (StorageGroupInfo sgInfo : reportedSgMemCostMap.keySet()) {
-      tsps.addAll(sgInfo.getAllReportedTsp());
+    PriorityQueue<TsFileProcessor> allTsFileProcessors =
+        new PriorityQueue<>(
+            (o1, o2) -> Long.compare(o2.getWorkMemTableRamCost(), o1.getWorkMemTableRamCost()));
+    for (StorageGroupInfo storageGroupInfo : reportedStorageGroupMemCostMap.keySet()) {
+      allTsFileProcessors.addAll(storageGroupInfo.getAllReportedTsp());
     }
-    List<TsFileProcessor> processors = new ArrayList<>();
+    boolean isCurrentTsFileProcessorSelected = false;
     long memCost = 0;
-    while (totalSgMemCost - memCost > FLUSH_THERSHOLD / 2) {
-      if (tsps.isEmpty() || tsps.peek().getWorkMemTableRamCost() == 0) {
-        return processors;
+    long activeMemSize = totalStorageGroupMemCost - flushingMemTablesCost;
+    while (activeMemSize - memCost > FLUSH_THERSHOLD) {
+      if (allTsFileProcessors.isEmpty()
+          || allTsFileProcessors.peek().getWorkMemTableRamCost() == 0) {
+        return isCurrentTsFileProcessorSelected;
+      }
+      TsFileProcessor selectedTsFileProcessor = allTsFileProcessors.peek();
+      memCost += selectedTsFileProcessor.getWorkMemTableRamCost();
+      selectedTsFileProcessor.setWorkMemTableShouldFlush();
+      // Open a new thread for submit a flush task
+      flushTaskSubmitThreadPool.submit(
+          () -> {
+            selectedTsFileProcessor.submitAFlushTask();
+          });
+      if (selectedTsFileProcessor == currentTsFileProcessor) {
+        isCurrentTsFileProcessorSelected = true;
       }
-      processors.add(tsps.peek());
-      memCost += tsps.peek().getWorkMemTableRamCost();
-      tsps.poll();
+      allTsFileProcessors.poll();
     }
-    return processors;
+    return isCurrentTsFileProcessorSelected;
   }
 
   public boolean isRejected() {
@@ -186,8 +209,8 @@ public class SystemInfo {
   }
 
   public void close() {
-    reportedSgMemCostMap.clear();
-    totalSgMemCost = 0;
+    reportedStorageGroupMemCostMap.clear();
+    totalStorageGroupMemCost = 0;
     rejected = false;
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
index 35d9876..4eac99e 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
@@ -92,9 +92,7 @@ public class TsFileProcessorTest {
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
     processor.setTsFileProcessorInfo(tsFileProcessorInfo);
     this.sgInfo.initTsFileProcessorInfo(processor);
-    tsFileProcessorInfo.addTSPMemCost(processor
-        .getTsFileResource().calculateRamSize());
-    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo);
+    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
     List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
     processor.query(deviceId, measurementId, dataType, encoding, props, context,
         tsfileResourcesForQuery);
@@ -148,9 +146,7 @@ public class TsFileProcessorTest {
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
     processor.setTsFileProcessorInfo(tsFileProcessorInfo);
     this.sgInfo.initTsFileProcessorInfo(processor);
-    tsFileProcessorInfo.addTSPMemCost(processor
-        .getTsFileResource().calculateRamSize());
-    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo);
+    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
     List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
     processor.query(deviceId, measurementId, dataType, encoding, props, context,
         tsfileResourcesForQuery);
@@ -230,9 +226,7 @@ public class TsFileProcessorTest {
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
     processor.setTsFileProcessorInfo(tsFileProcessorInfo);
     this.sgInfo.initTsFileProcessorInfo(processor);
-    tsFileProcessorInfo.addTSPMemCost(processor
-        .getTsFileResource().calculateRamSize());
-    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo);
+    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
     List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
     processor.query(deviceId, measurementId, dataType, encoding, props, context,
         tsfileResourcesForQuery);
@@ -271,9 +265,7 @@ public class TsFileProcessorTest {
     TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
     processor.setTsFileProcessorInfo(tsFileProcessorInfo);
     this.sgInfo.initTsFileProcessorInfo(processor);
-    tsFileProcessorInfo.addTSPMemCost(processor
-        .getTsFileResource().calculateRamSize());
-    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo);
+    SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor);
     List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
 
     processor.query(deviceId, measurementId, dataType, encoding, props, context,

Reply via email to