This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 487a1a21ce9 Pipe & Load: Retry when a file operation causes an exception (#14598)
487a1a21ce9 is described below

commit 487a1a21ce9fd9c0708c25c72336d5eb646d62a0
Author: nanxiang xia <[email protected]>
AuthorDate: Wed Jan 8 10:21:37 2025 +0800

    Pipe & Load: Retry when a file operation causes an exception (#14598)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../payload/SubscriptionFileHandler.java           | 24 +++++++---
 .../org/apache/iotdb/session/util/RetryUtils.java  | 48 +++++++++++++++++++
 .../batch/PipeTabletEventTsFileBatch.java          |  1 +
 .../async/handler/PipeTransferTsFileHandler.java   | 13 ++++-
 .../thrift/sync/IoTDBDataRegionSyncConnector.java  | 15 ++++--
 .../pipeconsensus/PipeConsensusReceiver.java       | 31 ++++++++----
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |  7 ++-
 .../db/storageengine/dataregion/DataRegion.java    | 55 ++++++++++++++++------
 .../db/storageengine/load/LoadTsFileManager.java   | 21 +++++++--
 .../load/active/ActiveLoadDirScanner.java          | 27 ++++++-----
 .../load/active/ActiveLoadTsFileLoader.java        | 13 ++++-
 .../commons/pipe/receiver/IoTDBFileReceiver.java   | 17 +++++--
 .../commons/pipe/receiver/IoTDBReceiverAgent.java  |  7 ++-
 .../snapshot/PipeSnapshotResourceManager.java      |  9 +++-
 .../org/apache/iotdb/commons/utils/FileUtils.java  | 22 +++++++++
 .../org/apache/iotdb/commons/utils/RetryUtils.java | 48 +++++++++++++++++++
 16 files changed, 297 insertions(+), 61 deletions(-)

diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionFileHandler.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionFileHandler.java
index 0ec121f9699..8f2b3aaa860 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionFileHandler.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionFileHandler.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.session.subscription.payload;
 
 import 
org.apache.iotdb.rpc.subscription.exception.SubscriptionIncompatibleHandlerException;
+import org.apache.iotdb.session.util.RetryUtils;
 
 import java.io.File;
 import java.io.IOException;
@@ -56,8 +57,11 @@ public abstract class SubscriptionFileHandler implements 
SubscriptionMessageHand
    */
   public synchronized Path deleteFile() throws IOException {
     final Path sourcePath = getPath();
-    Files.delete(sourcePath);
-    return sourcePath;
+    return RetryUtils.retryOnException(
+        () -> {
+          Files.delete(sourcePath);
+          return sourcePath;
+        });
   }
 
   /**
@@ -66,7 +70,7 @@ public abstract class SubscriptionFileHandler implements 
SubscriptionMessageHand
    * @throws IOException if an I/O error occurs
    */
   public synchronized Path moveFile(final String target) throws IOException {
-    return this.moveFile(Paths.get(target));
+    return RetryUtils.retryOnException(() -> this.moveFile(Paths.get(target)));
   }
 
   /**
@@ -78,7 +82,8 @@ public abstract class SubscriptionFileHandler implements 
SubscriptionMessageHand
     if (!Files.exists(target.getParent())) {
       Files.createDirectories(target.getParent());
     }
-    return Files.move(getPath(), target, StandardCopyOption.REPLACE_EXISTING);
+    return RetryUtils.retryOnException(
+        () -> Files.move(getPath(), target, 
StandardCopyOption.REPLACE_EXISTING));
   }
 
   /**
@@ -87,7 +92,7 @@ public abstract class SubscriptionFileHandler implements 
SubscriptionMessageHand
    * @throws IOException if an I/O error occurs
    */
   public synchronized Path copyFile(final String target) throws IOException {
-    return this.copyFile(Paths.get(target));
+    return RetryUtils.retryOnException(() -> this.copyFile(Paths.get(target)));
   }
 
   /**
@@ -99,8 +104,13 @@ public abstract class SubscriptionFileHandler implements 
SubscriptionMessageHand
     if (!Files.exists(target.getParent())) {
       Files.createDirectories(target.getParent());
     }
-    return Files.copy(
-        getPath(), target, StandardCopyOption.REPLACE_EXISTING, 
StandardCopyOption.COPY_ATTRIBUTES);
+    return RetryUtils.retryOnException(
+        () ->
+            Files.copy(
+                getPath(),
+                target,
+                StandardCopyOption.REPLACE_EXISTING,
+                StandardCopyOption.COPY_ATTRIBUTES));
   }
 
   @Override
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/RetryUtils.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/RetryUtils.java
new file mode 100644
index 00000000000..bd767d270b5
--- /dev/null
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/RetryUtils.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.util;
+
+/** Utilities for re-running an operation that may fail transiently, e.g. a file operation. */
+public class RetryUtils {
+
+  /**
+   * A task that returns a result and may throw a checked exception.
+   *
+   * @param <T> the result type
+   * @param <E> the exception type the task may throw
+   */
+  @FunctionalInterface
+  public interface CallableWithException<T, E extends Exception> {
+    T call() throws E;
+  }
+
+  /**
+   * Default limit on the total number of attempts, counting the initial call: a task that keeps
+   * failing is called three times, i.e. retried twice.
+   */
+  public static final int MAX_RETRIES = 3;
+
+  /**
+   * Calls {@code callable} until it succeeds, making at most {@link #MAX_RETRIES} attempts.
+   *
+   * @param callable the task to run
+   * @return the result of the first successful attempt
+   * @throws E the exception thrown by the last attempt if every attempt fails
+   */
+  public static <T, E extends Exception> T retryOnException(
+      final CallableWithException<T, E> callable) throws E {
+    return retryOnException(callable, MAX_RETRIES);
+  }
+
+  /**
+   * Calls {@code callable} until it succeeds, making at most {@code maxAttempts} attempts.
+   * Attempts are made back to back, without any delay in between.
+   *
+   * <p>If every attempt fails, the exception of the last attempt is thrown with the exception
+   * of the preceding attempt attached as a suppressed exception, so that earlier failures stay
+   * visible in the stack trace. An {@link InterruptedException} is never retried.
+   *
+   * @param callable the task to run
+   * @param maxAttempts the maximum number of attempts, counting the initial call; at least 1
+   * @return the result of the first successful attempt
+   * @throws E the exception thrown by the last attempt if every attempt fails
+   * @throws IllegalArgumentException if {@code maxAttempts} is less than 1
+   */
+  public static <T, E extends Exception> T retryOnException(
+      final CallableWithException<T, E> callable, final int maxAttempts) throws E {
+    if (maxAttempts < 1) {
+      throw new IllegalArgumentException("maxAttempts must be >= 1: " + maxAttempts);
+    }
+    Exception previousFailure = null;
+    int attempt = 0;
+    while (true) {
+      try {
+        return callable.call();
+      } catch (Exception e) {
+        // Chain the earlier failure so it is not lost. An exception must not suppress itself
+        // (possible when the task rethrows one cached instance), hence the identity check.
+        if (previousFailure != null && previousFailure != e) {
+          e.addSuppressed(previousFailure);
+        }
+        attempt++;
+        // Retrying after an interrupt would defeat cancellation, so give up immediately.
+        if (attempt >= maxAttempts || e instanceof InterruptedException) {
+          throw e;
+        }
+        previousFailure = e;
+      }
+    }
+  }
+
+  private RetryUtils() {
+    // utility class
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
index 1162961f567..12322816072 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -210,6 +210,7 @@ public class PipeTabletEventTsFileBatch extends 
PipeTabletEventBatch {
     super.close();
 
     pipeName2WeightMap.clear();
+
     tableModeTsFileBuilder.close();
     treeModeTsFileBuilder.close();
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 87f9b964d12..783d5ff7aa5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.utils.RetryUtils;
 import 
org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeAsyncClientManager;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
@@ -238,7 +239,11 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
 
         // Delete current file when using tsFile as batch
         if (events.stream().anyMatch(event -> !(event instanceof 
PipeTsFileInsertionEvent))) {
-          FileUtils.delete(currentFile);
+          RetryUtils.retryOnException(
+              () -> {
+                FileUtils.delete(currentFile);
+                return null;
+              });
         }
       } catch (final IOException e) {
         LOGGER.warn(
@@ -341,7 +346,11 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
 
       // Delete current file when using tsFile as batch
       if (events.stream().anyMatch(event -> !(event instanceof 
PipeTsFileInsertionEvent))) {
-        FileUtils.delete(currentFile);
+        RetryUtils.retryOnException(
+            () -> {
+              FileUtils.delete(currentFile);
+              return null;
+            });
       }
     } catch (final IOException e) {
       LOGGER.warn("Failed to close file reader or delete tsFile when failed to 
transfer file.", e);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index 93ce77974b9..99b2e71c731 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.utils.RetryUtils;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventBatch;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventPlainBatch;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
@@ -309,15 +310,19 @@ public class IoTDBDataRegionSyncConnector extends 
IoTDBDataNodeSyncConnector {
     final List<Pair<String, File>> dbTsFilePairs = 
batchToTransfer.sealTsFiles();
     final Map<Pair<String, Long>, Double> pipe2WeightMap = 
batchToTransfer.deepCopyPipe2WeightMap();
 
-    for (final Pair<String, File> tsFile : dbTsFilePairs) {
-      doTransfer(pipe2WeightMap, tsFile.right, null, tsFile.left);
+    for (final Pair<String, File> dbTsFile : dbTsFilePairs) {
+      doTransfer(pipe2WeightMap, dbTsFile.right, null, dbTsFile.left);
       try {
-        FileUtils.delete(tsFile.right);
+        RetryUtils.retryOnException(
+            () -> {
+              FileUtils.delete(dbTsFile.right);
+              return null;
+            });
       } catch (final NoSuchFileException e) {
-        LOGGER.info("The file {} is not found, may already be deleted.", 
tsFile);
+        LOGGER.info("The file {} is not found, may already be deleted.", 
dbTsFile);
       } catch (final Exception e) {
         LOGGER.warn(
-            "Failed to delete batch file {}, this file should be deleted 
manually later", tsFile);
+            "Failed to delete batch file {}, this file should be deleted 
manually later", dbTsFile);
       }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 6bf7b7b7f6e..eb7491e460e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -31,6 +31,7 @@ import 
org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.response.Pi
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
 import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiverAgent;
 import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.pipe.PipeConsensus;
 import org.apache.iotdb.consensus.pipe.PipeConsensusServerImpl;
@@ -88,8 +89,6 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
-import static 
org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils.generateTsFileResource;
-
 public class PipeConsensusReceiver {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeConsensusReceiver.class);
   private static final IoTDBConfig IOTDB_CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
@@ -849,7 +848,7 @@ public class PipeConsensusReceiver {
   private void deleteFile(File file) {
     if (file.exists()) {
       try {
-        FileUtils.delete(file);
+        RetryUtils.retryOnException(() -> FileUtils.delete(file));
         LOGGER.info(
             "PipeConsensus-PipeName-{}: Original writing file {} was deleted.",
             consensusPipeName,
@@ -943,7 +942,11 @@ public class PipeConsensusReceiver {
     if (receiverFileDirWithIdSuffix.get() != null) {
       if (receiverFileDirWithIdSuffix.get().exists()) {
         try {
-          FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
+          RetryUtils.retryOnException(
+              () -> {
+                FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
+                return null;
+              });
           LOGGER.info(
               "PipeConsensus-PipeName-{}: Original receiver file dir {} was 
deleted successfully.",
               consensusPipeName,
@@ -1005,7 +1008,11 @@ public class PipeConsensusReceiver {
     }
     // Remove exists dir
     if (newReceiverDir.exists()) {
-      FileUtils.deleteDirectory(newReceiverDir);
+      RetryUtils.retryOnException(
+          () -> {
+            FileUtils.deleteDirectory(newReceiverDir);
+            return null;
+          });
       LOGGER.info(
           "PipeConsensus-PipeName-{}: Origin receiver file dir {} was 
deleted.",
           consensusPipeName,
@@ -1036,7 +1043,11 @@ public class PipeConsensusReceiver {
     if (receiverFileDirWithIdSuffix.get() != null) {
       if (receiverFileDirWithIdSuffix.get().exists()) {
         try {
-          FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
+          RetryUtils.retryOnException(
+              () -> {
+                FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
+                return null;
+              });
           LOGGER.info(
               "PipeConsensus-PipeName-{}: Receiver exit: Original receiver 
file dir {} was deleted.",
               consensusPipeName,
@@ -1172,7 +1183,11 @@ public class PipeConsensusReceiver {
       File tsFileWriterDirectory = new File(this.localWritingDirPath);
       // Remove exists dir
       if (tsFileWriterDirectory.exists()) {
-        FileUtils.deleteDirectory(tsFileWriterDirectory);
+        RetryUtils.retryOnException(
+            () -> {
+              FileUtils.deleteDirectory(tsFileWriterDirectory);
+              return null;
+            });
         LOGGER.info(
             "PipeConsensus-PipeName-{}: Origin receiver tsFileWriter-{} file 
dir {} was deleted.",
             consensusPipeName,
@@ -1267,7 +1282,7 @@ public class PipeConsensusReceiver {
       // close file
       if (writingFile != null) {
         try {
-          FileUtils.delete(writingFile);
+          RetryUtils.retryOnException(() -> FileUtils.delete(writingFile));
           LOGGER.info(
               "PipeConsensus-PipeName-{}: TsFileWriter exit: Writing file {} 
was deleted.",
               consensusPipeName,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index a87e229a03a..b6d5d525cf6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -35,6 +35,7 @@ import 
org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
 import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
 import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
 import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
 import org.apache.iotdb.db.auth.AuthorityChecker;
@@ -559,7 +560,11 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
       final File sourceFile = new File(absolutePath);
       if (!Objects.equals(
           loadActiveListeningPipeDir, 
sourceFile.getParentFile().getAbsolutePath())) {
-        FileUtils.moveFileWithMD5Check(sourceFile, new 
File(loadActiveListeningPipeDir));
+        RetryUtils.retryOnException(
+            () -> {
+              FileUtils.moveFileWithMD5Check(sourceFile, new 
File(loadActiveListeningPipeDir));
+              return null;
+            });
       }
     }
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index fdb4e771148..54b469de3fe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -34,6 +34,7 @@ import 
org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
+import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.consensus.ConsensusFactory;
@@ -3025,8 +3026,7 @@ public class DataRegion implements IDataRegionForQuery {
       final boolean deleteOriginFile,
       boolean isGeneratedByPipe)
       throws LoadFileException, DiskSpaceInsufficientException {
-    final File targetFile;
-    targetFile =
+    final File targetFile =
         fsFactory.getFile(
             TierManager.getInstance().getNextFolderForTsFile(0, false),
             databaseName
@@ -3038,7 +3038,7 @@ public class DataRegion implements IDataRegionForQuery {
                 + tsFileResource.getTsFile().getName());
     tsFileResource.setFile(targetFile);
     if (tsFileManager.contains(tsFileResource, false)) {
-      logger.error("The file {} has already been loaded in unsequence list", 
tsFileResource);
+      logger.warn("The file {} has already been loaded in unsequence list", 
tsFileResource);
       return false;
     }
 
@@ -3055,12 +3055,20 @@ public class DataRegion implements IDataRegionForQuery {
     }
     try {
       if (deleteOriginFile) {
-        FileUtils.moveFile(tsFileToLoad, targetFile);
+        RetryUtils.retryOnException(
+            () -> {
+              FileUtils.moveFile(tsFileToLoad, targetFile);
+              return null;
+            });
       } else {
-        Files.copy(tsFileToLoad.toPath(), targetFile.toPath());
+        RetryUtils.retryOnException(
+            () -> {
+              Files.copy(tsFileToLoad.toPath(), targetFile.toPath());
+              return null;
+            });
       }
     } catch (final IOException e) {
-      logger.error(
+      logger.warn(
           "File renaming failed when loading tsfile. Origin: {}, Target: {}",
           tsFileToLoad.getAbsolutePath(),
           targetFile.getAbsolutePath(),
@@ -3077,13 +3085,20 @@ public class DataRegion implements IDataRegionForQuery {
         fsFactory.getFile(targetFile.getAbsolutePath() + RESOURCE_SUFFIX);
     try {
       if (deleteOriginFile) {
-        FileUtils.moveFile(resourceFileToLoad, targetResourceFile);
+        RetryUtils.retryOnException(
+            () -> {
+              FileUtils.moveFile(resourceFileToLoad, targetResourceFile);
+              return null;
+            });
       } else {
-        Files.copy(resourceFileToLoad.toPath(), targetResourceFile.toPath());
+        RetryUtils.retryOnException(
+            () -> {
+              Files.copy(resourceFileToLoad.toPath(), 
targetResourceFile.toPath());
+              return null;
+            });
       }
-
     } catch (final IOException e) {
-      logger.error(
+      logger.warn(
           "File renaming failed when loading .resource file. Origin: {}, 
Target: {}",
           resourceFileToLoad.getAbsolutePath(),
           targetResourceFile.getAbsolutePath(),
@@ -3133,18 +3148,30 @@ public class DataRegion implements IDataRegionForQuery {
       // when successfully loaded, the filepath of the resource will be 
changed to the IoTDB data
       // dir, so we can add a suffix to find the old modification file.
       try {
-        Files.deleteIfExists(targetModFile.toPath());
+        RetryUtils.retryOnException(
+            () -> {
+              Files.deleteIfExists(targetModFile.toPath());
+              return null;
+            });
       } catch (final IOException e) {
         logger.warn("Cannot delete localModFile {}", targetModFile, e);
       }
       try {
         if (deleteOriginFile) {
-          FileUtils.moveFile(modFileToLoad, targetModFile);
+          RetryUtils.retryOnException(
+              () -> {
+                FileUtils.moveFile(modFileToLoad, targetModFile);
+                return null;
+              });
         } else {
-          Files.copy(modFileToLoad.toPath(), targetModFile.toPath());
+          RetryUtils.retryOnException(
+              () -> {
+                Files.copy(modFileToLoad.toPath(), targetModFile.toPath());
+                return null;
+              });
         }
       } catch (final IOException e) {
-        logger.error(
+        logger.warn(
             "File renaming failed when loading .mod file. Origin: {}, Target: 
{}",
             modFileToLoad.getAbsolutePath(),
             targetModFile.getAbsolutePath(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index c4dc75f591b..b88ab2c6520 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.enums.Metric;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
@@ -386,7 +387,7 @@ public class LoadTsFileManager {
 
     private void clearDir(File dir) {
       if (dir.exists()) {
-        FileUtils.deleteFileOrDirectory(dir);
+        FileUtils.deleteFileOrDirectoryWithRetry(dir);
       }
       if (dir.mkdirs()) {
         LOGGER.info("Load TsFile dir {} is created.", dir.getPath());
@@ -563,7 +564,11 @@ public class LoadTsFileManager {
             }
             final Path writerPath = writer.getFile().toPath();
             if (Files.exists(writerPath)) {
-              Files.delete(writerPath);
+              RetryUtils.retryOnException(
+                  () -> {
+                    Files.delete(writerPath);
+                    return null;
+                  });
             }
           } catch (IOException e) {
             LOGGER.warn("Close TsFileIOWriter {} error.", 
entry.getValue().getFile().getPath(), e);
@@ -578,7 +583,11 @@ public class LoadTsFileManager {
             modificationFile.close();
             final Path modificationFilePath = 
modificationFile.getFile().toPath();
             if (Files.exists(modificationFilePath)) {
-              Files.delete(modificationFilePath);
+              RetryUtils.retryOnException(
+                  () -> {
+                    Files.delete(modificationFilePath);
+                    return null;
+                  });
             }
           } catch (IOException e) {
             LOGGER.warn("Close ModificationFile {} error.", 
entry.getValue().getFile(), e);
@@ -586,7 +595,11 @@ public class LoadTsFileManager {
         }
       }
       try {
-        Files.delete(taskDir.toPath());
+        RetryUtils.retryOnException(
+            () -> {
+              Files.delete(taskDir.toPath());
+              return null;
+            });
       } catch (DirectoryNotEmptyException e) {
         LOGGER.info("Task dir {} is not empty, skip deleting.", 
taskDir.getPath());
       } catch (IOException e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
index c5fc2b59b96..34bb392dc28 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
@@ -102,17 +102,22 @@ public class ActiveLoadDirScanner extends 
ActiveLoadScheduledExecutorService {
           listeningDir.equals(IOTDB_CONFIG.getLoadActiveListeningPipeDir());
       try (final Stream<File> fileStream =
           FileUtils.streamFiles(new File(listeningDir), true, (String[]) 
null)) {
-        fileStream
-            .filter(file -> 
!activeLoadTsFileLoader.isFilePendingOrLoading(file))
-            .filter(File::exists)
-            .map(
-                file ->
-                    (file.getName().endsWith(RESOURCE) || 
file.getName().endsWith(MODS))
-                        ? getTsFilePath(file.getAbsolutePath())
-                        : file.getAbsolutePath())
-            .filter(this::isTsFileCompleted)
-            .limit(currentAllowedPendingSize)
-            .forEach(file -> activeLoadTsFileLoader.tryTriggerTsFileLoad(file, 
isGeneratedByPipe));
+        try {
+          fileStream
+              .filter(file -> 
!activeLoadTsFileLoader.isFilePendingOrLoading(file))
+              .filter(File::exists)
+              .map(
+                  file ->
+                      (file.getName().endsWith(RESOURCE) || 
file.getName().endsWith(MODS))
+                          ? getTsFilePath(file.getAbsolutePath())
+                          : file.getAbsolutePath())
+              .filter(this::isTsFileCompleted)
+              .limit(currentAllowedPendingSize)
+              .forEach(
+                  file -> activeLoadTsFileLoader.tryTriggerTsFileLoad(file, 
isGeneratedByPipe));
+        } catch (final Exception e) {
+          LOGGER.warn("Exception occurred during scanning dir: {}", 
listeningDir, e);
+        }
       }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index 188ae30436d..8cb79658b66 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import 
org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -94,7 +95,11 @@ public class ActiveLoadTsFileLoader {
         if (!Objects.equals(failDir.get(), 
IOTDB_CONFIG.getLoadActiveListeningFailDir())) {
           final File failDirFile = new 
File(IOTDB_CONFIG.getLoadActiveListeningFailDir());
           try {
-            FileUtils.forceMkdir(failDirFile);
+            RetryUtils.retryOnException(
+                () -> {
+                  FileUtils.forceMkdir(failDirFile);
+                  return null;
+                });
           } catch (final IOException e) {
             LOGGER.warn(
                 "Error occurred during creating fail directory {} for active 
load.",
@@ -258,7 +263,11 @@ public class ActiveLoadTsFileLoader {
 
     final File targetDir = new File(failDir.get());
     try {
-      
org.apache.iotdb.commons.utils.FileUtils.moveFileWithMD5Check(sourceFile, 
targetDir);
+      RetryUtils.retryOnException(
+          () -> {
+            
org.apache.iotdb.commons.utils.FileUtils.moveFileWithMD5Check(sourceFile, 
targetDir);
+            return null;
+          });
     } catch (final IOException e) {
       LOGGER.warn("Error occurred during moving file {} to fail directory.", 
filePath, e);
     }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 23e89ddb7d7..336670f593f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -33,6 +33,7 @@ import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransf
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferHandshakeV1Req;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferHandshakeV2Req;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
+import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -112,7 +113,11 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
     if (receiverFileDirWithIdSuffix.get() != null) {
       if (receiverFileDirWithIdSuffix.get().exists()) {
         try {
-          FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
+          RetryUtils.retryOnException(
+              () -> {
+                FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
+                return null;
+              });
           LOGGER.info(
               "Receiver id = {}: Original receiver file dir {} was deleted.",
               receiverId.get(),
@@ -440,7 +445,7 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
   private void deleteFile(final File file) {
     if (file.exists()) {
       try {
-        FileUtils.delete(file);
+        RetryUtils.retryOnException(() -> FileUtils.delete(file));
         LOGGER.info(
             "Receiver id = {}: Original writing file {} was deleted.",
             receiverId.get(),
@@ -749,7 +754,7 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
 
     if (writingFile != null) {
       try {
-        FileUtils.delete(writingFile);
+        RetryUtils.retryOnException(() -> FileUtils.delete(writingFile));
         LOGGER.info(
             "Receiver id = {}: Handling exit: Writing file {} was deleted.",
             receiverId.get(),
@@ -774,7 +779,11 @@ public abstract class IoTDBFileReceiver implements 
IoTDBReceiver {
     if (receiverFileDirWithIdSuffix.get() != null) {
       if (receiverFileDirWithIdSuffix.get().exists()) {
         try {
-          FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
+          RetryUtils.retryOnException(
+              () -> {
+                FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
+                return null;
+              });
           LOGGER.info(
               "Receiver id = {}: Handling exit: Original receiver file dir {} 
was deleted.",
               receiverId.get(),
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBReceiverAgent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBReceiverAgent.java
index c09812f2003..7aaa66a8b75 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBReceiverAgent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBReceiverAgent.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.commons.pipe.receiver;
 
+import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -114,7 +115,11 @@ public abstract class IoTDBReceiverAgent {
 
   public static void cleanPipeReceiverDir(final File receiverFileDir) {
     try {
-      FileUtils.deleteDirectory(receiverFileDir);
+      RetryUtils.retryOnException(
+          () -> {
+            FileUtils.deleteDirectory(receiverFileDir);
+            return null;
+          });
       LOGGER.info("Clean pipe receiver dir {} successfully.", receiverFileDir);
     } catch (final Exception e) {
       LOGGER.warn("Clean pipe receiver dir {} failed.", receiverFileDir, e);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/snapshot/PipeSnapshotResourceManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/snapshot/PipeSnapshotResourceManager.java
index 6b134d17205..24152a14da9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/snapshot/PipeSnapshotResourceManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/snapshot/PipeSnapshotResourceManager.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.commons.pipe.resource.snapshot;
 
 import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 
 import org.slf4j.Logger;
@@ -75,7 +76,11 @@ public abstract class PipeSnapshotResourceManager {
       }
 
       // Otherwise, copy the snapshot to pipe dir
-      FileUtils.copyFile(new File(snapshotPath), new File(copiedFilePath));
+      RetryUtils.retryOnException(
+          () -> {
+            FileUtils.copyFile(new File(snapshotPath), new 
File(copiedFilePath));
+            return null;
+          });
       copiedSnapshotPath2ReferenceCountMap.put(copiedFilePath, new 
AtomicLong(1));
       return copiedFilePath;
     } finally {
@@ -140,7 +145,7 @@ public abstract class PipeSnapshotResourceManager {
       final long count = referenceCount.decrementAndGet();
       if (count == 0) {
         copiedSnapshotPath2ReferenceCountMap.remove(snapshotPath);
-        FileUtils.deleteFileOrDirectory(new File(snapshotPath));
+        FileUtils.deleteFileOrDirectoryWithRetry(new File(snapshotPath));
       }
     } finally {
       lock.unlock();
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
index 99a7b4e7589..f1ae48c9956 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
@@ -86,6 +86,28 @@ public class FileUtils {
     }
   }
 
+  public static void deleteFileOrDirectoryWithRetry(File file) { // recursive, retrying delete
+    if (file.isDirectory()) {
+      File[] files = file.listFiles(); // may be null, hence the check below
+      if (files != null) {
+        for (File subfile : files) {
+          deleteFileOrDirectoryWithRetry(subfile); // delete children before the directory
+        }
+      }
+    }
+    try {
+      RetryUtils.retryOnException( // the delete itself is re-run if it throws
+          () -> {
+            Files.delete(file.toPath());
+            return null; // result is unused; the callable must return a value
+          });
+    } catch (DirectoryNotEmptyException e) { // log the entries that are still present
+      LOGGER.warn("{}: {}", e.getMessage(), Arrays.toString(file.list()), e);
+    } catch (Exception e) { // any other failure of the delete is logged, not rethrown
+      LOGGER.warn("{}: {}", e.getMessage(), file.getName(), e);
+    }
+  }
+
   public static void deleteDirectoryAndEmptyParent(File folder) {
     deleteFileOrDirectory(folder);
     final File parentFolder = folder.getParentFile();
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
new file mode 100644
index 00000000000..19a6456ec30
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.utils;
+
+public class RetryUtils { // static helper for re-running a call that threw an exception
+
+  public interface CallableWithException<T, E extends Exception> { // returns T, may throw E
+    T call() throws E;
+  }
+
+  public static final int MAX_RETRIES = 3; // total number of calls, the first one included
+
+  public static <T, E extends Exception> T retryOnException(
+      final CallableWithException<T, E> callable) throws E {
+    int attempt = 0; // number of calls that have failed so far
+    while (true) {
+      try {
+        return callable.call(); // the first successful call ends the loop
+      } catch (Exception e) {
+        attempt++;
+        if (attempt >= MAX_RETRIES) {
+          throw e; // out of attempts: propagate the most recent failure
+        }
+      } // otherwise loop and call again immediately, with no delay in between
+    }
+  }
+
+  private RetryUtils() {
+    // utility class
+  }
+}


Reply via email to