This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch load-task-cleanup
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c9fa43efc39d4e4dc44fda775e7f0c57e18da1e1
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Jan 18 13:10:18 2024 +0800

    Load: Introduce LoadTsFileManager.CleanupTask to force close writer manager when exception occurs
---
 .../execution/load/LoadTsFileManager.java          | 155 ++++++++++++---------
 1 file changed, 92 insertions(+), 63 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index 19239b0bfd7..1d5cbd0205f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -19,21 +19,7 @@
 
 package org.apache.iotdb.db.queryengine.execution.load;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.DirectoryNotEmptyException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.file.SystemFileFactory;
@@ -44,6 +30,7 @@ import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.LoadFileException;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
 import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler;
 import 
org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand;
@@ -54,9 +41,21 @@ import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.DirectoryNotEmptyException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.PriorityBlockingQueue;
+
 /**
  * {@link LoadTsFileManager} is used for dealing with {@link 
LoadTsFilePieceNode} and {@link
  * LoadCommand}. This class turn the content of a piece of loading TsFile into 
a new TsFile. When
@@ -75,22 +74,52 @@ public class LoadTsFileManager {
 
   private final File loadDir;
 
-  private final Map<String, TsFileWriterManager> uuid2WriterManager;
+  private final Map<String, TsFileWriterManager> uuid2WriterManager = new 
ConcurrentHashMap<>();
 
-  private final ScheduledExecutorService cleanupExecutors;
-  private final Map<String, ScheduledFuture<?>> uuid2Future;
-  private final PriorityBlockingQueue<String> uuidQueue = new 
PriorityBlockingQueue<>();
+  private final Map<String, CleanupTask> uuid2CleanupTask = new 
ConcurrentHashMap<>();
+  private final PriorityBlockingQueue<CleanupTask> cleanupTaskQueue = new 
PriorityBlockingQueue<>();
 
   public LoadTsFileManager() {
     this.loadDir = 
SystemFileFactory.INSTANCE.getFile(CONFIG.getLoadTsFileDir());
-    this.uuid2WriterManager = new ConcurrentHashMap<>();
-    this.cleanupExecutors =
-        IoTDBThreadPoolFactory.newScheduledThreadPool(1, 
LoadTsFileManager.class.getName());
-    this.uuid2Future = new ConcurrentHashMap<>();
 
+    registerCleanupTaskExecutor();
     recover();
   }
 
+  private void registerCleanupTaskExecutor() {
+    PipeAgent.runtime()
+        .registerPeriodicalJob(
+            "LoadTsFileManager#cleanupTasks",
+            this::cleanupTasks,
+            LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND);
+  }
+
+  private void cleanupTasks() {
+    while (!cleanupTaskQueue.isEmpty()) {
+      synchronized (uuid2CleanupTask) {
+        if (cleanupTaskQueue.isEmpty()) {
+          continue;
+        }
+
+        final CleanupTask cleanupTask = cleanupTaskQueue.peek();
+        if (cleanupTask.scheduledTime <= System.currentTimeMillis()) {
+          cleanupTask.run();
+
+          uuid2CleanupTask.remove(cleanupTask.uuid);
+          cleanupTaskQueue.poll();
+        } else {
+          final long waitTimeInMs = cleanupTask.scheduledTime - 
System.currentTimeMillis();
+          LOGGER.info(
+              "Next load cleanup task {} is not ready to run, wait for at 
least {} ms ({}s).",
+              cleanupTask.uuid,
+              waitTimeInMs,
+              waitTimeInMs / 1000.0);
+          return;
+        }
+      }
+    }
+  }
+
   private void recover() {
     if (!loadDir.exists()) {
       return;
@@ -106,26 +135,28 @@ public class LoadTsFileManager {
 
       uuid2WriterManager.put(uuid, writerManager);
       writerManager.close();
-      uuid2Future.put(
-          uuid,
-          cleanupExecutors.schedule(
-              () -> forceCloseWriterManager(uuid),
-              LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND,
-              TimeUnit.SECONDS));
+
+      synchronized (uuid2CleanupTask) {
+        final CleanupTask cleanupTask =
+            new CleanupTask(uuid, 
LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND * 1000);
+        uuid2CleanupTask.put(uuid, cleanupTask);
+        cleanupTaskQueue.add(cleanupTask);
+      }
     }
   }
 
   public void writeToDataRegion(DataRegion dataRegion, LoadTsFilePieceNode 
pieceNode, String uuid)
       throws IOException {
     if (!uuid2WriterManager.containsKey(uuid)) {
-      uuid2Future.put(
-          uuid,
-          cleanupExecutors.schedule(
-              () -> forceCloseWriterManager(uuid),
-              LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND,
-              TimeUnit.SECONDS));
+      synchronized (uuid2CleanupTask) {
+        final CleanupTask cleanupTask =
+            new CleanupTask(uuid, 
LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND * 1000);
+        uuid2CleanupTask.put(uuid, cleanupTask);
+        cleanupTaskQueue.add(cleanupTask);
+      }
     }
-    TsFileWriterManager writerManager =
+
+    final TsFileWriterManager writerManager =
         uuid2WriterManager.computeIfAbsent(
             uuid, o -> new 
TsFileWriterManager(SystemFileFactory.INSTANCE.getFile(loadDir, uuid)));
     for (TsFileData tsFileData : pieceNode.getAllTsFileData()) {
@@ -158,29 +189,19 @@ public class LoadTsFileManager {
   }
 
   private void clean(String uuid) {
-    uuid2WriterManager.get(uuid).close();
-    uuid2WriterManager.remove(uuid);
-    uuid2Future.get(uuid).cancel(true);
-    uuid2Future.remove(uuid);
-
-    final Path loadDirPath = loadDir.toPath();
-    if (!Files.exists(loadDirPath)) {
-      return;
-    }
-    try {
-      Files.deleteIfExists(loadDirPath);
-      LOGGER.info("Load dir {} was deleted.", loadDirPath);
-    } catch (DirectoryNotEmptyException e) {
-      LOGGER.info("Load dir {} is not empty, skip deleting.", loadDirPath);
-    } catch (IOException e) {
-      LOGGER.info(MESSAGE_DELETE_FAIL, loadDirPath);
+    synchronized (uuid2CleanupTask) {
+      final CleanupTask cleanupTask = uuid2CleanupTask.get(uuid);
+      if (cleanupTask != null) {
+        cleanupTask.cancel();
+      }
     }
+
+    forceCloseWriterManager(uuid);
   }
 
   private void forceCloseWriterManager(String uuid) {
     uuid2WriterManager.get(uuid).close();
     uuid2WriterManager.remove(uuid);
-    uuid2Future.remove(uuid);
 
     final Path loadDirPath = loadDir.toPath();
     if (!Files.exists(loadDirPath)) {
@@ -375,26 +396,38 @@ public class LoadTsFileManager {
     }
   }
 
-  private static class TaskCleanupTask implements Runnable, 
Comparable<TaskCleanupTask> {
+  private class CleanupTask implements Runnable, Comparable<CleanupTask> {
 
     private final String uuid;
-    private final LoadTsFileManager loadTsFileManager;
-
     private final long scheduledTime;
 
-    private TaskCleanupTask(String uuid, LoadTsFileManager loadTsFileManager, 
long delayInMs) {
+    private volatile boolean isCanceled = false;
+
+    private CleanupTask(String uuid, long delayInMs) {
       this.uuid = uuid;
-      this.loadTsFileManager = loadTsFileManager;
       scheduledTime = System.currentTimeMillis() + delayInMs;
     }
 
+    public void cancel() {
+      isCanceled = true;
+    }
+
     @Override
     public void run() {
-      loadTsFileManager.forceCloseWriterManager(uuid);
+      if (isCanceled) {
+        LOGGER.info("Load cleanup task {} is canceled.", uuid);
+      } else {
+        LOGGER.info("Load cleanup task {} starts.", uuid);
+        try {
+          forceCloseWriterManager(uuid);
+        } catch (Exception e) {
+          LOGGER.warn("Load cleanup task {} error.", uuid, e);
+        }
+      }
     }
 
     @Override
-    public int compareTo(TaskCleanupTask that) {
+    public int compareTo(CleanupTask that) {
       return Long.compare(this.scheduledTime, that.scheduledTime);
     }
   }
@@ -413,10 +446,6 @@ public class LoadTsFileManager {
       return dataRegion;
     }
 
-    public TTimePartitionSlot getTimePartitionSlot() {
-      return timePartitionSlot;
-    }
-
     @Override
     public String toString() {
       return String.join(

Reply via email to