This is an automated email from the ASF dual-hosted git repository.

marklau99 pushed a commit to branch IOTDB-3164
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/IOTDB-3164 by this push:
     new 8d3637e622 add param write memory frame size
8d3637e622 is described below

commit 8d3637e622670bbf75eef0947e94fe88093fedd0
Author: Liu Xuxin <[email protected]>
AuthorDate: Thu Jul 21 19:29:34 2022 +0800

    add param write memory frame size
---
 .../resources/conf/iotdb-datanode.properties       |  4 +++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 ++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  5 +++
 .../db/engine/storagegroup/TsFileProcessor.java    |  3 +-
 .../iotdb/db/rescon/memory/MemoryController.java   |  4 ++-
 .../db/rescon/memory/WriteMemoryController.java    | 41 ++++++++++------------
 6 files changed, 44 insertions(+), 24 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 5f7eb7c0a1..3f4b8a0226 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -433,6 +433,10 @@ timestamp_precision=ms
 # Datatype: double
 # flush_proportion=0.4
 
+# Frame size of write memory allocation, the unit is KB
+# Datatype: int
+# write_memory_frame_size=8196
+
 # Ratio of read memory allocated for timeIndex, 0.2 by default
 # Datatype: double
 # time_index_memory_proportion=0.2
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index e0a2906d68..4cd5f208bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -143,6 +143,9 @@ public class IoTDBConfig {
   /** Reject proportion for system */
   private double rejectProportion = 0.8;
 
+  /** Frame size in write memory allocation, the unit is KB */
+  private int writeMemoryFrameSize = 8196;
+
   /** If storage group increased more than this threshold, report to system. Unit: byte */
   private long storageGroupSizeReportThreshold = 16 * 1024 * 1024L;
 
@@ -2987,4 +2990,12 @@ public class IoTDBConfig {
   public void setAllocateMemoryForLastCache(long allocateMemoryForLastCache) {
     this.allocateMemoryForLastCache = allocateMemoryForLastCache;
   }
+
+  public int getWriteMemoryFrameSize() {
+    return writeMemoryFrameSize;
+  }
+
+  public void setWriteMemoryFrameSize(int writeMemoryFrameSize) {
+    this.writeMemoryFrameSize = writeMemoryFrameSize;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 8668ab986a..8354493564 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -218,6 +218,11 @@ public class IoTDBDescriptor {
               properties.getProperty(
                   "reject_proportion", Double.toString(conf.getRejectProportion()))));
 
+      conf.setWriteMemoryFrameSize(
+          Integer.parseInt(
+              properties.getProperty(
+                  "write_memory_frame_size", Integer.toString(conf.getWriteMemoryFrameSize()))));
+
       conf.setStorageGroupSizeReportThreshold(
           Long.parseLong(
               properties.getProperty(
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 1f041b8285..b161341f97 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -955,11 +955,12 @@ public class TsFileProcessor {
       if (logger.isInfoEnabled()) {
         if (workMemTable != null) {
           logger.info(
-              "{}: flush a working memtable in async close tsfile {}, memtable size: {}, tsfile "
+              "{}: flush a working memtable in async close tsfile {}, memtable size: {}, allocated size: {}, tsfile "
                   + "size: {}, plan index: [{}, {}]",
               storageGroupName,
               tsFileResource.getTsFile().getAbsolutePath(),
               workMemTable.memSize(),
+              workMemTable.getAllocatedMemSize(),
               tsFileResource.getTsFileSize(),
               workMemTable.getMinPlanIndex(),
               workMemTable.getMaxPlanIndex());
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
index 71e7b4d96f..894a4234a8 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
@@ -137,16 +137,18 @@ public class MemoryController<T> {
     }
   }
 
-  protected void checkTrigger(long newUsage, T triggerParam) {
+  protected boolean checkTrigger(long newUsage, T triggerParam) {
     if (newUsage >= triggerThreshold && trigger != null) {
       if (triggerRunning.compareAndSet(false, true)) {
         try {
           trigger.run(triggerParam);
+          return true;
         } finally {
           triggerRunning.set(false);
         }
       }
     }
+    return false;
   }
 
   public long getCurrentMemoryUsage() {
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
index bb4bb32c0c..a0cc893781 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
@@ -33,7 +33,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class WriteMemoryController extends MemoryController<TsFileProcessor> {
+public class WriteMemoryController extends MemoryController<WriteMemoryController.TriggerParam> {
   private static final Logger logger = LoggerFactory.getLogger(WriteMemoryController.class);
   private static volatile WriteMemoryController INSTANCE;
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -46,7 +46,7 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
   private Set<StorageGroupInfo> infoSet = new CopyOnWriteArraySet<>();
   private ExecutorService flushTaskSubmitThreadPool =
       IoTDBThreadPoolFactory.newFixedThreadPool(1, "FlushTask-Submit-Pool");
-  public static final long FRAME_SIZE = 16L * 1024L * 1024L;
+  public static final long FRAME_SIZE = config.getWriteMemoryFrameSize() * 1024L;
 
   public WriteMemoryController(long limitSize) {
     super(limitSize);
@@ -55,10 +55,8 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
   }
 
   public boolean tryAllocateMemory(long size, StorageGroupInfo info, TsFileProcessor processor) {
-    boolean success = super.tryAllocateMemory(size, processor);
-    if (!success) {
-      checkTrigger(memoryUsage.get(), processor);
-    }
+    TriggerParam param = new TriggerParam(processor);
+    boolean success = super.tryAllocateMemory(size, param);
     if (memoryUsage.get() > REJECT_THRESHOLD && !rejected) {
       logger.info(
           "Change system to reject status. Triggered by: logical SG ({}), mem cost delta ({}), totalSgMemCost ({}).",
@@ -69,10 +67,6 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
     }
     if (!info.isRecorded()) {
       info.setRecorded(true);
-      logger.error(
-          "Record {}-{}",
-          info.getDataRegion().getLogicalStorageGroupName(),
-          info.getDataRegion().getDataRegionId());
       infoSet.add(info);
     }
     return success;
@@ -96,12 +90,6 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
   public void releaseFlushingMemory(StorageGroupInfo info, long size) {
     this.flushingMemory.addAndGet(-size);
     this.releaseMemory(size);
-    logger.error(
-        "Release {} size of {}-{}, remaining size is {}",
-        ((double) size) / 1024.0d / 1024.0d,
-        info.getDataRegion().getLogicalStorageGroupName(),
-        info.getDataRegion().getDataRegionId(),
-        ((double) (memorySizeForWrite - memoryUsage.get())) / 1024.0d / 1024.0d);
   }
 
   public void releaseMemory(long size) {
@@ -116,9 +104,6 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
   }
 
   public boolean checkRejected() {
-    if (rejected) {
-      checkTrigger(memoryUsage.get(), null);
-    }
     return rejected;
   }
 
@@ -147,7 +132,7 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
     END_FLUSH_THRESHOLD = 0.5 * FLUSH_THRESHOLD;
   }
 
-  protected void chooseMemtableToFlush(TsFileProcessor currentTsFileProcessor) {
+  protected void chooseMemtableToFlush(TriggerParam triggerParam) {
     // If invoke flush by replaying logs, do not flush now!
     if (infoSet.size() == 0) {
       return;
@@ -162,10 +147,9 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
     }
     long selectedCount = 0;
     long activeMemory = memoryUsage.get() - flushingMemory.get();
-    while (activeMemory - memCost > END_FLUSH_THRESHOLD) {
+    while (activeMemory - memCost > FLUSH_THRESHOLD) {
       if (allTsFileProcessors.isEmpty()
           || allTsFileProcessors.peek().getWorkMemTableRamCost() == 0) {
-        logger.error("No memtable to flush");
         return;
       }
       TsFileProcessor selectedTsFileProcessor = allTsFileProcessors.peek();
@@ -177,6 +161,9 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
         allTsFileProcessors.poll();
         continue;
       }
+      if (selectedTsFileProcessor == triggerParam.processor) {
+        triggerParam.currentProcessorSelected = true;
+      }
       long memUsageForThisMemTable = selectedTsFileProcessor.getWorkMemTableAllocateSize();
       memCost += memUsageForThisMemTable;
       selectedTsFileProcessor.setWorkMemTableShouldFlush();
@@ -191,4 +178,14 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> {
         ((double) flushingMemory.get()) / 1024.0d / 1024.0d,
         ((double) (memoryUsage.get() - flushingMemory.get())) / 1024.0d / 1024.0d);
   }
+
+  static class TriggerParam {
+    TsFileProcessor processor;
+    public volatile boolean triggerRan = false;
+    public volatile boolean currentProcessorSelected = false;
+
+    public TriggerParam(TsFileProcessor processor) {
+      this.processor = processor;
+    }
+  }
 }

Reply via email to