This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch NewMemControl in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 036a637e6232c014439033c99b67f2ce9ea36493 Author: HTHou <[email protected]> AuthorDate: Sun Apr 25 17:03:35 2021 +0800 New memory control strategy --- .../db/engine/storagegroup/StorageGroupProcessor.java | 19 +++++++++++++++++++ .../iotdb/db/engine/storagegroup/TsFileProcessor.java | 16 +++++++++++++++- .../java/org/apache/iotdb/db/rescon/SystemInfo.java | 12 +++++++++++- .../db/engine/storagegroup/TsFileProcessorTest.java | 8 ++++---- 4 files changed, 49 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 69277b1..f829517 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -110,6 +110,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -164,6 +165,8 @@ public class StorageGroupProcessor { * partitionLatestFlushedTimeForEachDevice) */ private final ReadWriteLock insertLock = new ReentrantReadWriteLock(); + + private final Condition writeLockConditionForReject = insertLock.writeLock().newCondition(); /** closeStorageGroupCondition is used to wait for all currently closing TsFiles to be done. 
*/ private final Object closeStorageGroupCondition = new Object(); /** @@ -1110,6 +1113,18 @@ public class StorageGroupProcessor { } } + public void submitAFlushTaskWhenShouldFlush(TsFileProcessor tsFileProcessor) { + writeLock(); + try { + // check memtable size and may asyncTryToFlush the work memtable + if (tsFileProcessor.shouldFlush()) { + fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence()); + } + } finally { + writeUnlock(); + } + } + private TsFileProcessor getOrCreateTsFileProcessor(long timeRangeId, boolean sequence) { TsFileProcessor tsFileProcessor = null; try { @@ -1596,6 +1611,10 @@ public class StorageGroupProcessor { insertLock.writeLock().unlock(); } + public void writeLockConditionAwait() throws InterruptedException { + writeLockConditionForReject.await(config.getCheckPeriodWhenInsertBlocked(), TimeUnit.MILLISECONDS); + } + /** * @param tsFileResources includes sealed and unsealed tsfile resources * @return fill unsealed tsfile resources with memory data and ChunkMetadataList of data in disk diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index 8cc6b50..d967843 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -459,13 +459,23 @@ public class TsFileProcessor { if (storageGroupInfo.needToReportToSystem()) { try { if (!SystemInfo.getInstance().reportStorageGroupStatus(storageGroupInfo, this)) { - StorageEngine.blockInsertionIfReject(); + long startTime = System.currentTimeMillis(); + while (SystemInfo.getInstance().isRejected()) { + storageGroupInfo.getStorageGroupProcessor().writeLockConditionAwait(); + if (System.currentTimeMillis() - startTime + > config.getMaxWaitingTimeWhenInsertBlocked()) { + throw new WriteProcessRejectException( + "System rejected over " + 
config.getMaxWaitingTimeWhenInsertBlocked() + "ms"); + } + } } } catch (WriteProcessRejectException e) { storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement); tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement); SystemInfo.getInstance().resetStorageGroupStatus(storageGroupInfo); throw e; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } workMemTable.addTVListRamCost(memTableIncrement); @@ -1284,4 +1294,8 @@ public class TsFileProcessor { public void addCloseFileListeners(Collection<CloseFileListener> listeners) { closeFileListeners.addAll(listeners); } + + public void submitAFlushTask() { + this.storageGroupInfo.getStorageGroupProcessor().submitAFlushTaskWhenShouldFlush(this); + } } diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java index e9b1312..3ad7fba 100644 --- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java +++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java @@ -31,6 +31,9 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; import java.util.PriorityQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; public class SystemInfo { @@ -44,6 +47,10 @@ public class SystemInfo { private Map<StorageGroupInfo, Long> reportedStorageGroupMemCostMap = new HashMap<>(); private long flushingMemTablesCost = 0L; + private AtomicInteger threadCnt = new AtomicInteger(); + private ExecutorService flushTaskSubmitThreadPool = + Executors.newFixedThreadPool( + config.getConcurrentFlushThread(), r -> new Thread(r, "FlushTaskSubmitThread-" + threadCnt.getAndIncrement())); private static double FLUSH_THERSHOLD = memorySizeForWrite * config.getFlushProportion(); private static double REJECT_THERSHOLD = memorySizeForWrite * config.getRejectProportion(); @@ -89,7 +96,7 @@ public class SystemInfo 
{ if (totalStorageGroupMemCost < memorySizeForWrite) { return true; } else { - throw new WriteProcessRejectException("Total Storage Group MemCost "+ totalStorageGroupMemCost +" is over than memorySizeForWrite"); + throw new WriteProcessRejectException("Total Storage Group MemCost "+ totalStorageGroupMemCost +" is over than memorySizeForWriting " + memorySizeForWrite); } } else { return false; @@ -188,6 +195,9 @@ public class SystemInfo { if (selectedTsFileProcessor == currentTsFileProcessor) { isCurrentTsFileProcessorSelected = true; } + flushTaskSubmitThreadPool.submit(() -> { + selectedTsFileProcessor.submitAFlushTask(); + }); allTsFileProcessors.poll(); } return isCurrentTsFileProcessorSelected; diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java index b3d20c6..ad96383 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java @@ -102,7 +102,7 @@ public class TsFileProcessorTest { TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo); processor.setTsFileProcessorInfo(tsFileProcessorInfo); this.sgInfo.initTsFileProcessorInfo(processor); - SystemInfo.getInstance().reportStorageGroupStatus(sgInfo); + SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor); List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>(); processor.query( deviceId, @@ -178,7 +178,7 @@ public class TsFileProcessorTest { TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo); processor.setTsFileProcessorInfo(tsFileProcessorInfo); this.sgInfo.initTsFileProcessorInfo(processor); - SystemInfo.getInstance().reportStorageGroupStatus(sgInfo); + SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor); List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>(); 
processor.query( deviceId, @@ -280,7 +280,7 @@ public class TsFileProcessorTest { TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo); processor.setTsFileProcessorInfo(tsFileProcessorInfo); this.sgInfo.initTsFileProcessorInfo(processor); - SystemInfo.getInstance().reportStorageGroupStatus(sgInfo); + SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor); List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>(); processor.query( deviceId, @@ -336,7 +336,7 @@ public class TsFileProcessorTest { TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo); processor.setTsFileProcessorInfo(tsFileProcessorInfo); this.sgInfo.initTsFileProcessorInfo(processor); - SystemInfo.getInstance().reportStorageGroupStatus(sgInfo); + SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, processor); List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>(); processor.query(
