This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch IOTDB-3164-Allocate-By-Tablets in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 2ad223946bb48fcad5589234591870e1aeb221a1 Author: Liu Xuxin <liuxu...@outlook.com> AuthorDate: Thu Jul 21 20:36:43 2022 +0800 finish for test --- .../db/engine/storagegroup/TsFileProcessor.java | 25 ++++++++++++++++---- .../iotdb/db/rescon/memory/MemoryController.java | 14 ++++++----- .../db/rescon/memory/WriteMemoryController.java | 27 ++++++++++++++-------- 3 files changed, 45 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java index 4d1e2891d1..c2b7ff861e 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java @@ -26,7 +26,6 @@ import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.conf.adapter.CompressionRatio; -import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.engine.flush.CloseFileListener; import org.apache.iotdb.db.engine.flush.FlushListener; import org.apache.iotdb.db.engine.flush.FlushManager; @@ -85,6 +84,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -786,11 +786,26 @@ public class TsFileProcessor { tsFileProcessorInfo.addTSPMemCost(chunkMetadataIncrement); WriteMemoryController controller = WriteMemoryController.getInstance(); boolean allocateMemory = false; + long startTime = System.currentTimeMillis(); try { - allocateMemory = controller.tryAllocateMemory(memTableIncrement, storageGroupInfo, this); - if (!allocateMemory) { - StorageEngine.blockInsertionIfReject(this); - } + do { + if (this.shouldFlush()) { + break; + } + allocateMemory = controller.tryAllocateMemory(memTableIncrement, storageGroupInfo, this); + try { + if (!allocateMemory) { + TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked()); + if (System.currentTimeMillis() - startTime + > config.getMaxWaitingTimeWhenInsertBlocked()) { + throw new WriteProcessRejectException( + "System rejected over " + (System.currentTimeMillis() - startTime) + "ms"); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } while (!allocateMemory); } catch (WriteProcessRejectException e) { storageGroupInfo.releaseStorageGroupMemCost(memTableIncrement); tsFileProcessorInfo.releaseTSPMemCost(chunkMetadataIncrement); diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java index 80fe1063a7..cffe20e9f4 100644 --- a/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java +++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/MemoryController.java @@ -58,7 +58,7 @@ public class MemoryController<T> { * @param size * @return true if success to allocate else false */ - public boolean tryAllocateMemory(long size, T triggerParam) { + public boolean tryAllocateMemory(long size, T triggerParam, boolean runTrigger) { while (true) { long current = memoryUsage.get(); long newUsage = current + size; @@ -70,7 +70,9 @@ public class MemoryController<T> { } if (memoryUsage.compareAndSet(current, newUsage)) { - checkTrigger(newUsage, triggerParam); + if (runTrigger) { + checkTrigger(newUsage, triggerParam); + } return true; } } @@ -84,10 +86,10 @@ public class MemoryController<T> { * @throws InterruptedException */ public void allocateMemoryMayBlock(long size, T triggerParam) throws InterruptedException { - if (!tryAllocateMemory(size, triggerParam)) { + if (!tryAllocateMemory(size, triggerParam, true)) { lock.lock(); try { - while (!tryAllocateMemory(size, triggerParam)) { + while (!tryAllocateMemory(size, triggerParam, true)) { condition.await(); } } finally { @@ -107,10 +109,10 @@ public class MemoryController<T> { public boolean allocateMemoryMayBlock(long size, long timeout, T triggerParam) throws InterruptedException { long startTime = System.currentTimeMillis(); - if (!tryAllocateMemory(size, triggerParam)) { + if (!tryAllocateMemory(size, triggerParam, true)) { lock.lock(); try { - while (tryAllocateMemory(size, triggerParam)) { + while (tryAllocateMemory(size, triggerParam, true)) { if (System.currentTimeMillis() - startTime >= timeout) { return false; } diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java index 6fe2feb6e2..c2538bf59d 100644 --- a/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java +++ b/server/src/main/java/org/apache/iotdb/db/rescon/memory/WriteMemoryController.java @@ -53,7 +53,14 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> { } public boolean tryAllocateMemory(long size, StorageGroupInfo info, TsFileProcessor processor) { - boolean success = super.tryAllocateMemory(size, processor); + boolean success = false; + if (size < REJECT_THRESHOLD) { + success = super.tryAllocateMemory(size, processor, true); + } else { + if (chooseMemtableToFlush(processor)) { + success = super.tryAllocateMemory(size, processor, false); + } + } if (memoryUsage.get() > REJECT_THRESHOLD && !rejected) { logger.info( "Change system to reject status. Triggered by: logical SG ({}), mem cost delta ({}), totalSgMemCost ({}).", @@ -100,15 +107,15 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> { flushingMemory.addAndGet(size); } - protected void chooseMemtableToFlush(TsFileProcessor currentTsFileProcessor) { + protected boolean chooseMemtableToFlush(TsFileProcessor currentTsFileProcessor) { // If invoke flush by replaying logs, do not flush now! if (infoSet.size() == 0) { - return; + return false; } long memCost = 0; long activeMemSize = memoryUsage.get() - flushingMemory.get(); if (activeMemSize - memCost < FLUSH_THRESHOLD) { - return; + return false; } PriorityQueue<TsFileProcessor> allTsFileProcessors = new PriorityQueue<>( @@ -117,10 +124,11 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> { allTsFileProcessors.addAll(storageGroupInfo.getAllReportedTsp()); } long selectedCount = 0; + boolean currentTsFileProcessorSelected = false; while (activeMemSize - memCost > FLUSH_THRESHOLD) { if (allTsFileProcessors.isEmpty() || allTsFileProcessors.peek().getWorkMemTableRamCost() == 0) { - return; + return false; } TsFileProcessor selectedTsFileProcessor = allTsFileProcessors.poll(); if (selectedTsFileProcessor == null) { @@ -130,15 +138,14 @@ public class WriteMemoryController extends MemoryController<TsFileProcessor> { || selectedTsFileProcessor.getWorkMemTable().shouldFlush()) { continue; } + if (selectedTsFileProcessor == currentTsFileProcessor) { + currentTsFileProcessorSelected = true; + } memCost += selectedTsFileProcessor.getWorkMemTableRamCost(); selectedTsFileProcessor.setWorkMemTableShouldFlush(); flushTaskSubmitThreadPool.submit(selectedTsFileProcessor::submitAFlushTask); selectedCount++; } - logger.info( - "Select {} memtable to flush, flushing memory is {}, remaining memory is {}", - selectedCount, - flushingMemory.get(), - memoryUsage.get() - flushingMemory.get()); + return currentTsFileProcessorSelected; } }