This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch rc/1.3.1 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 711b83c1b91e48f3461296944760a7c7bf47e3e8
Author: shuwenwei <[email protected]>
AuthorDate: Wed Apr 10 20:36:26 2024 +0800

    delay estimate memory of InnerSpaceCompactionTask (#12314)
---
 .../compaction/schedule/CompactionScheduler.java   |  6 ---
 .../compaction/schedule/CompactionTaskQueue.java   | 52 ++++++++++------------
 2 files changed, 24 insertions(+), 34 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
index 707f251ded7..6ebb5578670 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduler.java
@@ -200,12 +200,6 @@ public class CompactionScheduler {
           "Compaction task start check failed because disk free ratio is less than disk_space_warning_threshold");
       return false;
     }
-    // check task memory cost
-    long allocatedTotalCompactionMemory = SystemInfo.getInstance().getMemorySizeForCompaction();
-    long estimatedTaskMemoryCost = task.getEstimatedMemoryCost();
-    if (estimatedTaskMemoryCost < 0 || estimatedTaskMemoryCost > allocatedTotalCompactionMemory) {
-      return false;
-    }
     return true;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java
index 66a6900afb1..9f0a5dbbc7d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionTaskQueue.java
@@ -36,48 +36,44 @@ public class CompactionTaskQueue extends FixedPriorityBlockingQueue<AbstractComp
   public AbstractCompactionTask take() throws InterruptedException {
     final ReentrantLock lock = this.lock;
     while (true) {
+      AbstractCompactionTask task = null;
       lock.lockInterruptibly();
       try {
         while (queue.isEmpty()) {
           notEmpty.await();
         }
-        AbstractCompactionTask task = tryPollExecutableTask();
-        // task == null indicates that there is no runnable task now
-        if (task != null) {
-          return task;
-        }
+        task = queue.pollFirst();
       } finally {
         lock.unlock();
       }
-      Thread.sleep(TimeUnit.SECONDS.toMillis(1));
-    }
-  }
-
-  private AbstractCompactionTask tryPollExecutableTask() {
-    while (true) {
-      if (queue.isEmpty()) {
-        return null;
-      }
-      AbstractCompactionTask task = queue.pollFirst();
-      if (task == null) {
-        continue;
-      }
-      if (!checkTaskValid(task)) {
-        dropCompactionTask(task);
-        continue;
-      }
-      if (!task.tryOccupyResourcesForRunning()) {
-        queue.add(task);
-        return null;
-      }
-      if (!transitTaskFileStatus(task)) {
-        dropCompactionTask(task);
+      boolean prepareTaskSuccess = prepareTask(task);
+      if (!prepareTaskSuccess) {
+        Thread.sleep(TimeUnit.SECONDS.toMillis(1));
         continue;
       }
       return task;
     }
   }
 
+  private boolean prepareTask(AbstractCompactionTask task) throws InterruptedException {
+    if (task == null) {
+      return false;
+    }
+    if (!checkTaskValid(task)) {
+      dropCompactionTask(task);
+      return false;
+    }
+    if (!task.tryOccupyResourcesForRunning()) {
+      put(task);
+      return false;
+    }
+    if (!transitTaskFileStatus(task)) {
+      dropCompactionTask(task);
+      return false;
+    }
+    return true;
+  }
+
   private void dropCompactionTask(AbstractCompactionTask task) {
     task.resetCompactionCandidateStatusForAllSourceFiles();
     task.handleTaskCleanup();
