This is an automated email from the ASF dual-hosted git repository. xingtanzjr pushed a commit to branch compaction_worker_refactor_0928 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 28ab2d75825cc990aab8322a194e1a4e947f0cba Author: Jinrui.Zhang <[email protected]> AuthorDate: Thu Sep 28 15:11:02 2023 +0800 tmp save --- .../FileCannotTransitToCompactingException.java | 49 ++++++++++++++++++++++ .../execute/task/AbstractCompactionTask.java | 15 +++++++ .../execute/task/CrossSpaceCompactionTask.java | 25 +++++++++++ .../execute/task/InnerSpaceCompactionTask.java | 23 ++++++++++ .../compaction/schedule/CompactionWorker.java | 43 +++++++++++++++---- .../db/storageengine/rescon/memory/SystemInfo.java | 8 ++-- 6 files changed, 153 insertions(+), 10 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/FileCannotTransitToCompactingException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/FileCannotTransitToCompactingException.java new file mode 100644 index 00000000000..d14ae4a44db --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/FileCannotTransitToCompactingException.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception; + +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + +public class FileCannotTransitToCompactingException extends Exception { + + public FileCannotTransitToCompactingException(TsFileResource f) { + super( + String.format("TsFile %s cannot transit to COMPACTING. its status: %s", f, f.getStatus())); + } + + public FileCannotTransitToCompactingException() {} + + public FileCannotTransitToCompactingException(String message) { + super(message); + } + + public FileCannotTransitToCompactingException(String message, Throwable cause) { + super(message, cause); + } + + public FileCannotTransitToCompactingException(Throwable cause) { + super(cause); + } + + public FileCannotTransitToCompactingException( + String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java index 40504650338..245961b382e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/AbstractCompactionTask.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task; import org.apache.iotdb.db.service.metrics.CompactionMetrics; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.FileCannotTransitToCompactingException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; @@ -27,6 +28,10 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -150,6 +155,16 @@ public abstract class AbstractCompactionTask { */ public abstract boolean checkValidAndSetMerging(); + public abstract void transitSourceFilesToMerging() throws FileCannotTransitToCompactingException; + + public abstract long getEstimatedMemoryCost() throws IOException; + + public abstract int getProcessedFileNum(); + + public boolean isCompactionAllowed() { + return tsFileManager.isAllowCompaction(); + } + @Override public int hashCode() { return super.hashCode(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java index 99236e11962..15cb7d1f05c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/CrossSpaceCompactionTask.java @@ -26,6 +26,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionFileCountExceededException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionMemoryNotEnoughException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionValidationFailedException; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.FileCannotTransitToCompactingException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICrossCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary; @@ -400,6 +401,30 @@ public class CrossSpaceCompactionTask extends AbstractCompactionTask { return addReadLockSuccess; } + @Override + public void transitSourceFilesToMerging() throws FileCannotTransitToCompactingException { + for (TsFileResource f : selectedSequenceFiles) { + if (!f.setStatus(TsFileResourceStatus.COMPACTING)) { + throw new FileCannotTransitToCompactingException(f); + } + } + for (TsFileResource f : selectedUnsequenceFiles) { + if (!f.setStatus(TsFileResourceStatus.COMPACTING)) { + throw new FileCannotTransitToCompactingException(f); + } + } + } + + @Override + public long getEstimatedMemoryCost() { + return memoryCost; + } + + @Override + public int getProcessedFileNum() { + return selectedSequenceFiles.size() + selectedUnsequenceFiles.size(); + } + private boolean addReadLock(List<TsFileResource> tsFileResourceList) { try { for (TsFileResource tsFileResource : tsFileResourceList) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java index 6a22b9fa606..809e618d989 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/InnerSpaceCompactionTask.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionFileCountExceededException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionMemoryNotEnoughException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionValidationFailedException; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.FileCannotTransitToCompactingException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer; @@ -520,6 +521,28 @@ public class InnerSpaceCompactionTask extends AbstractCompactionTask { return true; } + @Override + public void transitSourceFilesToMerging() throws FileCannotTransitToCompactingException { + for (TsFileResource f : selectedTsFileResourceList) { + if (!f.setStatus(TsFileResourceStatus.COMPACTING)) { + throw new FileCannotTransitToCompactingException(f); + } + } + } + + @Override + public long getEstimatedMemoryCost() throws IOException { + if (memoryCost == 0L) { + memoryCost = innerSpaceEstimator.estimateInnerCompactionMemory(selectedTsFileResourceList); + } + return memoryCost; + } + + @Override + public int getProcessedFileNum() { + return selectedTsFileResourceList.size(); + } + @Override protected void createSummary() { if (performer instanceof FastCompactionPerformer) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java index f5bb18b5b54..64be3cdd09f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionWorker.java @@ -19,8 +19,12 @@ package org.apache.iotdb.db.storageengine.dataregion.compaction.schedule; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionFileCountExceededException; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionMemoryNotEnoughException; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.FileCannotTransitToCompactingException; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.AbstractCompactionTask; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary; +import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; import org.apache.iotdb.db.utils.datastructure.FixedPriorityBlockingQueue; import org.slf4j.Logger; @@ -28,6 +32,7 @@ import org.slf4j.LoggerFactory; import javax.validation.constraints.NotNull; +import java.io.IOException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -55,15 +60,39 @@ public class CompactionWorker implements Runnable { log.warn("CompactionThread-{} terminates because interruption", threadId); return; } + if (task == null || !task.isCompactionAllowed()) { + log.info("Compaction task is not allowed to be executed by TsFileManager. Task {}", task); + return; + } + long estimatedMemoryCost = 0L; + boolean memoryAcquired = false; + boolean fileHandleAcquired = false; try { - if (task != null && task.checkValidAndSetMerging()) { - CompactionTaskSummary summary = task.getSummary(); - CompactionTaskFuture future = new CompactionTaskFuture(summary); - CompactionTaskManager.getInstance().recordTask(task, future); - task.start(); + task.transitSourceFilesToMerging(); + estimatedMemoryCost = task.getEstimatedMemoryCost(); + memoryAcquired = SystemInfo.getInstance().addCompactionMemoryCost(estimatedMemoryCost, 60); + fileHandleAcquired = + SystemInfo.getInstance().addCompactionFileNum(task.getProcessedFileNum(), 60); + CompactionTaskSummary summary = task.getSummary(); + CompactionTaskFuture future = new CompactionTaskFuture(summary); + CompactionTaskManager.getInstance().recordTask(task, future); + task.start(); + } catch (FileCannotTransitToCompactingException + | IOException + | CompactionMemoryNotEnoughException + | CompactionFileCountExceededException e) { + log.info("CompactionTask {} cannot be executed. Reason: {}", task, e); + } catch (InterruptedException e) { + log.warn("InterruptedException occurred when preparing compaction task. {}", task, e); + Thread.currentThread().interrupt(); + } finally { + task.resetCompactionCandidateStatusForAllSourceFiles(); + if (memoryAcquired) { + SystemInfo.getInstance().resetCompactionMemoryCost(estimatedMemoryCost); + } + if (fileHandleAcquired) { + SystemInfo.getInstance().decreaseCompactionFileNumCost(task.getProcessedFileNum()); } - } catch (Exception e) { - log.error("CompactionWorker.run(), Exception.", e); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java index 50cbe144f61..334575f26e0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java @@ -188,7 +188,7 @@ public class SystemInfo { this.flushingMemTablesCost -= flushingMemTableCost; } - public void addCompactionFileNum(int fileNum, long timeOutInSecond) + public boolean addCompactionFileNum(int fileNum, long timeOutInSecond) throws InterruptedException, CompactionFileCountExceededException { if (fileNum > totalFileLimitForCrossTask) { // source file num is greater than the max file num for compaction @@ -210,12 +210,13 @@ public class SystemInfo { Thread.sleep(100); originFileNum = this.compactionFileNumCost.get(); } + return true; } - public void addCompactionMemoryCost(long memoryCost, long timeOutInSecond) + public boolean addCompactionMemoryCost(long memoryCost, long timeOutInSecond) throws InterruptedException, CompactionMemoryNotEnoughException { if (!config.isEnableCompactionMemControl()) { - return; + return false; } if (memoryCost > memorySizeForCompaction) { // required memory cost is greater than the total memory budget for compaction @@ -239,6 +240,7 @@ public class SystemInfo { Thread.sleep(100); originSize = this.compactionMemoryCost.get(); } + return true; } public synchronized void resetCompactionMemoryCost(long compactionMemoryCost) {
