This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8fd6b0490da Pipe: avoid blocking startup on hardlink dir cleanup (#17932)
8fd6b0490da is described below
commit 8fd6b0490dacec1f4034ed427c9062ab10447d1a
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jun 15 14:19:41 2026 +0800
Pipe: avoid blocking startup on hardlink dir cleanup (#17932)
---
.../apache/iotdb/db/i18n/DataNodePipeMessages.java | 12 +
.../apache/iotdb/db/i18n/DataNodePipeMessages.java | 12 +
.../agent/runtime/PipeDataNodeRuntimeAgent.java | 2 +-
...aNodeHardlinkOrCopiedFileDirStartupCleaner.java | 254 ++++++++++++++++++++-
4 files changed, 269 insertions(+), 11 deletions(-)
diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
index f669934149d..fe924fbbaf9 100644
--- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
+++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
@@ -1266,6 +1266,18 @@ public final class DataNodePipeMessages {
"PipeTsFileResource's reference count is decreased to below 0.";
public static final String PIPE_HARDLINK_DIR_FOUND_DELETING_IT_RESULT =
"Pipe hardlink dir found, deleting it: {}, result: {}";
+ public static final String
PIPE_HARDLINK_DIR_FOUND_MOVED_TO_PERIODICAL_DELETE =
+ "Pipe hardlink dir found, moved it from {} to {} for throttled
periodical deletion.";
+ public static final String
PIPE_STALE_HARDLINK_DIR_FOUND_REGISTERING_PERIODICAL_DELETE =
+ "Stale pipe hardlink dir found, registering it for throttled periodical
deletion: {}";
+ public static final String PIPE_HARDLINK_DIR_PERIODICAL_DELETE_FINISHED =
+ "Finished deleting stale pipe hardlink dir {} by periodical job, result:
{}";
+ public static final String PIPE_HARDLINK_DIR_PERIODICAL_DELETE_PROGRESS =
+ "Periodically deleted {} paths from stale pipe hardlink dirs, current
dir: {}, current round result: {}";
+ public static final String PIPE_HARDLINK_DIR_PERIODICAL_DELETE_ALL_FINISHED =
+ "Finished deleting all stale pipe hardlink dirs by periodical job.";
+ public static final String PIPE_HARDLINK_DIR_MOVE_FAILED_DELETING_SYNC =
+ "Failed to move pipe hardlink dir {} for periodical deletion, deleting
it synchronously.";
public static final String PIPE_SNAPSHOT_DIR_FOUND_DELETING_IT =
"Pipe snapshot dir found, deleting it: {},";
public static final String
SHRINK_CALLBACK_IS_NOT_SUPPORTED_IN_PIPEFIXEDMEMORYBLOCK =
diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
index 131a83cf808..b286d7607f8 100644
--- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
+++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java
@@ -1221,6 +1221,18 @@ public final class DataNodePipeMessages {
"PipeTsFileResource's reference count is decreased to below 0.";
public static final String PIPE_HARDLINK_DIR_FOUND_DELETING_IT_RESULT =
"Pipe hardlink dir found, deleting it: {}, result: {}";
+ public static final String
PIPE_HARDLINK_DIR_FOUND_MOVED_TO_PERIODICAL_DELETE =
+ "Pipe hardlink dir found, moved it from {} to {} for throttled
periodical deletion.";
+ public static final String
PIPE_STALE_HARDLINK_DIR_FOUND_REGISTERING_PERIODICAL_DELETE =
+ "Stale pipe hardlink dir found, registering it for throttled periodical
deletion: {}";
+ public static final String PIPE_HARDLINK_DIR_PERIODICAL_DELETE_FINISHED =
+ "Finished deleting stale pipe hardlink dir {} by periodical job, result:
{}";
+ public static final String PIPE_HARDLINK_DIR_PERIODICAL_DELETE_PROGRESS =
+ "Periodically deleted {} paths from stale pipe hardlink dirs, current
dir: {}, current round result: {}";
+ public static final String PIPE_HARDLINK_DIR_PERIODICAL_DELETE_ALL_FINISHED =
+ "Finished deleting all stale pipe hardlink dirs by periodical job.";
+ public static final String PIPE_HARDLINK_DIR_MOVE_FAILED_DELETING_SYNC =
+ "Failed to move pipe hardlink dir {} for periodical deletion, deleting
it synchronously.";
public static final String PIPE_SNAPSHOT_DIR_FOUND_DELETING_IT =
"Pipe snapshot dir found, deleting it: {},";
public static final String
SHRINK_CALLBACK_IS_NOT_SUPPORTED_IN_PIPEFIXEDMEMORYBLOCK =
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
index 3cdf92d5392..80c7f16a0b7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java
@@ -81,7 +81,7 @@ public class PipeDataNodeRuntimeAgent implements IService {
public synchronized void preparePipeResources(
final ResourcesInformationHolder resourcesInformationHolder) throws
StartupException {
// Clean sender (connector) hardlink file dir and snapshot dir
- PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.clean();
+
PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.clean(this::registerPeriodicalJob);
// Clean receiver file dir
PipeDataNodeAgent.receiver().cleanPipeReceiverDirs();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.java
index 0ee642e7d52..1302134b6ce 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.java
@@ -29,33 +29,267 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
public class PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.class);
+ private static final String STALE_PIPE_DIR_SUFFIX = ".startup-cleaning-";
+ private static final String PERIODICAL_CLEANUP_JOB_ID =
+ "PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner#cleanTsFileDir()";
+ private static final long DELETE_MAX_PATH_COUNT_PER_ROUND = 100_000L;
+ private static final long DELETE_MAX_TIME_PER_ROUND_MS = 1_000L;
/**
* Delete the data directory and all of its subdirectories that contain the
* PipeConfig.PIPE_TSFILE_DIR_NAME directory.
*/
- public static void clean() {
- cleanTsFileDir();
+ public static void clean(final PeriodicalJobRegistrar
periodicalJobRegistrar) {
+ cleanTsFileDir(periodicalJobRegistrar);
cleanSnapshotDir();
}
- private static void cleanTsFileDir() {
+ private static void cleanTsFileDir(final PeriodicalJobRegistrar
periodicalJobRegistrar) {
+ final String pipeHardlinkBaseDirName =
PipeConfig.getInstance().getPipeHardlinkBaseDirName();
+ final List<File> stalePipeDirs = new ArrayList<>();
for (final String dataDir :
IoTDBDescriptor.getInstance().getConfig().getDataDirs()) {
- final File pipeHardLinkDir =
- new File(
- dataDir + File.separator +
PipeConfig.getInstance().getPipeHardlinkBaseDirName());
+ final File localDataDir = new File(dataDir);
+ collectInterruptedStalePipeDirs(localDataDir, pipeHardlinkBaseDirName,
stalePipeDirs);
+
+ final File pipeHardLinkDir = new File(localDataDir,
pipeHardlinkBaseDirName);
if (pipeHardLinkDir.isDirectory()) {
- LOGGER.info(
- DataNodePipeMessages.PIPE_HARDLINK_DIR_FOUND_DELETING_IT_RESULT,
- pipeHardLinkDir,
- FileUtils.deleteQuietly(pipeHardLinkDir));
+ moveAsideAndCollect(pipeHardLinkDir, pipeHardlinkBaseDirName,
stalePipeDirs);
}
}
+ registerPeriodicalCleanupJob(periodicalJobRegistrar, stalePipeDirs);
+ }
+
+ private static void collectInterruptedStalePipeDirs(
+ final File localDataDir,
+ final String pipeHardlinkBaseDirName,
+ final List<File> stalePipeDirs) {
+ final File[] stalePipeDirFiles =
+ localDataDir.listFiles(
+ file ->
+ file.isDirectory()
+ && file.getName().startsWith(pipeHardlinkBaseDirName +
STALE_PIPE_DIR_SUFFIX));
+ if (stalePipeDirFiles == null) {
+ return;
+ }
+
+ for (final File stalePipeDir : stalePipeDirFiles) {
+ LOGGER.info(
+
DataNodePipeMessages.PIPE_STALE_HARDLINK_DIR_FOUND_REGISTERING_PERIODICAL_DELETE,
+ stalePipeDir);
+ stalePipeDirs.add(stalePipeDir);
+ }
+ }
+
+ private static void moveAsideAndCollect(
+ final File pipeHardLinkDir,
+ final String pipeHardlinkBaseDirName,
+ final List<File> stalePipeDirs) {
+ try {
+ final File stalePipeDir = moveAside(pipeHardLinkDir,
pipeHardlinkBaseDirName);
+ LOGGER.info(
+
DataNodePipeMessages.PIPE_HARDLINK_DIR_FOUND_MOVED_TO_PERIODICAL_DELETE,
+ pipeHardLinkDir,
+ stalePipeDir);
+ stalePipeDirs.add(stalePipeDir);
+ } catch (final IOException e) {
+ LOGGER.warn(
+ DataNodePipeMessages.PIPE_HARDLINK_DIR_MOVE_FAILED_DELETING_SYNC,
pipeHardLinkDir, e);
+ LOGGER.info(
+ DataNodePipeMessages.PIPE_HARDLINK_DIR_FOUND_DELETING_IT_RESULT,
+ pipeHardLinkDir,
+ FileUtils.deleteQuietly(pipeHardLinkDir));
+ }
+ }
+
+ private static File moveAside(final File pipeHardLinkDir, final String
pipeHardlinkBaseDirName)
+ throws IOException {
+ final File parentDir = pipeHardLinkDir.getParentFile();
+ if (parentDir == null) {
+ throw new IOException("Failed to get parent dir of " + pipeHardLinkDir);
+ }
+
+ final long timestamp = System.currentTimeMillis();
+ for (int i = 0; ; ++i) {
+ final File stalePipeDir =
+ new File(
+ parentDir, pipeHardlinkBaseDirName + STALE_PIPE_DIR_SUFFIX +
timestamp + "-" + i);
+ if (!stalePipeDir.exists()) {
+ Files.move(pipeHardLinkDir.toPath(), stalePipeDir.toPath());
+ return stalePipeDir;
+ }
+ }
+ }
+
+ private static void registerPeriodicalCleanupJob(
+ final PeriodicalJobRegistrar periodicalJobRegistrar, final List<File>
stalePipeDirs) {
+ if (stalePipeDirs.isEmpty()) {
+ return;
+ }
+
+ periodicalJobRegistrar.register(
+ PERIODICAL_CLEANUP_JOB_ID,
+ new PeriodicalStalePipeDirCleaner(stalePipeDirs)::cleanOneRound,
+
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds());
+ }
+
+ private static CleanupRoundResult deleteQuietlyWithThrottle(final File
stalePipeDir) {
+ if (!stalePipeDir.exists()) {
+ return CleanupRoundResult.finished();
+ }
+
+ final AtomicBoolean deleteResult = new AtomicBoolean(true);
+ final AtomicLong deletedPathCount = new AtomicLong(0);
+ final long deadlineNanos = System.nanoTime() +
DELETE_MAX_TIME_PER_ROUND_MS * 1_000_000L;
+ try {
+ Files.walkFileTree(
+ stalePipeDir.toPath(),
+ new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult preVisitDirectory(
+ final Path dir, final BasicFileAttributes attrs) {
+ return shouldStop(deletedPathCount, deadlineNanos)
+ ? FileVisitResult.TERMINATE
+ : FileVisitResult.CONTINUE;
+ }
+
+ @Override
+ public FileVisitResult visitFile(final Path file, final
BasicFileAttributes attrs) {
+ return deletePath(file, deleteResult, deletedPathCount,
deadlineNanos);
+ }
+
+ @Override
+ public FileVisitResult visitFileFailed(final Path file, final
IOException exc) {
+ deleteResult.set(false);
+ return deletePath(file, deleteResult, deletedPathCount,
deadlineNanos);
+ }
+
+ @Override
+ public FileVisitResult postVisitDirectory(final Path dir, final
IOException exc) {
+ if (exc != null) {
+ deleteResult.set(false);
+ }
+ return deletePath(dir, deleteResult, deletedPathCount,
deadlineNanos);
+ }
+ });
+ } catch (final IOException e) {
+ deleteResult.set(false);
+ }
+
+ return new CleanupRoundResult(
+ deletedPathCount.get(), deleteResult.get() && !stalePipeDir.exists(),
deleteResult.get());
+ }
+
+ private static FileVisitResult deletePath(
+ final Path path,
+ final AtomicBoolean deleteResult,
+ final AtomicLong deletedPathCount,
+ final long deadlineNanos) {
+ if (shouldStop(deletedPathCount, deadlineNanos)) {
+ return FileVisitResult.TERMINATE;
+ }
+
+ try {
+ if (Files.deleteIfExists(path)) {
+ deletedPathCount.incrementAndGet();
+ }
+ } catch (final IOException e) {
+ deleteResult.set(false);
+ }
+ return shouldStop(deletedPathCount, deadlineNanos)
+ ? FileVisitResult.TERMINATE
+ : FileVisitResult.CONTINUE;
+ }
+
+ private static boolean shouldStop(final AtomicLong deletedPathCount, final
long deadlineNanos) {
+ return Thread.currentThread().isInterrupted()
+ || deletedPathCount.get() >= DELETE_MAX_PATH_COUNT_PER_ROUND
+ || System.nanoTime() >= deadlineNanos;
+ }
+
+ @FunctionalInterface
+ public interface PeriodicalJobRegistrar {
+
+ void register(String id, Runnable periodicalJob, long intervalInSeconds);
+ }
+
+ private static class PeriodicalStalePipeDirCleaner {
+
+ private final List<File> stalePipeDirs;
+ private int currentDirIndex;
+ private boolean finished;
+
+ private PeriodicalStalePipeDirCleaner(final List<File> stalePipeDirs) {
+ this.stalePipeDirs = stalePipeDirs;
+ currentDirIndex = 0;
+ finished = false;
+ }
+
+ private void cleanOneRound() {
+ if (finished) {
+ return;
+ }
+
+ long deletedPathCount = 0;
+ while (currentDirIndex < stalePipeDirs.size()) {
+ final File stalePipeDir = stalePipeDirs.get(currentDirIndex);
+ final CleanupRoundResult result =
deleteQuietlyWithThrottle(stalePipeDir);
+ deletedPathCount += result.deletedPathCount;
+
+ if (result.finished) {
+ LOGGER.info(
+
DataNodePipeMessages.PIPE_HARDLINK_DIR_PERIODICAL_DELETE_FINISHED,
+ stalePipeDir,
+ result.success);
+ ++currentDirIndex;
+ continue;
+ }
+
+ if (deletedPathCount > 0 || !result.success) {
+ LOGGER.info(
+
DataNodePipeMessages.PIPE_HARDLINK_DIR_PERIODICAL_DELETE_PROGRESS,
+ deletedPathCount,
+ stalePipeDir,
+ result.success);
+ }
+ return;
+ }
+
+ finished = true;
+
LOGGER.info(DataNodePipeMessages.PIPE_HARDLINK_DIR_PERIODICAL_DELETE_ALL_FINISHED);
+ }
+ }
+
+ private static class CleanupRoundResult {
+
+ private final long deletedPathCount;
+ private final boolean finished;
+ private final boolean success;
+
+ private CleanupRoundResult(
+ final long deletedPathCount, final boolean finished, final boolean
success) {
+ this.deletedPathCount = deletedPathCount;
+ this.finished = finished;
+ this.success = success;
+ }
+
+ private static CleanupRoundResult finished() {
+ return new CleanupRoundResult(0, true, true);
+ }
}
private static void cleanSnapshotDir() {