This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch load-task-cleanup in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5fc5d3988a7e343a45167faa203e417e0c71ac29 Author: Steve Yurong Su <[email protected]> AuthorDate: Fri Jan 12 14:18:16 2024 +0800 refactor --- .../execution/load/LoadTsFileManager.java | 52 ++++++++++++++++------ .../iotdb/db/storageengine/StorageEngine.java | 2 +- 2 files changed, 39 insertions(+), 15 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java index 17907fd47f8..8455104dc12 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java @@ -19,6 +19,19 @@ package org.apache.iotdb.db.queryengine.execution.load; +import java.io.File; +import java.io.IOException; +import java.nio.file.DirectoryNotEmptyException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.conf.IoTDBConstant; @@ -41,23 +54,9 @@ import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; -import java.io.IOException; -import java.nio.file.DirectoryNotEmptyException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - /** * {@link LoadTsFileManager} is used for dealing with {@link LoadTsFilePieceNode} and {@link * LoadCommand}. This class turn the content of a piece of loading TsFile into a new TsFile. When @@ -80,6 +79,7 @@ public class LoadTsFileManager { private final ScheduledExecutorService cleanupExecutors; private final Map<String, ScheduledFuture<?>> uuid2Future; + private final PriorityBlockingQueue<String> uuidQueue = new PriorityBlockingQueue<>(); public LoadTsFileManager() { this.loadDir = SystemFileFactory.INSTANCE.getFile(CONFIG.getLoadTsFileDir()); @@ -375,6 +375,30 @@ public class LoadTsFileManager { } } + private static class TaskCleanupTask implements Runnable, Comparable<TaskCleanupTask> { + + private final String uuid; + private final LoadTsFileManager loadTsFileManager; + + private final long scheduledTime; + + private TaskCleanupTask(String uuid, LoadTsFileManager loadTsFileManager, long delayInMs) { + this.uuid = uuid; + this.loadTsFileManager = loadTsFileManager; + scheduledTime = System.currentTimeMillis() + delayInMs; + } + + @Override + public void run() { + loadTsFileManager.forceCloseWriterManager(uuid); + } + + @Override + public int compareTo(TaskCleanupTask that) { + return Long.compare(this.scheduledTime, that.scheduledTime); + } + } + private static class DataPartitionInfo { private final DataRegion dataRegion; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java index 3fe138d2dc1..f89247c26d3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java @@ -143,7 +143,7 @@ public class StorageEngine implements IService { private List<FlushListener> customFlushListeners = new ArrayList<>(); private int recoverDataRegionNum = 0; - private LoadTsFileManager loadTsFileManager; + private volatile LoadTsFileManager loadTsFileManager; private StorageEngine() {}
