This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/object_type by this push:
new 3a685174ed0 fix error value
3a685174ed0 is described below
commit 3a685174ed077a951c465b6fcb1de86ae552aeb0
Author: HTHou <[email protected]>
AuthorDate: Mon Jul 7 16:24:40 2025 +0800
fix error value
---
.../plan/planner/plan/node/write/FileNode.java | 89 ++++++++++---
.../db/storageengine/dataregion/DataRegion.java | 37 +++++-
.../dataregion/memtable/TsFileProcessor.java | 148 +++++++++++----------
.../dataregion/wal/buffer/WALEntryType.java | 1 +
.../db/storageengine/rescon/disk/TierManager.java | 8 +-
5 files changed, 193 insertions(+), 90 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/FileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/FileNode.java
index 6eae25ac1df..e91869e7e0b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/FileNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/FileNode.java
@@ -22,20 +22,13 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
// TODO:[OBJECT] WAL serde
public class FileNode {
- private String filePath;
+ private final boolean isEOF;
- private long offset;
+ private final long offset;
private byte[] content;
- private boolean isEOF;
-
- public FileNode(String filePath, boolean isEOF, long offset, byte[] content) {
- this.filePath = filePath;
- this.isEOF = isEOF;
- this.offset = offset;
- this.content = content;
- }
+ private String filePath;
public FileNode(boolean isEOF, long offset, byte[] content) {
this.isEOF = isEOF;
@@ -43,14 +36,6 @@ public class FileNode {
this.content = content;
}
- public void setFilePath(String filePath) {
- this.filePath = filePath;
- }
-
- public String getFilePath() {
- return filePath;
- }
-
public boolean isEOF() {
return isEOF;
}
@@ -62,4 +47,72 @@ public class FileNode {
public long getOffset() {
return offset;
}
+
+ public void setFilePath(String filePath) {
+ this.filePath = filePath;
+ }
+
+ public String getFilePath() {
+ return filePath;
+ }
+ //
+ // @Override
+ // public void serializeToWAL(IWALByteBufferView buffer) {}
+ //
+ // @Override
+ // public int serializedSize() {
+ // return 0;
+ // }
+ //
+ // @Override
+ // public SearchNode merge(List<SearchNode> searchNodes) {
+ // return null;
+ // }
+ //
+ // @Override
+ // public ProgressIndex getProgressIndex() {
+ // return null;
+ // }
+ //
+ // @Override
+ // public void setProgressIndex(ProgressIndex progressIndex) {}
+ //
+ // @Override
+ // public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+ // return List.of();
+ // }
+ //
+ // @Override
+ // public TRegionReplicaSet getRegionReplicaSet() {
+ // return null;
+ // }
+ //
+ // @Override
+ // public List<PlanNode> getChildren() {
+ // return List.of();
+ // }
+ //
+ // @Override
+ // public void addChild(PlanNode child) {}
+ //
+ // @Override
+ // public PlanNode clone() {
+ // return null;
+ // }
+ //
+ // @Override
+ // public int allowedChildCount() {
+ // return 0;
+ // }
+ //
+ // @Override
+ // public List<String> getOutputColumnNames() {
+ // return List.of();
+ // }
+ //
+ // @Override
+ // protected void serializeAttributes(ByteBuffer byteBuffer) {}
+ //
+ // @Override
+ // protected void serializeAttributes(DataOutputStream stream) throws IOException {}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 3da2e466177..25bc1484a78 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -347,7 +347,7 @@ public class DataRegion implements IDataRegionForQuery {
private ILoadDiskSelector ordinaryLoadDiskSelector;
private ILoadDiskSelector pipeAndIoTV2LoadDiskSelector;
- public static AtomicLong objectFileId = new AtomicLong(0);
+ public AtomicLong objectFileId = new AtomicLong(0);
/**
* Construct a database processor.
@@ -537,6 +537,7 @@ public class DataRegion implements IDataRegionForQuery {
getAllFiles(TierManager.getInstance().getAllLocalSequenceFileFolders());
Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles =
getAllFiles(TierManager.getInstance().getAllLocalUnSequenceFileFolders());
+ checkObjectFiles(TierManager.getInstance().getAllObjectFileFolders());
DataRegionRecoveryContext dataRegionRecoveryContext =
new DataRegionRecoveryContext(
partitionTmpSeqTsFiles.values().stream().mapToLong(List::size).sum()
@@ -853,6 +854,40 @@ public class DataRegion implements IDataRegionForQuery {
return ret;
}
+ private void checkObjectFiles(List<String> folders) throws IOException {
+ for (String baseDir : folders) {
+ File fileFolder = fsFactory.getFile(baseDir + File.separator + databaseName, dataRegionId);
+ if (!fileFolder.exists()) {
+ continue;
+ }
+ File[] subFiles = fileFolder.listFiles();
+ if (subFiles != null) {
+ for (File partitionFolder : subFiles) {
+ if (!partitionFolder.isDirectory()) {
+ logger.warn("{} is not a directory.", partitionFolder.getAbsolutePath());
+ } else {
+ File[] objectFileInThisFolder =
+ fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), ".bin");
+ for (File f : objectFileInThisFolder) {
+ objectFileId.updateAndGet(
+ current ->
+ Math.max(
+ current,
+ Long.parseLong(
+ f.getName().substring(0, f.getName().length() - 4).split("-")[2])));
+ }
+ File[] objectTmpFileInThisFolder =
+ fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), ".bin.tmp");
+ for (File f : objectTmpFileInThisFolder) {
+ // remove bin.tmp
+ Files.delete(f.toPath());
+ }
+ }
+ }
+ }
+ }
+ }
+
private void continueFailedRenames(File fileFolder, String suffix) throws IOException {
File[] files = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), suffix);
if (files != null) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index b521f8a1f5b..4c4392178f9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -567,77 +567,8 @@ public class TsFileProcessor {
boolean noFailure,
long[] infoForMetrics)
throws WriteProcessException {
- if (insertTabletNode instanceof RelationalInsertTabletNode) {
- RelationalInsertTabletNode relationalInsertTabletNode =
- (RelationalInsertTabletNode) insertTabletNode;
- List<List<FileNode>> fileNodesList = relationalInsertTabletNode.getFileNodeList();
- if (fileNodesList != null) {
- for (int j = 0; j < fileNodesList.size(); j++) {
- List<FileNode> fileNodeList = fileNodesList.get(j);
- for (int i = 0; i < fileNodeList.size(); i++) {
- FileNode fileNode = fileNodeList.get(i);
- String objectFileName =
- insertTabletNode.getTimes()[i]
- + "-"
- + config.getDataNodeId()
- + "-"
- + DataRegion.objectFileId.incrementAndGet()
- + ".bin";
- String objectTmpFileName = objectFileName + ".tmp";
- File objectTmpFile;
- try {
- String baseDir = TierManager.getInstance().getNextFolderForObjectFile();
- String objectFileDir =
- baseDir
- + File.separator
- + dataRegionInfo.getDataRegion().getDatabaseName()
- + File.separator
- + dataRegionInfo.getDataRegion().getDataRegionId()
- + File.separator
- + tsFileResource.getTsFileID().timePartitionId;
- objectTmpFile =
- FSFactoryProducer.getFSFactory().getFile(objectFileDir, objectTmpFileName);
- try (ObjectWriter writer = new ObjectWriter(objectTmpFile)) {
- writer.write(fileNode);
- }
- } catch (Exception e) {
- results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
- throw new WriteProcessException(e);
- }
- // TODO:[OBJECT] write file node wal
- if (fileNode.isEOF()) {
- File objectFile =
- FSFactoryProducer.getFSFactory()
- .getFile(objectTmpFile.getParentFile(), objectFileName);
- try {
- Files.move(
- objectTmpFile.toPath(),
- objectFile.toPath(),
- StandardCopyOption.REPLACE_EXISTING);
- } catch (IOException e) {
- results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
- throw new WriteProcessException(e);
- }
- String relativePathString =
- dataRegionInfo.getDataRegion().getDatabaseName()
- + File.separator
- + dataRegionInfo.getDataRegion().getDataRegionId()
- + File.separator
- + tsFileResource.getTsFileID().timePartitionId
- + File.separator
- + objectFileName;
- byte[] filePathBytes = relativePathString.getBytes(StandardCharsets.UTF_8);
- byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES];
- System.arraycopy(
- BytesUtils.longToBytes(objectFile.length()), 0, valueBytes, 0, Long.BYTES);
- System.arraycopy(filePathBytes, 0, valueBytes, 4, filePathBytes.length);
- (relationalInsertTabletNode.getObjectColumns().get(j))[i] = new Binary(valueBytes);
- }
- }
- }
- }
- }
+ handleWriteObject(insertTabletNode, rangeList, results);
ensureMemTable(infoForMetrics);
@@ -2411,4 +2342,81 @@ public class TsFileProcessor {
"{}: {} release flushQueryLock", dataRegionName,
tsFileResource.getTsFile().getName());
}
}
+
+ private void handleWriteObject(
+ InsertTabletNode insertTabletNode, List<int[]> rangeList, TSStatus[] results)
+ throws WriteProcessException {
+ if (insertTabletNode instanceof RelationalInsertTabletNode) {
+ RelationalInsertTabletNode relationalInsertTabletNode =
+ (RelationalInsertTabletNode) insertTabletNode;
+ List<List<FileNode>> fileNodesList = relationalInsertTabletNode.getFileNodeList();
+ if (fileNodesList != null) {
+ for (int j = 0; j < fileNodesList.size(); j++) {
+ List<FileNode> fileNodeList = fileNodesList.get(j);
+ for (int i = 0; i < fileNodeList.size(); i++) {
+ FileNode fileNode = fileNodeList.get(i);
+ String objectFileName =
+ insertTabletNode.getTimes()[i]
+ + "-"
+ + config.getDataNodeId()
+ + "-"
+ + dataRegionInfo.getDataRegion().objectFileId.incrementAndGet()
+ + ".bin";
+ String objectTmpFileName = objectFileName + ".tmp";
+ File objectTmpFile;
+ String relativePathString =
+ dataRegionInfo.getDataRegion().getDatabaseName()
+ + File.separator
+ + dataRegionInfo.getDataRegion().getDataRegionId()
+ + File.separator
+ + tsFileResource.getTsFileID().timePartitionId
+ + File.separator
+ + objectFileName;
+ try {
+ String baseDir = TierManager.getInstance().getNextFolderForObjectFile();
+ String objectFileDir =
+ baseDir
+ + File.separator
+ + dataRegionInfo.getDataRegion().getDatabaseName()
+ + File.separator
+ + dataRegionInfo.getDataRegion().getDataRegionId()
+ + File.separator
+ + tsFileResource.getTsFileID().timePartitionId;
+
+ objectTmpFile =
+ FSFactoryProducer.getFSFactory().getFile(objectFileDir, objectTmpFileName);
+ try (ObjectWriter writer = new ObjectWriter(objectTmpFile)) {
+ writer.write(fileNode);
+ }
+ fileNode.setFilePath(relativePathString);
+ } catch (Exception e) {
+ results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
+ throw new WriteProcessException(e);
+ }
+ // TODO:[OBJECT] write file node wal
+ if (fileNode.isEOF()) {
+ File objectFile =
+ FSFactoryProducer.getFSFactory()
+ .getFile(objectTmpFile.getParentFile(), objectFileName);
+ try {
+ Files.move(
+ objectTmpFile.toPath(),
+ objectFile.toPath(),
+ StandardCopyOption.REPLACE_EXISTING);
+ } catch (IOException e) {
+ results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
+ throw new WriteProcessException(e);
+ }
+ byte[] filePathBytes = relativePathString.getBytes(StandardCharsets.UTF_8);
+ byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES];
+ System.arraycopy(
+ BytesUtils.longToBytes(objectFile.length()), 0, valueBytes, 0, Long.BYTES);
+ System.arraycopy(filePathBytes, 0, valueBytes, Long.BYTES, filePathBytes.length);
+ (relationalInsertTabletNode.getObjectColumns().get(j))[i] = new Binary(valueBytes);
+ }
+ }
+ }
+ }
+ }
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java
index 1c3ddcbf702..ac490699547 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java
@@ -45,6 +45,7 @@ public enum WALEntryType {
/** {@link
org.apache.iotdb.db.storageengine.dataregion.memtable.AbstractMemTable} */
MEMORY_TABLE_SNAPSHOT((byte) 10),
RELATIONAL_DELETE_DATA_NODE((byte) 11),
+ OBJECT_FILE_NODE((byte) 12),
// endregion
// region signal entry type
// signal wal buffer has been closed
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java
index 1273e7adaf7..75c5d3b3c96 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java
@@ -73,6 +73,8 @@ public class TierManager {
/** unSeq file folder's rawFsPath path -> tier level */
private final Map<String, Integer> unSeqDir2TierLevel = new HashMap<>();
+ private List<String> objectDirs;
+
/** total space of each tier, Long.MAX_VALUE when one tier contains remote
storage */
private long[] tierDiskTotalSpace;
@@ -154,7 +156,7 @@ public class TierManager {
unSeqDir2TierLevel.put(dir, tierLevel);
}
- List<String> objectDirs =
+ objectDirs =
Arrays.stream(tierDirs[tierLevel])
.filter(Objects::nonNull)
.map(
@@ -245,6 +247,10 @@ public class TierManager {
.collect(Collectors.toList());
}
+ public List<String> getAllObjectFileFolders() {
+ return objectDirs;
+ }
+
public int getTiersNum() {
return seqTiers.size();
}