This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/object_type by this push:
     new 3a685174ed0 fix error value
3a685174ed0 is described below

commit 3a685174ed077a951c465b6fcb1de86ae552aeb0
Author: HTHou <[email protected]>
AuthorDate: Mon Jul 7 16:24:40 2025 +0800

    fix error value
---
 .../plan/planner/plan/node/write/FileNode.java     |  89 ++++++++++---
 .../db/storageengine/dataregion/DataRegion.java    |  37 +++++-
 .../dataregion/memtable/TsFileProcessor.java       | 148 +++++++++++----------
 .../dataregion/wal/buffer/WALEntryType.java        |   1 +
 .../db/storageengine/rescon/disk/TierManager.java  |   8 +-
 5 files changed, 193 insertions(+), 90 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/FileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/FileNode.java
index 6eae25ac1df..e91869e7e0b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/FileNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/FileNode.java
@@ -22,20 +22,13 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
 // TODO:[OBJECT] WAL serde
 public class FileNode {
 
-  private String filePath;
+  private final boolean isEOF;
 
-  private long offset;
+  private final long offset;
 
   private byte[] content;
 
-  private boolean isEOF;
-
-  public FileNode(String filePath, boolean isEOF, long offset, byte[] content) {
-    this.filePath = filePath;
-    this.isEOF = isEOF;
-    this.offset = offset;
-    this.content = content;
-  }
+  private String filePath;
 
   public FileNode(boolean isEOF, long offset, byte[] content) {
     this.isEOF = isEOF;
@@ -43,14 +36,6 @@ public class FileNode {
     this.content = content;
   }
 
-  public void setFilePath(String filePath) {
-    this.filePath = filePath;
-  }
-
-  public String getFilePath() {
-    return filePath;
-  }
-
   public boolean isEOF() {
     return isEOF;
   }
@@ -62,4 +47,72 @@ public class FileNode {
   public long getOffset() {
     return offset;
   }
+
+  public void setFilePath(String filePath) {
+    this.filePath = filePath;
+  }
+
+  public String getFilePath() {
+    return filePath;
+  }
+  //
+  //  @Override
+  //  public void serializeToWAL(IWALByteBufferView buffer) {}
+  //
+  //  @Override
+  //  public int serializedSize() {
+  //    return 0;
+  //  }
+  //
+  //  @Override
+  //  public SearchNode merge(List<SearchNode> searchNodes) {
+  //    return null;
+  //  }
+  //
+  //  @Override
+  //  public ProgressIndex getProgressIndex() {
+  //    return null;
+  //  }
+  //
+  //  @Override
+  //  public void setProgressIndex(ProgressIndex progressIndex) {}
+  //
+  //  @Override
+  //  public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
+  //    return List.of();
+  //  }
+  //
+  //  @Override
+  //  public TRegionReplicaSet getRegionReplicaSet() {
+  //    return null;
+  //  }
+  //
+  //  @Override
+  //  public List<PlanNode> getChildren() {
+  //    return List.of();
+  //  }
+  //
+  //  @Override
+  //  public void addChild(PlanNode child) {}
+  //
+  //  @Override
+  //  public PlanNode clone() {
+  //    return null;
+  //  }
+  //
+  //  @Override
+  //  public int allowedChildCount() {
+  //    return 0;
+  //  }
+  //
+  //  @Override
+  //  public List<String> getOutputColumnNames() {
+  //    return List.of();
+  //  }
+  //
+  //  @Override
+  //  protected void serializeAttributes(ByteBuffer byteBuffer) {}
+  //
+  //  @Override
+  //  protected void serializeAttributes(DataOutputStream stream) throws IOException {}
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 3da2e466177..25bc1484a78 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -347,7 +347,7 @@ public class DataRegion implements IDataRegionForQuery {
   private ILoadDiskSelector ordinaryLoadDiskSelector;
   private ILoadDiskSelector pipeAndIoTV2LoadDiskSelector;
 
-  public static AtomicLong objectFileId = new AtomicLong(0);
+  public AtomicLong objectFileId = new AtomicLong(0);
 
   /**
    * Construct a database processor.
@@ -537,6 +537,7 @@ public class DataRegion implements IDataRegionForQuery {
           getAllFiles(TierManager.getInstance().getAllLocalSequenceFileFolders());
       Map<Long, List<TsFileResource>> partitionTmpUnseqTsFiles =
           getAllFiles(TierManager.getInstance().getAllLocalUnSequenceFileFolders());
+      checkObjectFiles(TierManager.getInstance().getAllObjectFileFolders());
       DataRegionRecoveryContext dataRegionRecoveryContext =
           new DataRegionRecoveryContext(
               partitionTmpSeqTsFiles.values().stream().mapToLong(List::size).sum()
@@ -853,6 +854,40 @@ public class DataRegion implements IDataRegionForQuery {
     return ret;
   }
 
+  private void checkObjectFiles(List<String> folders) throws IOException {
+    for (String baseDir : folders) {
+      File fileFolder = fsFactory.getFile(baseDir + File.separator + databaseName, dataRegionId);
+      if (!fileFolder.exists()) {
+        continue;
+      }
+      File[] subFiles = fileFolder.listFiles();
+      if (subFiles != null) {
+        for (File partitionFolder : subFiles) {
+          if (!partitionFolder.isDirectory()) {
+            logger.warn("{} is not a directory.", partitionFolder.getAbsolutePath());
+          } else {
+            File[] objectFileInThisFolder =
+                fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), ".bin");
+            for (File f : objectFileInThisFolder) {
+              objectFileId.updateAndGet(
+                  current ->
+                      Math.max(
+                          current,
+                          Long.parseLong(
+                              f.getName().substring(0, f.getName().length() - 4).split("-")[2])));
+            }
+            File[] objectTmpFileInThisFolder =
+                fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), ".bin.tmp");
+            for (File f : objectTmpFileInThisFolder) {
+              // remove bin.tmp
+              Files.delete(f.toPath());
+            }
+          }
+        }
+      }
+    }
+  }
+
   private void continueFailedRenames(File fileFolder, String suffix) throws IOException {
     File[] files = fsFactory.listFilesBySuffix(fileFolder.getAbsolutePath(), suffix);
     if (files != null) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index b521f8a1f5b..4c4392178f9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -567,77 +567,8 @@ public class TsFileProcessor {
       boolean noFailure,
       long[] infoForMetrics)
       throws WriteProcessException {
-    if (insertTabletNode instanceof RelationalInsertTabletNode) {
-      RelationalInsertTabletNode relationalInsertTabletNode =
-          (RelationalInsertTabletNode) insertTabletNode;
-      List<List<FileNode>> fileNodesList = relationalInsertTabletNode.getFileNodeList();
-      if (fileNodesList != null) {
-        for (int j = 0; j < fileNodesList.size(); j++) {
-          List<FileNode> fileNodeList = fileNodesList.get(j);
-          for (int i = 0; i < fileNodeList.size(); i++) {
-            FileNode fileNode = fileNodeList.get(i);
-            String objectFileName =
-                insertTabletNode.getTimes()[i]
-                    + "-"
-                    + config.getDataNodeId()
-                    + "-"
-                    + DataRegion.objectFileId.incrementAndGet()
-                    + ".bin";
-            String objectTmpFileName = objectFileName + ".tmp";
-            File objectTmpFile;
-            try {
-              String baseDir = TierManager.getInstance().getNextFolderForObjectFile();
-              String objectFileDir =
-                  baseDir
-                      + File.separator
-                      + dataRegionInfo.getDataRegion().getDatabaseName()
-                      + File.separator
-                      + dataRegionInfo.getDataRegion().getDataRegionId()
-                      + File.separator
-                      + tsFileResource.getTsFileID().timePartitionId;
 
-              objectTmpFile =
-                  FSFactoryProducer.getFSFactory().getFile(objectFileDir, objectTmpFileName);
-              try (ObjectWriter writer = new ObjectWriter(objectTmpFile)) {
-                writer.write(fileNode);
-              }
-            } catch (Exception e) {
-              results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
-              throw new WriteProcessException(e);
-            }
-            // TODO:[OBJECT] write file node wal
-            if (fileNode.isEOF()) {
-              File objectFile =
-                  FSFactoryProducer.getFSFactory()
-                      .getFile(objectTmpFile.getParentFile(), objectFileName);
-              try {
-                Files.move(
-                    objectTmpFile.toPath(),
-                    objectFile.toPath(),
-                    StandardCopyOption.REPLACE_EXISTING);
-              } catch (IOException e) {
-                results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
-                throw new WriteProcessException(e);
-              }
-              String relativePathString =
-                  dataRegionInfo.getDataRegion().getDatabaseName()
-                      + File.separator
-                      + dataRegionInfo.getDataRegion().getDataRegionId()
-                      + File.separator
-                      + tsFileResource.getTsFileID().timePartitionId
-                      + File.separator
-                      + objectFileName;
-              byte[] filePathBytes = relativePathString.getBytes(StandardCharsets.UTF_8);
-              byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES];
-              System.arraycopy(
-                  BytesUtils.longToBytes(objectFile.length()), 0, valueBytes, 0, Long.BYTES);
-              System.arraycopy(filePathBytes, 0, valueBytes, 4, filePathBytes.length);
-              (relationalInsertTabletNode.getObjectColumns().get(j))[i] = new Binary(valueBytes);
-            }
-          }
-        }
-      }
-    }
+    handleWriteObject(insertTabletNode, rangeList, results);
 
     ensureMemTable(infoForMetrics);
 
@@ -2411,4 +2342,81 @@ public class TsFileProcessor {
           "{}: {} release flushQueryLock", dataRegionName, tsFileResource.getTsFile().getName());
     }
   }
+
+  private void handleWriteObject(
+      InsertTabletNode insertTabletNode, List<int[]> rangeList, TSStatus[] results)
+      throws WriteProcessException {
+    if (insertTabletNode instanceof RelationalInsertTabletNode) {
+      RelationalInsertTabletNode relationalInsertTabletNode =
+          (RelationalInsertTabletNode) insertTabletNode;
+      List<List<FileNode>> fileNodesList = relationalInsertTabletNode.getFileNodeList();
+      if (fileNodesList != null) {
+        for (int j = 0; j < fileNodesList.size(); j++) {
+          List<FileNode> fileNodeList = fileNodesList.get(j);
+          for (int i = 0; i < fileNodeList.size(); i++) {
+            FileNode fileNode = fileNodeList.get(i);
+            String objectFileName =
+                insertTabletNode.getTimes()[i]
+                    + "-"
+                    + config.getDataNodeId()
+                    + "-"
+                    + dataRegionInfo.getDataRegion().objectFileId.incrementAndGet()
+                    + ".bin";
+            String objectTmpFileName = objectFileName + ".tmp";
+            File objectTmpFile;
+            String relativePathString =
+                dataRegionInfo.getDataRegion().getDatabaseName()
+                    + File.separator
+                    + dataRegionInfo.getDataRegion().getDataRegionId()
+                    + File.separator
+                    + tsFileResource.getTsFileID().timePartitionId
+                    + File.separator
+                    + objectFileName;
+            try {
+              String baseDir = TierManager.getInstance().getNextFolderForObjectFile();
+              String objectFileDir =
+                  baseDir
+                      + File.separator
+                      + dataRegionInfo.getDataRegion().getDatabaseName()
+                      + File.separator
+                      + dataRegionInfo.getDataRegion().getDataRegionId()
+                      + File.separator
+                      + tsFileResource.getTsFileID().timePartitionId;
+
+              objectTmpFile =
+                  FSFactoryProducer.getFSFactory().getFile(objectFileDir, objectTmpFileName);
+              try (ObjectWriter writer = new ObjectWriter(objectTmpFile)) {
+                writer.write(fileNode);
+              }
+              fileNode.setFilePath(relativePathString);
+            } catch (Exception e) {
+              results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
+              throw new WriteProcessException(e);
+            }
+            // TODO:[OBJECT] write file node wal
+            if (fileNode.isEOF()) {
+              File objectFile =
+                  FSFactoryProducer.getFSFactory()
+                      .getFile(objectTmpFile.getParentFile(), objectFileName);
+              try {
+                Files.move(
+                    objectTmpFile.toPath(),
+                    objectFile.toPath(),
+                    StandardCopyOption.REPLACE_EXISTING);
+              } catch (IOException e) {
+                results[i] = RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
+                throw new WriteProcessException(e);
+              }
+              byte[] filePathBytes = relativePathString.getBytes(StandardCharsets.UTF_8);
+              byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES];
+              System.arraycopy(
+                  BytesUtils.longToBytes(objectFile.length()), 0, valueBytes, 0, Long.BYTES);
+              System.arraycopy(filePathBytes, 0, valueBytes, Long.BYTES, filePathBytes.length);
+              (relationalInsertTabletNode.getObjectColumns().get(j))[i] = new Binary(valueBytes);
+            }
+          }
+        }
+      }
+    }
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java
index 1c3ddcbf702..ac490699547 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java
@@ -45,6 +45,7 @@ public enum WALEntryType {
   /** {@link org.apache.iotdb.db.storageengine.dataregion.memtable.AbstractMemTable} */
   MEMORY_TABLE_SNAPSHOT((byte) 10),
   RELATIONAL_DELETE_DATA_NODE((byte) 11),
+  OBJECT_FILE_NODE((byte) 12),
   // endregion
   // region signal entry type
   // signal wal buffer has been closed
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java
index 1273e7adaf7..75c5d3b3c96 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/disk/TierManager.java
@@ -73,6 +73,8 @@ public class TierManager {
   /** unSeq file folder's rawFsPath path -> tier level */
   private final Map<String, Integer> unSeqDir2TierLevel = new HashMap<>();
 
+  private List<String> objectDirs;
+
   /** total space of each tier, Long.MAX_VALUE when one tier contains remote storage */
   private long[] tierDiskTotalSpace;
 
@@ -154,7 +156,7 @@ public class TierManager {
         unSeqDir2TierLevel.put(dir, tierLevel);
       }
 
-      List<String> objectDirs =
+      objectDirs =
           Arrays.stream(tierDirs[tierLevel])
               .filter(Objects::nonNull)
               .map(
@@ -245,6 +247,10 @@ public class TierManager {
         .collect(Collectors.toList());
   }
 
+  public List<String> getAllObjectFileFolders() {
+    return objectDirs;
+  }
+
   public int getTiersNum() {
     return seqTiers.size();
   }

Reply via email to