This is an automated email from the ASF dual-hosted git repository.
marklau99 pushed a commit to branch IOTDB-3164
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/IOTDB-3164 by this push:
new 2ab1dddf96 finish for test
2ab1dddf96 is described below
commit 2ab1dddf9637244a58a7b00b7712848bcebd4e45
Author: Liu Xuxin <[email protected]>
AuthorDate: Thu Jul 14 17:44:33 2022 +0800
finish for test
---
.../db/engine/storagegroup/StorageGroupInfo.java | 12 ++++-
.../db/engine/storagegroup/TsFileProcessor.java | 10 ++--
.../iotdb/db/rescon/memory/MemoryController.java | 7 ++-
.../db/rescon/memory/WriteMemoryController.java | 59 ++++++++++++----------
4 files changed, 47 insertions(+), 41 deletions(-)
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
index 33b310c5d6..e3f3769dfa 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupInfo.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.engine.storagegroup;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.rescon.memory.WriteMemoryController;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -45,6 +44,8 @@ public class StorageGroupInfo {
/** A set of all unclosed TsFileProcessors in this SG */
private List<TsFileProcessor> reportedTsps = new CopyOnWriteArrayList<>();
+ private volatile boolean recorded = false;
+
public StorageGroupInfo(DataRegion dataRegion) {
this.dataRegion = dataRegion;
memoryCost = new AtomicLong();
@@ -87,6 +88,13 @@ public class StorageGroupInfo {
*/
public void closeTsFileProcessorAndReportToSystem(TsFileProcessor
tsFileProcessor) {
reportedTsps.remove(tsFileProcessor);
- WriteMemoryController.getInstance().resetStorageGroupInfo(this);
+ }
+
+ public boolean isRecorded() {
+ return recorded;
+ }
+
+ public void setRecorded(boolean recorded) {
+ this.recorded = recorded;
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index ce96d157bb..adaee20f82 100644
---
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -809,7 +809,6 @@ public class TsFileProcessor {
storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement);
tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement);
WriteMemoryController.getInstance().releaseMemory(memTableIncrement);
-
WriteMemoryController.getInstance().resetStorageGroupInfo(storageGroupInfo);
workMemTable.releaseTVListRamCost(memTableIncrement);
workMemTable.releaseTextDataSize(textDataIncrement);
}
@@ -1167,12 +1166,9 @@ public class TsFileProcessor {
flushingMemTables.size());
}
// report to System
-
WriteMemoryController.getInstance().resetStorageGroupInfo(storageGroupInfo);
- logger.error(
- "Memory usage for {} is {}",
- storageGroupName,
-
WriteMemoryController.getInstance().getMemoryUsageForSg(storageGroupName));
-
WriteMemoryController.getInstance().releaseMemory(memTable.getTVListsRamCost());
+ WriteMemoryController.getInstance()
+ .releaseFlushingMemory(
+ memTable.getTVListsRamCost(), storageGroupName,
memTable.getMemTableId());
logger.error("Release size {} for {}", memTable.getTVListsRamCost(),
storageGroupName);
}
if (logger.isDebugEnabled()) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
index 9e39fad30e..80fe1063a7 100644
---
a/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
+++
b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java
@@ -70,7 +70,7 @@ public class MemoryController<T> {
}
if (memoryUsage.compareAndSet(current, newUsage)) {
- checkTrigger(current, newUsage, triggerParam);
+ checkTrigger(newUsage, triggerParam);
return true;
}
}
@@ -137,10 +137,9 @@ public class MemoryController<T> {
}
}
- private void checkTrigger(long prevUsage, long newUsage, T triggerParam) {
- if (newUsage >= triggerThreshold && prevUsage < triggerThreshold &&
trigger != null) {
+ private void checkTrigger(long usage, T triggerParam) {
+ if (usage >= triggerThreshold && trigger != null) {
if (triggerRunning.compareAndSet(false, true)) {
- log.info("Start to execute memory controller trigger");
try {
trigger.run(triggerParam);
} finally {
diff --git
a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
index d9dcb48c5c..a487ffcc43 100644
---
a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
+++
b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java
@@ -27,9 +27,9 @@ import
org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
import java.util.PriorityQueue;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
@@ -42,8 +42,7 @@ public class WriteMemoryController extends
MemoryController<TsFileProcessor> {
private static final double REJECT_THRESHOLD = memorySizeForWrite *
config.getRejectProportion();
private volatile boolean rejected = false;
private AtomicLong flushingMemory = new AtomicLong(0);
- private Map<StorageGroupInfo, Long> reportedStorageGroupMemCostMap = new
ConcurrentHashMap<>();
- private Map<String, AtomicLong> memoryUsageForEachSg = new
ConcurrentHashMap<>();
+ private Set<StorageGroupInfo> infoSet = new CopyOnWriteArraySet<>();
private ExecutorService flushTaskSubmitThreadPool =
IoTDBThreadPoolFactory.newSingleThreadExecutor("FlushTask-Submit-Pool");
@@ -63,23 +62,21 @@ public class WriteMemoryController extends
MemoryController<TsFileProcessor> {
memoryUsage.get());
rejected = true;
}
- if (success) {
- reportedStorageGroupMemCostMap.put(info, info.getMemCost());
- memoryUsageForEachSg
- .computeIfAbsent(
- info.getDataRegion().getLogicalStorageGroupName()
- + "-"
- + info.getDataRegion().getDataRegionId(),
- x -> new AtomicLong(0))
- .addAndGet(size);
+ if (!info.isRecorded()) {
+ info.setRecorded(true);
+ infoSet.add(info);
}
return success;
}
- @Override
- public void releaseMemory(long size) {
+ public void releaseFlushingMemory(long size, String storageGroup, long
memTableId) {
+ this.flushingMemory.addAndGet(-size);
+ this.releaseMemory(size, storageGroup, memTableId);
+ }
+
+ public void releaseMemory(long size, String storageGroup, long memTableId) {
super.releaseMemory(size);
- if (memoryUsage.get() < REJECT_THRESHOLD) {
+ if (rejected && memoryUsage.get() < REJECT_THRESHOLD) {
rejected = false;
}
}
@@ -101,17 +98,21 @@ public class WriteMemoryController extends
MemoryController<TsFileProcessor> {
protected void chooseMemtableToFlush(TsFileProcessor currentTsFileProcessor)
{
// If invoke flush by replaying logs, do not flush now!
- if (reportedStorageGroupMemCostMap.size() == 0) {
+ if (infoSet.size() == 0) {
+ return;
+ }
+ long memCost = 0;
+ long activeMemSize = memoryUsage.get() - flushingMemory.get();
+ if (activeMemSize - memCost < FLUSH_THRESHOLD) {
return;
}
PriorityQueue<TsFileProcessor> allTsFileProcessors =
new PriorityQueue<>(
(o1, o2) -> Long.compare(o2.getWorkMemTableRamCost(),
o1.getWorkMemTableRamCost()));
- for (StorageGroupInfo storageGroupInfo :
reportedStorageGroupMemCostMap.keySet()) {
+ for (StorageGroupInfo storageGroupInfo : infoSet) {
allTsFileProcessors.addAll(storageGroupInfo.getAllReportedTsp());
}
- long memCost = 0;
- long activeMemSize = memoryUsage.get() - flushingMemory.get();
+ long selectedCount = 0;
while (activeMemSize - memCost > FLUSH_THRESHOLD) {
if (allTsFileProcessors.isEmpty()
|| allTsFileProcessors.peek().getWorkMemTableRamCost() == 0) {
@@ -121,18 +122,20 @@ public class WriteMemoryController extends
MemoryController<TsFileProcessor> {
if (selectedTsFileProcessor == null) {
break;
}
+ if (selectedTsFileProcessor.shouldFlush()) {
+ continue;
+ }
memCost += selectedTsFileProcessor.getWorkMemTableRamCost();
selectedTsFileProcessor.setWorkMemTableShouldFlush();
+
flushingMemory.addAndGet(selectedTsFileProcessor.getWorkMemTableRamCost());
flushTaskSubmitThreadPool.submit(selectedTsFileProcessor::submitAFlushTask);
+ selectedCount++;
allTsFileProcessors.poll();
}
- }
-
- public void resetStorageGroupInfo(StorageGroupInfo info) {
- reportedStorageGroupMemCostMap.put(info, info.getMemCost());
- }
-
- public long getMemoryUsageForSg(String sgName) {
- return memoryUsageForEachSg.get(sgName).get();
+ logger.info(
+ "Select {} memtable to flush, flushing memory is {}, remaining memory
is {}",
+ selectedCount,
+ flushingMemory.get(),
+ memoryUsage.get() - flushingMemory.get());
}
}