This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch load-task-cleanup
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5fc5d3988a7e343a45167faa203e417e0c71ac29
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Jan 12 14:18:16 2024 +0800

    refactor
---
 .../execution/load/LoadTsFileManager.java          | 52 ++++++++++++++++------
 .../iotdb/db/storageengine/StorageEngine.java      |  2 +-
 2 files changed, 39 insertions(+), 15 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index 17907fd47f8..8455104dc12 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -19,6 +19,19 @@
 
 package org.apache.iotdb.db.queryengine.execution.load;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryNotEmptyException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
@@ -41,23 +54,9 @@ import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.DirectoryNotEmptyException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
 /**
  * {@link LoadTsFileManager} is used for dealing with {@link LoadTsFilePieceNode} and {@link
  * LoadCommand}. This class turn the content of a piece of loading TsFile into a new TsFile. When
@@ -80,6 +79,7 @@ public class LoadTsFileManager {
 
   private final ScheduledExecutorService cleanupExecutors;
   private final Map<String, ScheduledFuture<?>> uuid2Future;
+  private final PriorityBlockingQueue<String> uuidQueue = new PriorityBlockingQueue<>();
 
   public LoadTsFileManager() {
     this.loadDir = SystemFileFactory.INSTANCE.getFile(CONFIG.getLoadTsFileDir());
@@ -375,6 +375,30 @@ public class LoadTsFileManager {
     }
   }
 
+  private static class TaskCleanupTask implements Runnable, Comparable<TaskCleanupTask> {
+
+    private final String uuid;
+    private final LoadTsFileManager loadTsFileManager;
+
+    private final long scheduledTime;
+
+    private TaskCleanupTask(String uuid, LoadTsFileManager loadTsFileManager, long delayInMs) {
+      this.uuid = uuid;
+      this.loadTsFileManager = loadTsFileManager;
+      scheduledTime = System.currentTimeMillis() + delayInMs;
+    }
+
+    @Override
+    public void run() {
+      loadTsFileManager.forceCloseWriterManager(uuid);
+    }
+
+    @Override
+    public int compareTo(TaskCleanupTask that) {
+      return Long.compare(this.scheduledTime, that.scheduledTime);
+    }
+  }
+
   private static class DataPartitionInfo {
 
     private final DataRegion dataRegion;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 3fe138d2dc1..f89247c26d3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -143,7 +143,7 @@ public class StorageEngine implements IService {
   private List<FlushListener> customFlushListeners = new ArrayList<>();
   private int recoverDataRegionNum = 0;
 
-  private LoadTsFileManager loadTsFileManager;
+  private volatile LoadTsFileManager loadTsFileManager;
 
   private StorageEngine() {}
 

Reply via email to