This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 487a1a21ce9 Pipe & Load: Retry when a file operation causes an
exception (#14598)
487a1a21ce9 is described below
commit 487a1a21ce9fd9c0708c25c72336d5eb646d62a0
Author: nanxiang xia <[email protected]>
AuthorDate: Wed Jan 8 10:21:37 2025 +0800
Pipe & Load: Retry when a file operation causes an exception (#14598)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../payload/SubscriptionFileHandler.java | 24 +++++++---
.../org/apache/iotdb/session/util/RetryUtils.java | 48 +++++++++++++++++++
.../batch/PipeTabletEventTsFileBatch.java | 1 +
.../async/handler/PipeTransferTsFileHandler.java | 13 ++++-
.../thrift/sync/IoTDBDataRegionSyncConnector.java | 15 ++++--
.../pipeconsensus/PipeConsensusReceiver.java | 31 ++++++++----
.../protocol/thrift/IoTDBDataNodeReceiver.java | 7 ++-
.../db/storageengine/dataregion/DataRegion.java | 55 ++++++++++++++++------
.../db/storageengine/load/LoadTsFileManager.java | 21 +++++++--
.../load/active/ActiveLoadDirScanner.java | 27 ++++++-----
.../load/active/ActiveLoadTsFileLoader.java | 13 ++++-
.../commons/pipe/receiver/IoTDBFileReceiver.java | 17 +++++--
.../commons/pipe/receiver/IoTDBReceiverAgent.java | 7 ++-
.../snapshot/PipeSnapshotResourceManager.java | 9 +++-
.../org/apache/iotdb/commons/utils/FileUtils.java | 22 +++++++++
.../org/apache/iotdb/commons/utils/RetryUtils.java | 48 +++++++++++++++++++
16 files changed, 297 insertions(+), 61 deletions(-)
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionFileHandler.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionFileHandler.java
index 0ec121f9699..8f2b3aaa860 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionFileHandler.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/payload/SubscriptionFileHandler.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.session.subscription.payload;
import
org.apache.iotdb.rpc.subscription.exception.SubscriptionIncompatibleHandlerException;
+import org.apache.iotdb.session.util.RetryUtils;
import java.io.File;
import java.io.IOException;
@@ -56,8 +57,11 @@ public abstract class SubscriptionFileHandler implements
SubscriptionMessageHand
*/
public synchronized Path deleteFile() throws IOException {
final Path sourcePath = getPath();
- Files.delete(sourcePath);
- return sourcePath;
+ return RetryUtils.retryOnException(
+ () -> {
+ Files.delete(sourcePath);
+ return sourcePath;
+ });
}
/**
@@ -66,7 +70,7 @@ public abstract class SubscriptionFileHandler implements
SubscriptionMessageHand
* @throws IOException if an I/O error occurs
*/
public synchronized Path moveFile(final String target) throws IOException {
- return this.moveFile(Paths.get(target));
+ return RetryUtils.retryOnException(() -> this.moveFile(Paths.get(target)));
}
/**
@@ -78,7 +82,8 @@ public abstract class SubscriptionFileHandler implements
SubscriptionMessageHand
if (!Files.exists(target.getParent())) {
Files.createDirectories(target.getParent());
}
- return Files.move(getPath(), target, StandardCopyOption.REPLACE_EXISTING);
+ return RetryUtils.retryOnException(
+ () -> Files.move(getPath(), target,
StandardCopyOption.REPLACE_EXISTING));
}
/**
@@ -87,7 +92,7 @@ public abstract class SubscriptionFileHandler implements
SubscriptionMessageHand
* @throws IOException if an I/O error occurs
*/
public synchronized Path copyFile(final String target) throws IOException {
- return this.copyFile(Paths.get(target));
+ return RetryUtils.retryOnException(() -> this.copyFile(Paths.get(target)));
}
/**
@@ -99,8 +104,13 @@ public abstract class SubscriptionFileHandler implements
SubscriptionMessageHand
if (!Files.exists(target.getParent())) {
Files.createDirectories(target.getParent());
}
- return Files.copy(
- getPath(), target, StandardCopyOption.REPLACE_EXISTING,
StandardCopyOption.COPY_ATTRIBUTES);
+ return RetryUtils.retryOnException(
+ () ->
+ Files.copy(
+ getPath(),
+ target,
+ StandardCopyOption.REPLACE_EXISTING,
+ StandardCopyOption.COPY_ATTRIBUTES));
}
@Override
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/RetryUtils.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/RetryUtils.java
new file mode 100644
index 00000000000..bd767d270b5
--- /dev/null
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/RetryUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.util;
+
+public class RetryUtils {
+
+ public interface CallableWithException<T, E extends Exception> {
+ T call() throws E;
+ }
+
+ public static final int MAX_RETRIES = 3;
+
+ public static <T, E extends Exception> T retryOnException(
+ final CallableWithException<T, E> callable) throws E {
+ int attempt = 0;
+ while (true) {
+ try {
+ return callable.call();
+ } catch (Exception e) {
+ attempt++;
+ if (attempt >= MAX_RETRIES) {
+ throw e;
+ }
+ }
+ }
+ }
+
+ private RetryUtils() {
+ // utility class
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
index 1162961f567..12322816072 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -210,6 +210,7 @@ public class PipeTabletEventTsFileBatch extends
PipeTabletEventBatch {
super.close();
pipeName2WeightMap.clear();
+
tableModeTsFileBuilder.close();
treeModeTsFileBuilder.close();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 87f9b964d12..783d5ff7aa5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferCompressedReq;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.utils.RetryUtils;
import
org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeAsyncClientManager;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceReq;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTsFilePieceWithModReq;
@@ -238,7 +239,11 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
// Delete current file when using tsFile as batch
if (events.stream().anyMatch(event -> !(event instanceof
PipeTsFileInsertionEvent))) {
- FileUtils.delete(currentFile);
+ RetryUtils.retryOnException(
+ () -> {
+ FileUtils.delete(currentFile);
+ return null;
+ });
}
} catch (final IOException e) {
LOGGER.warn(
@@ -341,7 +346,11 @@ public class PipeTransferTsFileHandler extends
PipeTransferTrackableHandler {
// Delete current file when using tsFile as batch
if (events.stream().anyMatch(event -> !(event instanceof
PipeTsFileInsertionEvent))) {
- FileUtils.delete(currentFile);
+ RetryUtils.retryOnException(
+ () -> {
+ FileUtils.delete(currentFile);
+ return null;
+ });
}
} catch (final IOException e) {
LOGGER.warn("Failed to close file reader or delete tsFile when failed to
transfer file.", e);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index 93ce77974b9..99b2e71c731 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferFilePieceReq;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.utils.RetryUtils;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventBatch;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventPlainBatch;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
@@ -309,15 +310,19 @@ public class IoTDBDataRegionSyncConnector extends
IoTDBDataNodeSyncConnector {
final List<Pair<String, File>> dbTsFilePairs =
batchToTransfer.sealTsFiles();
final Map<Pair<String, Long>, Double> pipe2WeightMap =
batchToTransfer.deepCopyPipe2WeightMap();
- for (final Pair<String, File> tsFile : dbTsFilePairs) {
- doTransfer(pipe2WeightMap, tsFile.right, null, tsFile.left);
+ for (final Pair<String, File> dbTsFile : dbTsFilePairs) {
+ doTransfer(pipe2WeightMap, dbTsFile.right, null, dbTsFile.left);
try {
- FileUtils.delete(tsFile.right);
+ RetryUtils.retryOnException(
+ () -> {
+ FileUtils.delete(dbTsFile.right);
+ return null;
+ });
} catch (final NoSuchFileException e) {
- LOGGER.info("The file {} is not found, may already be deleted.",
tsFile);
+ LOGGER.info("The file {} is not found, may already be deleted.",
dbTsFile);
} catch (final Exception e) {
LOGGER.warn(
- "Failed to delete batch file {}, this file should be deleted
manually later", tsFile);
+ "Failed to delete batch file {}, this file should be deleted
manually later", dbTsFile);
}
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 6bf7b7b7f6e..eb7491e460e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.response.Pi
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
import org.apache.iotdb.commons.pipe.receiver.IoTDBReceiverAgent;
import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.pipe.PipeConsensus;
import org.apache.iotdb.consensus.pipe.PipeConsensusServerImpl;
@@ -88,8 +89,6 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
-import static
org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils.generateTsFileResource;
-
public class PipeConsensusReceiver {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConsensusReceiver.class);
private static final IoTDBConfig IOTDB_CONFIG =
IoTDBDescriptor.getInstance().getConfig();
@@ -849,7 +848,7 @@ public class PipeConsensusReceiver {
private void deleteFile(File file) {
if (file.exists()) {
try {
- FileUtils.delete(file);
+ RetryUtils.retryOnException(() -> FileUtils.delete(file));
LOGGER.info(
"PipeConsensus-PipeName-{}: Original writing file {} was deleted.",
consensusPipeName,
@@ -943,7 +942,11 @@ public class PipeConsensusReceiver {
if (receiverFileDirWithIdSuffix.get() != null) {
if (receiverFileDirWithIdSuffix.get().exists()) {
try {
- FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
+ RetryUtils.retryOnException(
+ () -> {
+ FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
+ return null;
+ });
LOGGER.info(
"PipeConsensus-PipeName-{}: Original receiver file dir {} was
deleted successfully.",
consensusPipeName,
@@ -1005,7 +1008,11 @@ public class PipeConsensusReceiver {
}
// Remove exists dir
if (newReceiverDir.exists()) {
- FileUtils.deleteDirectory(newReceiverDir);
+ RetryUtils.retryOnException(
+ () -> {
+ FileUtils.deleteDirectory(newReceiverDir);
+ return null;
+ });
LOGGER.info(
"PipeConsensus-PipeName-{}: Origin receiver file dir {} was
deleted.",
consensusPipeName,
@@ -1036,7 +1043,11 @@ public class PipeConsensusReceiver {
if (receiverFileDirWithIdSuffix.get() != null) {
if (receiverFileDirWithIdSuffix.get().exists()) {
try {
- FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
+ RetryUtils.retryOnException(
+ () -> {
+ FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
+ return null;
+ });
LOGGER.info(
"PipeConsensus-PipeName-{}: Receiver exit: Original receiver
file dir {} was deleted.",
consensusPipeName,
@@ -1172,7 +1183,11 @@ public class PipeConsensusReceiver {
File tsFileWriterDirectory = new File(this.localWritingDirPath);
// Remove exists dir
if (tsFileWriterDirectory.exists()) {
- FileUtils.deleteDirectory(tsFileWriterDirectory);
+ RetryUtils.retryOnException(
+ () -> {
+ FileUtils.deleteDirectory(tsFileWriterDirectory);
+ return null;
+ });
LOGGER.info(
"PipeConsensus-PipeName-{}: Origin receiver tsFileWriter-{} file
dir {} was deleted.",
consensusPipeName,
@@ -1267,7 +1282,7 @@ public class PipeConsensusReceiver {
// close file
if (writingFile != null) {
try {
- FileUtils.delete(writingFile);
+ RetryUtils.retryOnException(() -> FileUtils.delete(writingFile));
LOGGER.info(
"PipeConsensus-PipeName-{}: TsFileWriter exit: Writing file {}
was deleted.",
consensusPipeName,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index a87e229a03a..b6d5d525cf6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -35,6 +35,7 @@ import
org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.db.auth.AuthorityChecker;
@@ -559,7 +560,11 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
final File sourceFile = new File(absolutePath);
if (!Objects.equals(
loadActiveListeningPipeDir,
sourceFile.getParentFile().getAbsolutePath())) {
- FileUtils.moveFileWithMD5Check(sourceFile, new
File(loadActiveListeningPipeDir));
+ RetryUtils.retryOnException(
+ () -> {
+ FileUtils.moveFileWithMD5Check(sourceFile, new
File(loadActiveListeningPipeDir));
+ return null;
+ });
}
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index fdb4e771148..54b469de3fe 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -34,6 +34,7 @@ import
org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.commons.utils.CommonDateTimeUtils;
+import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.consensus.ConsensusFactory;
@@ -3025,8 +3026,7 @@ public class DataRegion implements IDataRegionForQuery {
final boolean deleteOriginFile,
boolean isGeneratedByPipe)
throws LoadFileException, DiskSpaceInsufficientException {
- final File targetFile;
- targetFile =
+ final File targetFile =
fsFactory.getFile(
TierManager.getInstance().getNextFolderForTsFile(0, false),
databaseName
@@ -3038,7 +3038,7 @@ public class DataRegion implements IDataRegionForQuery {
+ tsFileResource.getTsFile().getName());
tsFileResource.setFile(targetFile);
if (tsFileManager.contains(tsFileResource, false)) {
- logger.error("The file {} has already been loaded in unsequence list",
tsFileResource);
+ logger.warn("The file {} has already been loaded in unsequence list",
tsFileResource);
return false;
}
@@ -3055,12 +3055,20 @@ public class DataRegion implements IDataRegionForQuery {
}
try {
if (deleteOriginFile) {
- FileUtils.moveFile(tsFileToLoad, targetFile);
+ RetryUtils.retryOnException(
+ () -> {
+ FileUtils.moveFile(tsFileToLoad, targetFile);
+ return null;
+ });
} else {
- Files.copy(tsFileToLoad.toPath(), targetFile.toPath());
+ RetryUtils.retryOnException(
+ () -> {
+ Files.copy(tsFileToLoad.toPath(), targetFile.toPath());
+ return null;
+ });
}
} catch (final IOException e) {
- logger.error(
+ logger.warn(
"File renaming failed when loading tsfile. Origin: {}, Target: {}",
tsFileToLoad.getAbsolutePath(),
targetFile.getAbsolutePath(),
@@ -3077,13 +3085,20 @@ public class DataRegion implements IDataRegionForQuery {
fsFactory.getFile(targetFile.getAbsolutePath() + RESOURCE_SUFFIX);
try {
if (deleteOriginFile) {
- FileUtils.moveFile(resourceFileToLoad, targetResourceFile);
+ RetryUtils.retryOnException(
+ () -> {
+ FileUtils.moveFile(resourceFileToLoad, targetResourceFile);
+ return null;
+ });
} else {
- Files.copy(resourceFileToLoad.toPath(), targetResourceFile.toPath());
+ RetryUtils.retryOnException(
+ () -> {
+ Files.copy(resourceFileToLoad.toPath(),
targetResourceFile.toPath());
+ return null;
+ });
}
-
} catch (final IOException e) {
- logger.error(
+ logger.warn(
"File renaming failed when loading .resource file. Origin: {},
Target: {}",
resourceFileToLoad.getAbsolutePath(),
targetResourceFile.getAbsolutePath(),
@@ -3133,18 +3148,30 @@ public class DataRegion implements IDataRegionForQuery {
// when successfully loaded, the filepath of the resource will be
changed to the IoTDB data
// dir, so we can add a suffix to find the old modification file.
try {
- Files.deleteIfExists(targetModFile.toPath());
+ RetryUtils.retryOnException(
+ () -> {
+ Files.deleteIfExists(targetModFile.toPath());
+ return null;
+ });
} catch (final IOException e) {
logger.warn("Cannot delete localModFile {}", targetModFile, e);
}
try {
if (deleteOriginFile) {
- FileUtils.moveFile(modFileToLoad, targetModFile);
+ RetryUtils.retryOnException(
+ () -> {
+ FileUtils.moveFile(modFileToLoad, targetModFile);
+ return null;
+ });
} else {
- Files.copy(modFileToLoad.toPath(), targetModFile.toPath());
+ RetryUtils.retryOnException(
+ () -> {
+ Files.copy(modFileToLoad.toPath(), targetModFile.toPath());
+ return null;
+ });
}
} catch (final IOException e) {
- logger.error(
+ logger.warn(
"File renaming failed when loading .mod file. Origin: {}, Target:
{}",
modFileToLoad.getAbsolutePath(),
targetModFile.getAbsolutePath(),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index c4dc75f591b..b88ab2c6520 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
@@ -386,7 +387,7 @@ public class LoadTsFileManager {
private void clearDir(File dir) {
if (dir.exists()) {
- FileUtils.deleteFileOrDirectory(dir);
+ FileUtils.deleteFileOrDirectoryWithRetry(dir);
}
if (dir.mkdirs()) {
LOGGER.info("Load TsFile dir {} is created.", dir.getPath());
@@ -563,7 +564,11 @@ public class LoadTsFileManager {
}
final Path writerPath = writer.getFile().toPath();
if (Files.exists(writerPath)) {
- Files.delete(writerPath);
+ RetryUtils.retryOnException(
+ () -> {
+ Files.delete(writerPath);
+ return null;
+ });
}
} catch (IOException e) {
LOGGER.warn("Close TsFileIOWriter {} error.",
entry.getValue().getFile().getPath(), e);
@@ -578,7 +583,11 @@ public class LoadTsFileManager {
modificationFile.close();
final Path modificationFilePath =
modificationFile.getFile().toPath();
if (Files.exists(modificationFilePath)) {
- Files.delete(modificationFilePath);
+ RetryUtils.retryOnException(
+ () -> {
+ Files.delete(modificationFilePath);
+ return null;
+ });
}
} catch (IOException e) {
LOGGER.warn("Close ModificationFile {} error.",
entry.getValue().getFile(), e);
@@ -586,7 +595,11 @@ public class LoadTsFileManager {
}
}
try {
- Files.delete(taskDir.toPath());
+ RetryUtils.retryOnException(
+ () -> {
+ Files.delete(taskDir.toPath());
+ return null;
+ });
} catch (DirectoryNotEmptyException e) {
LOGGER.info("Task dir {} is not empty, skip deleting.",
taskDir.getPath());
} catch (IOException e) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
index c5fc2b59b96..34bb392dc28 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
@@ -102,17 +102,22 @@ public class ActiveLoadDirScanner extends
ActiveLoadScheduledExecutorService {
listeningDir.equals(IOTDB_CONFIG.getLoadActiveListeningPipeDir());
try (final Stream<File> fileStream =
FileUtils.streamFiles(new File(listeningDir), true, (String[])
null)) {
- fileStream
- .filter(file ->
!activeLoadTsFileLoader.isFilePendingOrLoading(file))
- .filter(File::exists)
- .map(
- file ->
- (file.getName().endsWith(RESOURCE) ||
file.getName().endsWith(MODS))
- ? getTsFilePath(file.getAbsolutePath())
- : file.getAbsolutePath())
- .filter(this::isTsFileCompleted)
- .limit(currentAllowedPendingSize)
- .forEach(file -> activeLoadTsFileLoader.tryTriggerTsFileLoad(file,
isGeneratedByPipe));
+ try {
+ fileStream
+ .filter(file ->
!activeLoadTsFileLoader.isFilePendingOrLoading(file))
+ .filter(File::exists)
+ .map(
+ file ->
+ (file.getName().endsWith(RESOURCE) ||
file.getName().endsWith(MODS))
+ ? getTsFilePath(file.getAbsolutePath())
+ : file.getAbsolutePath())
+ .filter(this::isTsFileCompleted)
+ .limit(currentAllowedPendingSize)
+ .forEach(
+ file -> activeLoadTsFileLoader.tryTriggerTsFileLoad(file,
isGeneratedByPipe));
+ } catch (final Exception e) {
+ LOGGER.warn("Exception occurred during scanning dir: {}",
listeningDir, e);
+ }
}
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index 188ae30436d..8cb79658b66 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.concurrent.IoTThreadFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import
org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -94,7 +95,11 @@ public class ActiveLoadTsFileLoader {
if (!Objects.equals(failDir.get(),
IOTDB_CONFIG.getLoadActiveListeningFailDir())) {
final File failDirFile = new
File(IOTDB_CONFIG.getLoadActiveListeningFailDir());
try {
- FileUtils.forceMkdir(failDirFile);
+ RetryUtils.retryOnException(
+ () -> {
+ FileUtils.forceMkdir(failDirFile);
+ return null;
+ });
} catch (final IOException e) {
LOGGER.warn(
"Error occurred during creating fail directory {} for active
load.",
@@ -258,7 +263,11 @@ public class ActiveLoadTsFileLoader {
final File targetDir = new File(failDir.get());
try {
-
org.apache.iotdb.commons.utils.FileUtils.moveFileWithMD5Check(sourceFile,
targetDir);
+ RetryUtils.retryOnException(
+ () -> {
+
org.apache.iotdb.commons.utils.FileUtils.moveFileWithMD5Check(sourceFile,
targetDir);
+ return null;
+ });
} catch (final IOException e) {
LOGGER.warn("Error occurred during moving file {} to fail directory.",
filePath, e);
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
index 23e89ddb7d7..336670f593f 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java
@@ -33,6 +33,7 @@ import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransf
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferHandshakeV1Req;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeTransferHandshakeV2Req;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.response.PipeTransferFilePieceResp;
+import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -112,7 +113,11 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
if (receiverFileDirWithIdSuffix.get() != null) {
if (receiverFileDirWithIdSuffix.get().exists()) {
try {
- FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
+ RetryUtils.retryOnException(
+ () -> {
+ FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
+ return null;
+ });
LOGGER.info(
"Receiver id = {}: Original receiver file dir {} was deleted.",
receiverId.get(),
@@ -440,7 +445,7 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
private void deleteFile(final File file) {
if (file.exists()) {
try {
- FileUtils.delete(file);
+ RetryUtils.retryOnException(() -> FileUtils.delete(file));
LOGGER.info(
"Receiver id = {}: Original writing file {} was deleted.",
receiverId.get(),
@@ -749,7 +754,7 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
if (writingFile != null) {
try {
- FileUtils.delete(writingFile);
+ RetryUtils.retryOnException(() -> FileUtils.delete(writingFile));
LOGGER.info(
"Receiver id = {}: Handling exit: Writing file {} was deleted.",
receiverId.get(),
@@ -774,7 +779,11 @@ public abstract class IoTDBFileReceiver implements
IoTDBReceiver {
if (receiverFileDirWithIdSuffix.get() != null) {
if (receiverFileDirWithIdSuffix.get().exists()) {
try {
- FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
+ RetryUtils.retryOnException(
+ () -> {
+ FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
+ return null;
+ });
LOGGER.info(
"Receiver id = {}: Handling exit: Original receiver file dir {}
was deleted.",
receiverId.get(),
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBReceiverAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBReceiverAgent.java
index c09812f2003..7aaa66a8b75 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBReceiverAgent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBReceiverAgent.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.commons.pipe.receiver;
+import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -114,7 +115,11 @@ public abstract class IoTDBReceiverAgent {
public static void cleanPipeReceiverDir(final File receiverFileDir) {
try {
- FileUtils.deleteDirectory(receiverFileDir);
+ RetryUtils.retryOnException(
+ () -> {
+ FileUtils.deleteDirectory(receiverFileDir);
+ return null;
+ });
LOGGER.info("Clean pipe receiver dir {} successfully.", receiverFileDir);
} catch (final Exception e) {
LOGGER.warn("Clean pipe receiver dir {} failed.", receiverFileDir, e);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/snapshot/PipeSnapshotResourceManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/snapshot/PipeSnapshotResourceManager.java
index 6b134d17205..24152a14da9 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/snapshot/PipeSnapshotResourceManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/snapshot/PipeSnapshotResourceManager.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.commons.pipe.resource.snapshot;
import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.slf4j.Logger;
@@ -75,7 +76,11 @@ public abstract class PipeSnapshotResourceManager {
}
// Otherwise, copy the snapshot to pipe dir
- FileUtils.copyFile(new File(snapshotPath), new File(copiedFilePath));
+ RetryUtils.retryOnException(
+ () -> {
+ FileUtils.copyFile(new File(snapshotPath), new
File(copiedFilePath));
+ return null;
+ });
copiedSnapshotPath2ReferenceCountMap.put(copiedFilePath, new
AtomicLong(1));
return copiedFilePath;
} finally {
@@ -140,7 +145,7 @@ public abstract class PipeSnapshotResourceManager {
final long count = referenceCount.decrementAndGet();
if (count == 0) {
copiedSnapshotPath2ReferenceCountMap.remove(snapshotPath);
- FileUtils.deleteFileOrDirectory(new File(snapshotPath));
+ FileUtils.deleteFileOrDirectoryWithRetry(new File(snapshotPath));
}
} finally {
lock.unlock();
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
index 99a7b4e7589..f1ae48c9956 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/FileUtils.java
@@ -86,6 +86,28 @@ public class FileUtils {
}
}
+ public static void deleteFileOrDirectoryWithRetry(File file) {
+ if (file.isDirectory()) {
+ File[] files = file.listFiles();
+ if (files != null) {
+ for (File subfile : files) {
+ deleteFileOrDirectoryWithRetry(subfile);
+ }
+ }
+ }
+ try {
+ RetryUtils.retryOnException(
+ () -> {
+ Files.delete(file.toPath());
+ return null;
+ });
+ } catch (DirectoryNotEmptyException e) {
+ LOGGER.warn("{}: {}", e.getMessage(), Arrays.toString(file.list()), e);
+ } catch (Exception e) {
+ LOGGER.warn("{}: {}", e.getMessage(), file.getName(), e);
+ }
+ }
+
public static void deleteDirectoryAndEmptyParent(File folder) {
deleteFileOrDirectory(folder);
final File parentFolder = folder.getParentFile();
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
new file mode 100644
index 00000000000..19a6456ec30
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/RetryUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.utils;
+
+public class RetryUtils {
+
+ public interface CallableWithException<T, E extends Exception> {
+ T call() throws E;
+ }
+
+ public static final int MAX_RETRIES = 3;
+
+ public static <T, E extends Exception> T retryOnException(
+ final CallableWithException<T, E> callable) throws E {
+ int attempt = 0;
+ while (true) {
+ try {
+ return callable.call();
+ } catch (Exception e) {
+ attempt++;
+ if (attempt >= MAX_RETRIES) {
+ throw e;
+ }
+ }
+ }
+ }
+
+ private RetryUtils() {
+ // utility class
+ }
+}