This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/object_type by this push:
     new af051df0313 validate offset & impl relativePath
af051df0313 is described below

commit af051df0313d70de22712af285435e67d60edbb9
Author: HTHou <[email protected]>
AuthorDate: Mon Jul 7 10:22:35 2025 +0800

    validate offset & impl relativePath
---
 .../main/java/org/apache/iotdb/ObjectExample.java  | 31 +++++++++++++-
 .../db/storageengine/dataregion/DataRegion.java    |  2 +
 .../dataregion/memtable/TsFileProcessor.java       | 47 ++++++++++++++++++----
 .../org/apache/iotdb/db/utils/ObjectWriter.java    | 12 +++++-
 4 files changed, 80 insertions(+), 12 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/ObjectExample.java 
b/example/session/src/main/java/org/apache/iotdb/ObjectExample.java
index 1aa286e7ac4..b040dc5313b 100644
--- a/example/session/src/main/java/org/apache/iotdb/ObjectExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/ObjectExample.java
@@ -28,7 +28,9 @@ import org.apache.tsfile.enums.ColumnCategory;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.write.record.Tablet;
 
-import java.nio.charset.StandardCharsets;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -74,7 +76,30 @@ public class ObjectExample {
       tablet.addValue(rowIndex, 1, "5");
       tablet.addValue(rowIndex, 2, "3");
       tablet.addValue(rowIndex, 3, 37.6F);
-      tablet.addValue(rowIndex, 4, true, 0, 
"123456".getBytes(StandardCharsets.UTF_8));
+      tablet.addValue(
+          rowIndex,
+          4,
+          true,
+          0,
+          Files.readAllBytes(
+              
Paths.get("/Users/ht/Downloads/2_1746622362350_fa24aa15233f4e76bcda789a5771f43f")));
+      session.insert(tablet);
+      tablet.reset();
+
+      tablet = new Tablet("test1", columnNameList, dataTypeList, 
columnTypeList, 1);
+      rowIndex = tablet.getRowSize();
+      tablet.addTimestamp(rowIndex, 2);
+      tablet.addValue(rowIndex, 0, "1");
+      tablet.addValue(rowIndex, 1, "5");
+      tablet.addValue(rowIndex, 2, "3");
+      tablet.addValue(rowIndex, 3, 37.6F);
+      tablet.addValue(
+          rowIndex,
+          4,
+          true,
+          0,
+          Files.readAllBytes(
+              
Paths.get("/Users/ht/Downloads/2_1746622367063_8fb5ac8e21724140874195b60b878664")));
       session.insert(tablet);
       tablet.reset();
 
@@ -82,6 +107,8 @@ public class ObjectExample {
       e.printStackTrace();
     } catch (StatementExecutionException e) {
       e.printStackTrace();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 6a3b0917422..4e248ed3e48 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -347,6 +347,8 @@ public class DataRegion implements IDataRegionForQuery {
   private ILoadDiskSelector ordinaryLoadDiskSelector;
   private ILoadDiskSelector pipeAndIoTV2LoadDiskSelector;
 
+  public static AtomicLong objectFileId = new AtomicLong(0);
+
   /**
    * Construct a database processor.
    *
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index af49c77f110..4cfad7920eb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
 import org.apache.iotdb.commons.exception.MetadataException;
@@ -92,6 +93,7 @@ import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.TableSchema;
 import org.apache.tsfile.read.filter.basic.Filter;
 import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.slf4j.Logger;
@@ -101,6 +103,8 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -570,19 +574,46 @@ public class TsFileProcessor {
         for (int i = 0; i < fileNodeList.size(); i++) {
           FileNode fileNode = fileNodeList.get(i);
           String objectFileName =
-              insertTabletNode.getTimes()[i] + "-" + config.getDataNodeId() + 
"-" + 1 + ".bin";
-          File objectFile = new File(writer.getFile().getParent(), 
objectFileName);
-          try (ObjectWriter writer = new ObjectWriter(objectFile)) {
-            writer.write(fileNode.getContent());
+              insertTabletNode.getTimes()[i]
+                  + "-"
+                  + config.getDataNodeId()
+                  + "-"
+                  + DataRegion.objectFileId.incrementAndGet()
+                  + ".bin";
+          String objectTmpFileName = objectFileName + ".tmp";
+          File objectTmpFile = new File(writer.getFile().getParent(), 
objectTmpFileName);
+          try (ObjectWriter writer = new ObjectWriter(objectTmpFile)) {
+            writer.write(fileNode);
           } catch (Exception e) {
             throw new WriteProcessException(e);
           }
           // TODO:[OBJECT] write file node wal
           if (fileNode.isEOF()) {
-            relationalInsertTabletNode.getObjectColumn()[i] =
-                new Binary(
-                    (objectFile.getPath() + "," + objectFile.length())
-                        .getBytes(StandardCharsets.UTF_8));
+            File objectFile = new File(writer.getFile().getParent(), 
objectFileName);
+            try {
+              Files.move(
+                  objectTmpFile.toPath(), objectFile.toPath(), 
StandardCopyOption.REPLACE_EXISTING);
+            } catch (IOException e) {
+              throw new WriteProcessException(e);
+            }
+            String relativePathString =
+                (sequence
+                        ? IoTDBConstant.SEQUENCE_FOLDER_NAME
+                        : IoTDBConstant.UNSEQUENCE_FOLDER_NAME)
+                    + File.separator
+                    + dataRegionInfo.getDataRegion().getDatabaseName()
+                    + File.separator
+                    + dataRegionInfo.getDataRegion().getDataRegionId()
+                    + File.separator
+                    + tsFileResource.getTsFileID().timePartitionId
+                    + File.separator
+                    + objectFileName;
+            byte[] filePathBytes = 
relativePathString.getBytes(StandardCharsets.UTF_8);
+            byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES];
+            System.arraycopy(
+                BytesUtils.longToBytes(objectFile.length()), 0, valueBytes, 0, 
Long.BYTES);
+            System.arraycopy(filePathBytes, 0, valueBytes, Long.BYTES, 
filePathBytes.length); // path goes after the 8-byte length prefix, not at offset 4
+            relationalInsertTabletNode.getObjectColumn()[i] = new 
Binary(valueBytes);
           }
         }
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectWriter.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectWriter.java
index 070c7b5a149..c8fde82b166 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectWriter.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.utils;
 
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.FileNode;
+
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,6 +37,8 @@ public class ObjectWriter implements AutoCloseable {
 
   private final FileOutputStream fos;
 
+  private final File file;
+
   public ObjectWriter(File filePath) throws FileNotFoundException {
     try {
       FileUtils.forceMkdir(filePath.getParentFile());
@@ -48,11 +52,15 @@ public class ObjectWriter implements AutoCloseable {
         throw new FileNotFoundException(e.getMessage());
       }
     }
+    file = filePath;
     fos = new FileOutputStream(filePath, true);
   }
 
-  public void write(byte[] content) throws IOException {
-    fos.write(content);
+  public void write(FileNode fileNode) throws IOException {
+    if (file.length() != fileNode.getOffset()) {
+      throw new IOException("the file length is not equal to the file offset");
+    }
+    fos.write(fileNode.getContent());
   }
 
   @Override

Reply via email to