This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch load-task-cleanup in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c9fa43efc39d4e4dc44fda775e7f0c57e18da1e1 Author: Steve Yurong Su <[email protected]> AuthorDate: Thu Jan 18 13:10:18 2024 +0800 Load: Introduce LoadTsFileManager.CleanupTask to force close writer manager when exception occurs --- .../execution/load/LoadTsFileManager.java | 155 ++++++++++++--------- 1 file changed, 92 insertions(+), 63 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java index 19239b0bfd7..1d5cbd0205f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java @@ -19,21 +19,7 @@ package org.apache.iotdb.db.queryengine.execution.load; -import java.io.File; -import java.io.IOException; -import java.nio.file.DirectoryNotEmptyException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; -import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.file.SystemFileFactory; @@ -44,6 +30,7 @@ import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.LoadFileException; +import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode; import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler; import org.apache.iotdb.db.queryengine.plan.scheduler.load.LoadTsFileScheduler.LoadCommand; @@ -54,9 +41,21 @@ import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.tsfile.common.constant.TsFileConstant; import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.nio.file.DirectoryNotEmptyException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.PriorityBlockingQueue; + /** * {@link LoadTsFileManager} is used for dealing with {@link LoadTsFilePieceNode} and {@link * LoadCommand}. This class turn the content of a piece of loading TsFile into a new TsFile. When @@ -75,22 +74,52 @@ public class LoadTsFileManager { private final File loadDir; - private final Map<String, TsFileWriterManager> uuid2WriterManager; + private final Map<String, TsFileWriterManager> uuid2WriterManager = new ConcurrentHashMap<>(); - private final ScheduledExecutorService cleanupExecutors; - private final Map<String, ScheduledFuture<?>> uuid2Future; - private final PriorityBlockingQueue<String> uuidQueue = new PriorityBlockingQueue<>(); + private final Map<String, CleanupTask> uuid2CleanupTask = new ConcurrentHashMap<>(); + private final PriorityBlockingQueue<CleanupTask> cleanupTaskQueue = new PriorityBlockingQueue<>(); public LoadTsFileManager() { this.loadDir = SystemFileFactory.INSTANCE.getFile(CONFIG.getLoadTsFileDir()); - this.uuid2WriterManager = new ConcurrentHashMap<>(); - this.cleanupExecutors = - IoTDBThreadPoolFactory.newScheduledThreadPool(1, LoadTsFileManager.class.getName()); - this.uuid2Future = new ConcurrentHashMap<>(); + registerCleanupTaskExecutor(); recover(); } + private void registerCleanupTaskExecutor() { + PipeAgent.runtime() + .registerPeriodicalJob( + "LoadTsFileManager#cleanupTasks", + this::cleanupTasks, + LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND); + } + + private void cleanupTasks() { + while (!cleanupTaskQueue.isEmpty()) { + synchronized (uuid2CleanupTask) { + if (cleanupTaskQueue.isEmpty()) { + continue; + } + + final CleanupTask cleanupTask = cleanupTaskQueue.peek(); + if (cleanupTask.scheduledTime <= System.currentTimeMillis()) { + cleanupTask.run(); + + uuid2CleanupTask.remove(cleanupTask.uuid); + cleanupTaskQueue.poll(); + } else { + final long waitTimeInMs = cleanupTask.scheduledTime - System.currentTimeMillis(); + LOGGER.info( + "Next load cleanup task {} is not ready to run, wait for at least {} ms ({}s).", + cleanupTask.uuid, + waitTimeInMs, + waitTimeInMs / 1000.0); + return; + } + } + } + } + private void recover() { if (!loadDir.exists()) { return; @@ -106,26 +135,28 @@ public class LoadTsFileManager { uuid2WriterManager.put(uuid, writerManager); writerManager.close(); - uuid2Future.put( - uuid, - cleanupExecutors.schedule( - () -> forceCloseWriterManager(uuid), - LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND, - TimeUnit.SECONDS)); + + synchronized (uuid2CleanupTask) { + final CleanupTask cleanupTask = + new CleanupTask(uuid, LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND * 1000); + uuid2CleanupTask.put(uuid, cleanupTask); + cleanupTaskQueue.add(cleanupTask); + } } } public void writeToDataRegion(DataRegion dataRegion, LoadTsFilePieceNode pieceNode, String uuid) throws IOException { if (!uuid2WriterManager.containsKey(uuid)) { - uuid2Future.put( - uuid, - cleanupExecutors.schedule( - () -> forceCloseWriterManager(uuid), - LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND, - TimeUnit.SECONDS)); + synchronized (uuid2CleanupTask) { + final CleanupTask cleanupTask = + new CleanupTask(uuid, LoadTsFileScheduler.LOAD_TASK_MAX_TIME_IN_SECOND * 1000); + uuid2CleanupTask.put(uuid, cleanupTask); + cleanupTaskQueue.add(cleanupTask); + } } - TsFileWriterManager writerManager = + + final TsFileWriterManager writerManager = uuid2WriterManager.computeIfAbsent( uuid, o -> new TsFileWriterManager(SystemFileFactory.INSTANCE.getFile(loadDir, uuid))); for (TsFileData tsFileData : pieceNode.getAllTsFileData()) { @@ -158,29 +189,19 @@ public class LoadTsFileManager { } private void clean(String uuid) { - uuid2WriterManager.get(uuid).close(); - uuid2WriterManager.remove(uuid); - uuid2Future.get(uuid).cancel(true); - uuid2Future.remove(uuid); - - final Path loadDirPath = loadDir.toPath(); - if (!Files.exists(loadDirPath)) { - return; - } - try { - Files.deleteIfExists(loadDirPath); - LOGGER.info("Load dir {} was deleted.", loadDirPath); - } catch (DirectoryNotEmptyException e) { - LOGGER.info("Load dir {} is not empty, skip deleting.", loadDirPath); - } catch (IOException e) { - LOGGER.info(MESSAGE_DELETE_FAIL, loadDirPath); + synchronized (uuid2CleanupTask) { + final CleanupTask cleanupTask = uuid2CleanupTask.get(uuid); + if (cleanupTask != null) { + cleanupTask.cancel(); + } } + + forceCloseWriterManager(uuid); } private void forceCloseWriterManager(String uuid) { uuid2WriterManager.get(uuid).close(); uuid2WriterManager.remove(uuid); - uuid2Future.remove(uuid); final Path loadDirPath = loadDir.toPath(); if (!Files.exists(loadDirPath)) { @@ -375,26 +396,38 @@ public class LoadTsFileManager { } } - private static class TaskCleanupTask implements Runnable, Comparable<TaskCleanupTask> { + private class CleanupTask implements Runnable, Comparable<CleanupTask> { private final String uuid; - private final LoadTsFileManager loadTsFileManager; - private final long scheduledTime; - private TaskCleanupTask(String uuid, LoadTsFileManager loadTsFileManager, long delayInMs) { + private volatile boolean isCanceled = false; + + private CleanupTask(String uuid, long delayInMs) { this.uuid = uuid; - this.loadTsFileManager = loadTsFileManager; scheduledTime = System.currentTimeMillis() + delayInMs; } + public void cancel() { + isCanceled = true; + } + @Override public void run() { - loadTsFileManager.forceCloseWriterManager(uuid); + if (isCanceled) { + LOGGER.info("Load cleanup task {} is canceled.", uuid); + } else { + LOGGER.info("Load cleanup task {} starts.", uuid); + try { + forceCloseWriterManager(uuid); + } catch (Exception e) { + LOGGER.warn("Load cleanup task {} error.", uuid, e); + } + } } @Override - public int compareTo(TaskCleanupTask that) { + public int compareTo(CleanupTask that) { return Long.compare(this.scheduledTime, that.scheduledTime); } } @@ -413,10 +446,6 @@ public class LoadTsFileManager { return dataRegion; } - public TTimePartitionSlot getTimePartitionSlot() { - return timePartitionSlot; - } - @Override public String toString() { return String.join(
