This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/object_type by this push:
new af051df0313 validate offset & impl relativePath
af051df0313 is described below
commit af051df0313d70de22712af285435e67d60edbb9
Author: HTHou <[email protected]>
AuthorDate: Mon Jul 7 10:22:35 2025 +0800
validate offset & impl relativePath
---
.../main/java/org/apache/iotdb/ObjectExample.java | 31 +++++++++++++-
.../db/storageengine/dataregion/DataRegion.java | 2 +
.../dataregion/memtable/TsFileProcessor.java | 47 ++++++++++++++++++----
.../org/apache/iotdb/db/utils/ObjectWriter.java | 12 +++++-
4 files changed, 80 insertions(+), 12 deletions(-)
diff --git a/example/session/src/main/java/org/apache/iotdb/ObjectExample.java b/example/session/src/main/java/org/apache/iotdb/ObjectExample.java
index 1aa286e7ac4..b040dc5313b 100644
--- a/example/session/src/main/java/org/apache/iotdb/ObjectExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/ObjectExample.java
@@ -28,7 +28,9 @@ import org.apache.tsfile.enums.ColumnCategory;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.write.record.Tablet;
-import java.nio.charset.StandardCharsets;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -74,7 +76,30 @@ public class ObjectExample {
tablet.addValue(rowIndex, 1, "5");
tablet.addValue(rowIndex, 2, "3");
tablet.addValue(rowIndex, 3, 37.6F);
- tablet.addValue(rowIndex, 4, true, 0, "123456".getBytes(StandardCharsets.UTF_8));
+ tablet.addValue(
+ rowIndex,
+ 4,
+ true,
+ 0,
+ Files.readAllBytes(
+ Paths.get("/Users/ht/Downloads/2_1746622362350_fa24aa15233f4e76bcda789a5771f43f")));
+ session.insert(tablet);
+ tablet.reset();
+
+ tablet = new Tablet("test1", columnNameList, dataTypeList, columnTypeList, 1);
+ rowIndex = tablet.getRowSize();
+ tablet.addTimestamp(rowIndex, 2);
+ tablet.addValue(rowIndex, 0, "1");
+ tablet.addValue(rowIndex, 1, "5");
+ tablet.addValue(rowIndex, 2, "3");
+ tablet.addValue(rowIndex, 3, 37.6F);
+ tablet.addValue(
+ rowIndex,
+ 4,
+ true,
+ 0,
+ Files.readAllBytes(
+ Paths.get("/Users/ht/Downloads/2_1746622367063_8fb5ac8e21724140874195b60b878664")));
session.insert(tablet);
tablet.reset();
@@ -82,6 +107,8 @@ public class ObjectExample {
e.printStackTrace();
} catch (StatementExecutionException e) {
e.printStackTrace();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 6a3b0917422..4e248ed3e48 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -347,6 +347,8 @@ public class DataRegion implements IDataRegionForQuery {
private ILoadDiskSelector ordinaryLoadDiskSelector;
private ILoadDiskSelector pipeAndIoTV2LoadDiskSelector;
+ public static AtomicLong objectFileId = new AtomicLong(0);
+
/**
* Construct a database processor.
*
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index af49c77f110..4cfad7920eb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.exception.MetadataException;
@@ -92,6 +93,7 @@ import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BytesUtils;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
@@ -101,6 +103,8 @@ import java.io.File;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -570,19 +574,46 @@ public class TsFileProcessor {
for (int i = 0; i < fileNodeList.size(); i++) {
FileNode fileNode = fileNodeList.get(i);
String objectFileName =
- insertTabletNode.getTimes()[i] + "-" + config.getDataNodeId() + "-" + 1 + ".bin";
- File objectFile = new File(writer.getFile().getParent(), objectFileName);
- try (ObjectWriter writer = new ObjectWriter(objectFile)) {
- writer.write(fileNode.getContent());
+ insertTabletNode.getTimes()[i]
+ + "-"
+ + config.getDataNodeId()
+ + "-"
+ + DataRegion.objectFileId.incrementAndGet()
+ + ".bin";
+ String objectTmpFileName = objectFileName + ".tmp";
+ File objectTmpFile = new File(writer.getFile().getParent(), objectTmpFileName);
+ try (ObjectWriter writer = new ObjectWriter(objectTmpFile)) {
+ writer.write(fileNode);
} catch (Exception e) {
throw new WriteProcessException(e);
}
// TODO:[OBJECT] write file node wal
if (fileNode.isEOF()) {
- relationalInsertTabletNode.getObjectColumn()[i] =
- new Binary(
- (objectFile.getPath() + "," + objectFile.length())
- .getBytes(StandardCharsets.UTF_8));
+ File objectFile = new File(writer.getFile().getParent(), objectFileName);
+ try {
+ Files.move(
+ objectTmpFile.toPath(), objectFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+ } catch (IOException e) {
+ throw new WriteProcessException(e);
+ }
+ String relativePathString =
+ (sequence
+ ? IoTDBConstant.SEQUENCE_FOLDER_NAME
+ : IoTDBConstant.UNSEQUENCE_FOLDER_NAME)
+ + File.separator
+ + dataRegionInfo.getDataRegion().getDatabaseName()
+ + File.separator
+ + dataRegionInfo.getDataRegion().getDataRegionId()
+ + File.separator
+ + tsFileResource.getTsFileID().timePartitionId
+ + File.separator
+ + objectFileName;
+ byte[] filePathBytes = relativePathString.getBytes(StandardCharsets.UTF_8);
+ byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES];
+ System.arraycopy(
+ BytesUtils.longToBytes(objectFile.length()), 0, valueBytes, 0, Long.BYTES);
+ System.arraycopy(filePathBytes, 0, valueBytes, 4, filePathBytes.length);
+ relationalInsertTabletNode.getObjectColumn()[i] = new Binary(valueBytes);
}
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectWriter.java
index 070c7b5a149..c8fde82b166 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectWriter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ObjectWriter.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.utils;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.FileNode;
+
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,6 +37,8 @@ public class ObjectWriter implements AutoCloseable {
private final FileOutputStream fos;
+ private final File file;
+
public ObjectWriter(File filePath) throws FileNotFoundException {
try {
FileUtils.forceMkdir(filePath.getParentFile());
@@ -48,11 +52,15 @@ public class ObjectWriter implements AutoCloseable {
throw new FileNotFoundException(e.getMessage());
}
}
+ file = filePath;
fos = new FileOutputStream(filePath, true);
}
- public void write(byte[] content) throws IOException {
- fos.write(content);
+ public void write(FileNode fileNode) throws IOException {
+ if (file.length() != fileNode.getOffset()) {
+ throw new IOException("the file length is not equal to the file offset");
+ }
+ fos.write(fileNode.getContent());
}
@Override