This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch IOTDB-3164
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/IOTDB-3164 by this push:
new 8d3637e622 add param write memory frame size
8d3637e622 is described below
commit 8d3637e622670bbf75eef0947e94fe88093fedd0
Author: Liu Xuxin <[email protected]>
AuthorDate: Thu Jul 21 19:29:34 2022 +0800
add param write memory frame size
---
.../resources/conf/iotdb-datanode.properties | 4 +++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 11 ++++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 5 +++
.../db/engine/storagegroup/TsFileProcessor.java | 3 +-
.../iotdb/db/rescon/memory/MemoryController.java | 4 ++-
.../db/rescon/memory/WriteMemoryController.java | 41 ++++++++++------------
6 files changed, 44 insertions(+), 24 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-datanode.properties
b/server/src/assembly/resources/conf/iotdb-datanode.properties
index 5f7eb7c0a1..3f4b8a0226 100644
--- a/server/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/server/src/assembly/resources/conf/iotdb-datanode.properties
@@ -433,6 +433,10 @@ timestamp_precision=ms
# Datatype: double
# flush_proportion=0.4
+# Frame size of write memory allocation, the unit is KB
+# Datatype: int
+# write_memory_frame_size=8196
+
# Ratio of read memory allocated for timeIndex, 0.2 by default
# Datatype: double
# time_index_memory_proportion=0.2
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index e0a2906d68..4cd5f208bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -143,6 +143,9 @@ public class IoTDBConfig {
/** Reject proportion for system */
private double rejectProportion = 0.8;
+ /** Frame size in write memory allocation, the unit is KB */
+ private int writeMemoryFrameSize = 8196;
+
/** If storage group increased more than this threshold, report to system.
Unit: byte */
private long storageGroupSizeReportThreshold = 16 * 1024 * 1024L;
@@ -2987,4 +2990,12 @@ public class IoTDBConfig {
public void setAllocateMemoryForLastCache(long allocateMemoryForLastCache) {
this.allocateMemoryForLastCache = allocateMemoryForLastCache;
}
+
+ public int getWriteMemoryFrameSize() {
+ return writeMemoryFrameSize;
+ }
+
+ public void setWriteMemoryFrameSize(int writeMemoryFrameSize) {
+ this.writeMemoryFrameSize = writeMemoryFrameSize;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 8668ab986a..8354493564 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -218,6 +218,11 @@ public class IoTDBDescriptor {
properties.getProperty(
"reject_proportion",
Double.toString(conf.getRejectProportion()))));
+ conf.setWriteMemoryFrameSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "write_memory_frame_size",
Integer.toString(conf.getWriteMemoryFrameSize()))));
+
conf.setStorageGroupSizeReportThreshold(
Long.parseLong(
properties.getProperty(
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 1f041b8285..b161341f97 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -955,11 +955,12 @@ public class TsFileProcessor {
if (logger.isInfoEnabled()) {
if (workMemTable != null) {
logger.info(
- "{}: flush a working memtable in async close tsfile {}, memtable
size: {}, tsfile "
+ "{}: flush a working memtable in async close tsfile {}, memtable
size: {}, allocated size: {}, tsfile "
+ "size: {}, plan index: [{}, {}]",
storageGroupName,
tsFileResource.getTsFile().getAbsolutePath(),
workMemTable.memSize(),
+ workMemTable.getAllocatedMemSize(),
tsFileResource.getTsFileSize(),
workMemTable.getMinPlanIndex(),
workMemTable.getMaxPlanIndex());
diff --git
a/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
index 71e7b4d96f..894a4234a8 100644
---
a/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
+++
b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
@@ -137,16 +137,18 @@ public class MemoryController<T> {
}
}
- protected void checkTrigger(long newUsage, T triggerParam) {
+ protected boolean checkTrigger(long newUsage, T triggerParam) {
if (newUsage >= triggerThreshold && trigger != null) {
if (triggerRunning.compareAndSet(false, true)) {
try {
trigger.run(triggerParam);
+ return true;
} finally {
triggerRunning.set(false);
}
}
}
+ return false;
}
public long getCurrentMemoryUsage() {
diff --git
a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
index bb4bb32c0c..a0cc893781 100644
---
a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
+++
b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
@@ -33,7 +33,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
-public class WriteMemoryController extends MemoryController<TsFileProcessor> {
+public class WriteMemoryController extends
MemoryController<WriteMemoryController.TriggerParam> {
private static final Logger logger =
LoggerFactory.getLogger(WriteMemoryController.class);
private static volatile WriteMemoryController INSTANCE;
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
@@ -46,7 +46,7 @@ public class WriteMemoryController extends
MemoryController<TsFileProcessor> {
private Set<StorageGroupInfo> infoSet = new CopyOnWriteArraySet<>();
private ExecutorService flushTaskSubmitThreadPool =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "FlushTask-Submit-Pool");
- public static final long FRAME_SIZE = 16L * 1024L * 1024L;
+ public static final long FRAME_SIZE = config.getWriteMemoryFrameSize() *
1024L;
public WriteMemoryController(long limitSize) {
super(limitSize);
@@ -55,10 +55,8 @@ public class WriteMemoryController extends
MemoryController<TsFileProcessor> {
}
public boolean tryAllocateMemory(long size, StorageGroupInfo info,
TsFileProcessor processor) {
- boolean success = super.tryAllocateMemory(size, processor);
- if (!success) {
- checkTrigger(memoryUsage.get(), processor);
- }
+ TriggerParam param = new TriggerParam(processor);
+ boolean success = super.tryAllocateMemory(size, param);
if (memoryUsage.get() > REJECT_THRESHOLD && !rejected) {
logger.info(
"Change system to reject status. Triggered by: logical SG ({}), mem
cost delta ({}), totalSgMemCost ({}).",
@@ -69,10 +67,6 @@ public class WriteMemoryController extends
MemoryController<TsFileProcessor> {
}
if (!info.isRecorded()) {
info.setRecorded(true);
- logger.error(
- "Record {}-{}",
- info.getDataRegion().getLogicalStorageGroupName(),
- info.getDataRegion().getDataRegionId());
infoSet.add(info);
}
return success;
@@ -96,12 +90,6 @@ public class WriteMemoryController extends
MemoryController<TsFileProcessor> {
public void releaseFlushingMemory(StorageGroupInfo info, long size) {
this.flushingMemory.addAndGet(-size);
this.releaseMemory(size);
- logger.error(
- "Release {} size of {}-{}, remaining size is {}",
- ((double) size) / 1024.0d / 1024.0d,
- info.getDataRegion().getLogicalStorageGroupName(),
- info.getDataRegion().getDataRegionId(),
- ((double) (memorySizeForWrite - memoryUsage.get())) / 1024.0d /
1024.0d);
}
public void releaseMemory(long size) {
@@ -116,9 +104,6 @@ public class WriteMemoryController extends
MemoryController<TsFileProcessor> {
}
public boolean checkRejected() {
- if (rejected) {
- checkTrigger(memoryUsage.get(), null);
- }
return rejected;
}
@@ -147,7 +132,7 @@ public class WriteMemoryController extends
MemoryController<TsFileProcessor> {
END_FLUSH_THRESHOLD = 0.5 * FLUSH_THRESHOLD;
}
- protected void chooseMemtableToFlush(TsFileProcessor currentTsFileProcessor)
{
+ protected void chooseMemtableToFlush(TriggerParam triggerParam) {
// If invoke flush by replaying logs, do not flush now!
if (infoSet.size() == 0) {
return;
@@ -162,10 +147,9 @@ public class WriteMemoryController extends
MemoryController<TsFileProcessor> {
}
long selectedCount = 0;
long activeMemory = memoryUsage.get() - flushingMemory.get();
- while (activeMemory - memCost > END_FLUSH_THRESHOLD) {
+ while (activeMemory - memCost > FLUSH_THRESHOLD) {
if (allTsFileProcessors.isEmpty()
|| allTsFileProcessors.peek().getWorkMemTableRamCost() == 0) {
- logger.error("No memtable to flush");
return;
}
TsFileProcessor selectedTsFileProcessor = allTsFileProcessors.peek();
@@ -177,6 +161,9 @@ public class WriteMemoryController extends
MemoryController<TsFileProcessor> {
allTsFileProcessors.poll();
continue;
}
+ if (selectedTsFileProcessor == triggerParam.processor) {
+ triggerParam.currentProcessorSelected = true;
+ }
long memUsageForThisMemTable =
selectedTsFileProcessor.getWorkMemTableAllocateSize();
memCost += memUsageForThisMemTable;
selectedTsFileProcessor.setWorkMemTableShouldFlush();
@@ -191,4 +178,14 @@ public class WriteMemoryController extends
MemoryController<TsFileProcessor> {
((double) flushingMemory.get()) / 1024.0d / 1024.0d,
((double) (memoryUsage.get() - flushingMemory.get())) / 1024.0d /
1024.0d);
}
+
+ static class TriggerParam {
+ TsFileProcessor processor;
+ public volatile boolean triggerRan = false;
+ public volatile boolean currentProcessorSelected = false;
+
+ public TriggerParam(TsFileProcessor processor) {
+ this.processor = processor;
+ }
+ }
}