This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new cef9184c294 Pipe: avoid blocking startup on hardlink dir cleanup (#17932) (#17943)
cef9184c294 is described below

commit cef9184c294496063957ac047b5133fef33f8392
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 16 09:36:03 2026 +0800

    Pipe: avoid blocking startup on hardlink dir cleanup (#17932) (#17943)
    
    (cherry picked from commit 8fd6b0490dacec1f4034ed427c9062ab10447d1a)
---
 .../agent/runtime/PipeDataNodeRuntimeAgent.java    |   2 +-
 ...aNodeHardlinkOrCopiedFileDirStartupCleaner.java | 257 ++++++++++++++++++++-
 2 files changed, 248 insertions(+), 11 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
index 437c15bdc1c..d47a8c4dbbd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
@@ -75,7 +75,7 @@ public class PipeDataNodeRuntimeAgent implements IService {
   public synchronized void preparePipeResources(
       final ResourcesInformationHolder resourcesInformationHolder) throws 
StartupException {
     // Clean sender (connector) hardlink file dir and snapshot dir
-    PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.clean();
+    
PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.clean(this::registerPeriodicalJob);
 
     // Clean receiver file dir
     PipeDataNodeAgent.receiver().cleanPipeReceiverDirs();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.java
index b58d934988f..009daea6c24 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.java
@@ -28,33 +28,270 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner {
 
   private static final Logger LOGGER =
       
LoggerFactory.getLogger(PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.class);
+  private static final String STALE_PIPE_DIR_SUFFIX = ".startup-cleaning-";
+  private static final String PERIODICAL_CLEANUP_JOB_ID =
+      "PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner#cleanTsFileDir()";
+  private static final long DELETE_MAX_PATH_COUNT_PER_ROUND = 100_000L;
+  private static final long DELETE_MAX_TIME_PER_ROUND_MS = 1_000L;
 
   /**
    * Delete the data directory and all of its subdirectories that contain the
    * PipeConfig.PIPE_TSFILE_DIR_NAME directory.
    */
-  public static void clean() {
-    cleanTsFileDir();
+  public static void clean(final PeriodicalJobRegistrar 
periodicalJobRegistrar) {
+    cleanTsFileDir(periodicalJobRegistrar);
     cleanSnapshotDir();
   }
 
-  private static void cleanTsFileDir() {
+  private static void cleanTsFileDir(final PeriodicalJobRegistrar 
periodicalJobRegistrar) {
+    final String pipeHardlinkBaseDirName = 
PipeConfig.getInstance().getPipeHardlinkBaseDirName();
+    final List<File> stalePipeDirs = new ArrayList<>();
     for (final String dataDir : 
IoTDBDescriptor.getInstance().getConfig().getDataDirs()) {
-      final File pipeHardLinkDir =
-          new File(
-              dataDir + File.separator + 
PipeConfig.getInstance().getPipeHardlinkBaseDirName());
+      final File localDataDir = new File(dataDir);
+      collectInterruptedStalePipeDirs(localDataDir, pipeHardlinkBaseDirName, 
stalePipeDirs);
+
+      final File pipeHardLinkDir = new File(localDataDir, 
pipeHardlinkBaseDirName);
       if (pipeHardLinkDir.isDirectory()) {
-        LOGGER.info(
-            "Pipe hardlink dir found, deleting it: {}, result: {}",
-            pipeHardLinkDir,
-            FileUtils.deleteQuietly(pipeHardLinkDir));
+        moveAsideAndCollect(pipeHardLinkDir, pipeHardlinkBaseDirName, 
stalePipeDirs);
       }
     }
+    registerPeriodicalCleanupJob(periodicalJobRegistrar, stalePipeDirs);
+  }
+
+  private static void collectInterruptedStalePipeDirs(
+      final File localDataDir,
+      final String pipeHardlinkBaseDirName,
+      final List<File> stalePipeDirs) {
+    final File[] stalePipeDirFiles =
+        localDataDir.listFiles(
+            file ->
+                file.isDirectory()
+                    && file.getName().startsWith(pipeHardlinkBaseDirName + 
STALE_PIPE_DIR_SUFFIX));
+    if (stalePipeDirFiles == null) {
+      return;
+    }
+
+    for (final File stalePipeDir : stalePipeDirFiles) {
+      LOGGER.info(
+          "Stale pipe hardlink dir found, registering it for throttled 
periodical deletion: {}",
+          stalePipeDir);
+      stalePipeDirs.add(stalePipeDir);
+    }
+  }
+
+  private static void moveAsideAndCollect(
+      final File pipeHardLinkDir,
+      final String pipeHardlinkBaseDirName,
+      final List<File> stalePipeDirs) {
+    try {
+      final File stalePipeDir = moveAside(pipeHardLinkDir, 
pipeHardlinkBaseDirName);
+      LOGGER.info(
+          "Pipe hardlink dir found, moved it from {} to {} for throttled 
periodical deletion.",
+          pipeHardLinkDir,
+          stalePipeDir);
+      stalePipeDirs.add(stalePipeDir);
+    } catch (final IOException e) {
+      LOGGER.warn(
+          "Failed to move pipe hardlink dir {} for periodical deletion, 
deleting it synchronously.",
+          pipeHardLinkDir,
+          e);
+      LOGGER.info(
+          "Pipe hardlink dir found, deleting it: {}, result: {}",
+          pipeHardLinkDir,
+          FileUtils.deleteQuietly(pipeHardLinkDir));
+    }
+  }
+
+  private static File moveAside(final File pipeHardLinkDir, final String 
pipeHardlinkBaseDirName)
+      throws IOException {
+    final File parentDir = pipeHardLinkDir.getParentFile();
+    if (parentDir == null) {
+      throw new IOException("Failed to get parent dir of " + pipeHardLinkDir);
+    }
+
+    final long timestamp = System.currentTimeMillis();
+    for (int i = 0; ; ++i) {
+      final File stalePipeDir =
+          new File(
+              parentDir, pipeHardlinkBaseDirName + STALE_PIPE_DIR_SUFFIX + 
timestamp + "-" + i);
+      if (!stalePipeDir.exists()) {
+        Files.move(pipeHardLinkDir.toPath(), stalePipeDir.toPath());
+        return stalePipeDir;
+      }
+    }
+  }
+
+  private static void registerPeriodicalCleanupJob(
+      final PeriodicalJobRegistrar periodicalJobRegistrar, final List<File> 
stalePipeDirs) {
+    if (stalePipeDirs.isEmpty()) {
+      return;
+    }
+
+    periodicalJobRegistrar.register(
+        PERIODICAL_CLEANUP_JOB_ID,
+        new PeriodicalStalePipeDirCleaner(stalePipeDirs)::cleanOneRound,
+        
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds());
+  }
+
+  private static CleanupRoundResult deleteQuietlyWithThrottle(final File 
stalePipeDir) {
+    if (!stalePipeDir.exists()) {
+      return CleanupRoundResult.finished();
+    }
+
+    final AtomicBoolean deleteResult = new AtomicBoolean(true);
+    final AtomicLong deletedPathCount = new AtomicLong(0);
+    final long deadlineNanos = System.nanoTime() + 
DELETE_MAX_TIME_PER_ROUND_MS * 1_000_000L;
+    try {
+      Files.walkFileTree(
+          stalePipeDir.toPath(),
+          new SimpleFileVisitor<Path>() {
+            @Override
+            public FileVisitResult preVisitDirectory(
+                final Path dir, final BasicFileAttributes attrs) {
+              return shouldStop(deletedPathCount, deadlineNanos)
+                  ? FileVisitResult.TERMINATE
+                  : FileVisitResult.CONTINUE;
+            }
+
+            @Override
+            public FileVisitResult visitFile(final Path file, final 
BasicFileAttributes attrs) {
+              return deletePath(file, deleteResult, deletedPathCount, 
deadlineNanos);
+            }
+
+            @Override
+            public FileVisitResult visitFileFailed(final Path file, final 
IOException exc) {
+              deleteResult.set(false);
+              return deletePath(file, deleteResult, deletedPathCount, 
deadlineNanos);
+            }
+
+            @Override
+            public FileVisitResult postVisitDirectory(final Path dir, final 
IOException exc) {
+              if (exc != null) {
+                deleteResult.set(false);
+              }
+              return deletePath(dir, deleteResult, deletedPathCount, 
deadlineNanos);
+            }
+          });
+    } catch (final IOException e) {
+      deleteResult.set(false);
+    }
+
+    return new CleanupRoundResult(
+        deletedPathCount.get(), deleteResult.get() && !stalePipeDir.exists(), 
deleteResult.get());
+  }
+
+  private static FileVisitResult deletePath(
+      final Path path,
+      final AtomicBoolean deleteResult,
+      final AtomicLong deletedPathCount,
+      final long deadlineNanos) {
+    if (shouldStop(deletedPathCount, deadlineNanos)) {
+      return FileVisitResult.TERMINATE;
+    }
+
+    try {
+      if (Files.deleteIfExists(path)) {
+        deletedPathCount.incrementAndGet();
+      }
+    } catch (final IOException e) {
+      deleteResult.set(false);
+    }
+    return shouldStop(deletedPathCount, deadlineNanos)
+        ? FileVisitResult.TERMINATE
+        : FileVisitResult.CONTINUE;
+  }
+
+  private static boolean shouldStop(final AtomicLong deletedPathCount, final 
long deadlineNanos) {
+    return Thread.currentThread().isInterrupted()
+        || deletedPathCount.get() >= DELETE_MAX_PATH_COUNT_PER_ROUND
+        || System.nanoTime() >= deadlineNanos;
+  }
+
+  @FunctionalInterface
+  public interface PeriodicalJobRegistrar {
+
+    void register(String id, Runnable periodicalJob, long intervalInSeconds);
+  }
+
+  private static class PeriodicalStalePipeDirCleaner {
+
+    private final List<File> stalePipeDirs;
+    private int currentDirIndex;
+    private boolean finished;
+
+    private PeriodicalStalePipeDirCleaner(final List<File> stalePipeDirs) {
+      this.stalePipeDirs = stalePipeDirs;
+      currentDirIndex = 0;
+      finished = false;
+    }
+
+    private void cleanOneRound() {
+      if (finished) {
+        return;
+      }
+
+      long deletedPathCount = 0;
+      while (currentDirIndex < stalePipeDirs.size()) {
+        final File stalePipeDir = stalePipeDirs.get(currentDirIndex);
+        final CleanupRoundResult result = 
deleteQuietlyWithThrottle(stalePipeDir);
+        deletedPathCount += result.deletedPathCount;
+
+        if (result.finished) {
+          LOGGER.info(
+              "Finished deleting stale pipe hardlink dir {} by periodical job, 
result: {}",
+              stalePipeDir,
+              result.success);
+          ++currentDirIndex;
+          continue;
+        }
+
+        if (deletedPathCount > 0 || !result.success) {
+          LOGGER.info(
+              "Periodically deleted {} paths from stale pipe hardlink dirs, 
current dir: {}, "
+                  + "current round result: {}",
+              deletedPathCount,
+              stalePipeDir,
+              result.success);
+        }
+        return;
+      }
+
+      finished = true;
+      LOGGER.info("Finished deleting all stale pipe hardlink dirs by 
periodical job.");
+    }
+  }
+
+  private static class CleanupRoundResult {
+
+    private final long deletedPathCount;
+    private final boolean finished;
+    private final boolean success;
+
+    private CleanupRoundResult(
+        final long deletedPathCount, final boolean finished, final boolean 
success) {
+      this.deletedPathCount = deletedPathCount;
+      this.finished = finished;
+      this.success = success;
+    }
+
+    private static CleanupRoundResult finished() {
+      return new CleanupRoundResult(0, true, true);
+    }
   }
 
   private static void cleanSnapshotDir() {

Reply via email to