This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/object_type by this push:
new a5b60d4bd0b Add back logic
new ffd7027dc3f Merge branch 'object_type' of github.com:apache/iotdb into
object_type
a5b60d4bd0b is described below
commit a5b60d4bd0bc0979b365845ca5c9da17b6cc0a2a
Author: HTHou <[email protected]>
AuthorDate: Tue Jul 8 18:07:47 2025 +0800
Add back logic
---
.../iotdb/db/storageengine/StorageEngine.java | 35 ++++++++++++
.../db/storageengine/dataregion/DataRegion.java | 63 +++++-----------------
2 files changed, 48 insertions(+), 50 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 7edd40e42d0..a4dc0bc7a2a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -76,12 +76,15 @@ import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALException;
import
org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager;
import org.apache.iotdb.db.storageengine.load.LoadTsFileManager;
import org.apache.iotdb.db.storageengine.load.limiter.LoadTsFileRateLimiter;
+import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.db.utils.ThreadUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.commons.io.FileUtils;
+import org.apache.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.tsfile.utils.FilePathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,6 +92,8 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -119,6 +124,8 @@ public class StorageEngine implements IService {
private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
private static final WritingMetrics WRITING_METRICS =
WritingMetrics.getInstance();
+ private final FSFactory fsFactory = FSFactoryProducer.getFSFactory();
+
/**
* a folder (system/databases/ by default) that persist system info. Each
database will have a
* subfolder under the systemDir.
@@ -234,6 +241,8 @@ public class StorageEngine implements IService {
}
private void asyncRecover(List<Future<Void>> futures) {
+ checkObjectFiles();
+
Map<String, List<DataRegionId>> localDataRegionInfo =
getLocalDataRegionInfo();
localDataRegionInfo.values().forEach(list -> recoverDataRegionNum +=
list.size());
readyDataRegionNum = new AtomicInteger(0);
@@ -1068,6 +1077,32 @@ public class StorageEngine implements IService {
systemDir + File.separator + dataBaseName, dataRegionId);
}
+ private void checkObjectFiles() {
+ List<String> folders = TierManager.getInstance().getAllObjectFileFolders();
+ for (String baseDir : folders) {
+ File fileFolder = fsFactory.getFile(baseDir);
+ try (Stream<Path> paths = Files.walk(fileFolder.toPath())) {
+ paths
+ .filter(Files::isRegularFile)
+ .filter(
+ path -> {
+ String name = path.getFileName().toString();
+ return name.endsWith(".bin.tmp") ||
name.endsWith(".bin.back");
+ })
+ .forEach(
+ path -> {
+ try {
+ Files.delete(path);
+ } catch (IOException e) {
+ LOGGER.error("Failed to delete: {} -> {}", path,
e.getMessage());
+ }
+ });
+ } catch (IOException e) {
+ LOGGER.error("Failed to check Object Files: {}", e.getMessage());
+ }
+ }
+ }
+
public Runnable executeCompactFileTimeIndexCache() {
return () -> {
if (!isReadyForNonReadWriteFunctions()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 8396941c480..bee359ab33f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -538,7 +538,6 @@ public class DataRegion implements IDataRegionForQuery {
getAllFiles(TierManager.getInstance().getAllLocalSequenceFileFolders());
Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles =
getAllFiles(TierManager.getInstance().getAllLocalUnSequenceFileFolders());
- checkObjectFiles(TierManager.getInstance().getAllObjectFileFolders());
DataRegionRecoveryContext dataRegionRecoveryContext =
new DataRegionRecoveryContext(
partitionTmpSeqTsFiles.values().stream().mapToLong(List::size).sum()
@@ -819,15 +818,6 @@ public class DataRegion implements IDataRegionForQuery {
String tsFilePartitionPath = partitionName + File.separator +
f.getName();
tsFilePartitionPath2File.put(tsFilePartitionPath, f);
}
-
- File[] objectFileInThisFolder =
- fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(),
".bin");
- File[] objectTmpFileInThisFolder =
- fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(),
".bin.tmp");
- for (File f : objectTmpFileInThisFolder) {
- // remove bin.tmp
- Files.delete(f.toPath());
- }
}
}
}
@@ -847,44 +837,6 @@ public class DataRegion implements IDataRegionForQuery {
return ret;
}
- private void checkObjectFiles(List<String> folders) throws IOException {
- for (String baseDir : folders) {
- File fileFolder = fsFactory.getFile(baseDir + File.separator +
databaseName, dataRegionId);
- if (!fileFolder.exists()) {
- continue;
- }
- File[] subFiles = fileFolder.listFiles();
- if (subFiles != null) {
- for (File partitionFolder : subFiles) {
- if (!partitionFolder.isDirectory()) {
- logger.warn("{} is not a directory.",
partitionFolder.getAbsolutePath());
- } else {
- File[] objectFileInThisFolder =
- fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(),
".bin");
- // for (File f : objectFileInThisFolder) {
- // StorageEngine.getInstance()
- // .objectFileId
- // .updateAndGet(
- // current ->
- // Math.max(
- // current,
- // Long.parseLong(
- // f.getName()
- // .substring(0,
f.getName().length() - 4)
- // .split("-")[2])));
- // }
- File[] objectTmpFileInThisFolder =
- fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(),
".bin.tmp");
- for (File f : objectTmpFileInThisFolder) {
- // remove bin.tmp
- Files.delete(f.toPath());
- }
- }
- }
- }
- }
- }
-
private void continueFailedRenames(File fileFolder, String suffix) throws
IOException {
File[] files = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(),
suffix);
if (files != null) {
@@ -3090,8 +3042,19 @@ public class DataRegion implements IDataRegionForQuery {
if (objectNode.isEOF()) {
File objectFile =
FSFactoryProducer.getFSFactory().getFile(objectFileDir,
objectNode.getFilePath());
- Files.move(
- objectTmpFile.toPath(), objectFile.toPath(),
StandardCopyOption.REPLACE_EXISTING);
+ if (objectFile.exists()) {
+ String relativeBackPathString = objectNode.getFilePath() + ".back";
+ File objectBackFile =
+ FSFactoryProducer.getFSFactory().getFile(objectFileDir,
relativeBackPathString);
+ Files.move(
+ objectFile.toPath(), objectBackFile.toPath(),
StandardCopyOption.REPLACE_EXISTING);
+ Files.move(
+ objectTmpFile.toPath(), objectFile.toPath(),
StandardCopyOption.REPLACE_EXISTING);
+ Files.delete(objectBackFile.toPath());
+ } else {
+ Files.move(
+ objectTmpFile.toPath(), objectFile.toPath(),
StandardCopyOption.REPLACE_EXISTING);
+ }
}
getWALNode()
.ifPresent(walNode ->
walNode.log(TsFileProcessor.MEMTABLE_NOT_EXIST, objectNode));