This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/object_type by this push:
new 03d3c26ae27 fix iotv1 bug
new 344c234852f Merge branch 'object_type' of github.com:apache/iotdb into
object_type
03d3c26ae27 is described below
commit 03d3c26ae277385da69e76fda2baa9149ce49eaa
Author: HTHou <[email protected]>
AuthorDate: Thu Jul 10 12:11:57 2025 +0800
fix iotv1 bug
---
.../plan/planner/plan/node/write/ObjectNode.java | 53 ++++++++++++++--------
.../storageengine/dataregion/wal/node/WALNode.java | 8 ++--
2 files changed, 39 insertions(+), 22 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
index 9f06070b9ff..27d06f55819 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
@@ -38,6 +38,8 @@ import
org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@@ -50,6 +52,8 @@ import java.util.Optional;
public class ObjectNode extends SearchNode implements WALEntryValue {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ObjectNode.class);
+
private final boolean isEOF;
private final long offset;
@@ -269,33 +273,39 @@ public class ObjectNode extends SearchNode implements
WALEntryValue {
ReadWriteIOUtils.write(WALEntryType.OBJECT_FILE_NODE.getCode(), stream);
ReadWriteIOUtils.write((long) TsFileProcessor.MEMTABLE_NOT_EXIST,
stream);
ReadWriteIOUtils.write(getType().getNodeType(), stream);
- ReadWriteIOUtils.write(isEOF, stream);
- ReadWriteIOUtils.write(offset, stream);
- ReadWriteIOUtils.write(filePath, stream);
- ReadWriteIOUtils.write(contentLength, stream);
byte[] contents = new byte[contentLength];
- Optional<File> objectFile =
TierManager.getInstance().getAbsoluteObjectFilePath(filePath);
- if (objectFile.isPresent()) {
- try (RandomAccessFile raf = new RandomAccessFile(objectFile.get(),
"r")) {
- raf.seek(offset);
- raf.read(contents);
- } catch (IOException e) {
- throw new RuntimeException(e);
+ boolean readSuccess = false;
+ for (int i = 0; i < 2; i++) {
+ Optional<File> objectFile =
TierManager.getInstance().getAbsoluteObjectFilePath(filePath);
+ if (objectFile.isPresent()) {
+ try {
+ readContentFromFile(objectFile.get(), contents);
+ readSuccess = true;
+ } catch (IOException e) {
+ LOGGER.error("Error when read object file {}.", objectFile.get(),
e);
+ }
+ if (readSuccess) {
+ break;
+ }
}
- } else {
Optional<File> objectTmpFile =
TierManager.getInstance().getAbsoluteObjectFilePath(filePath +
".tmp");
if (objectTmpFile.isPresent()) {
- try (RandomAccessFile raf = new
RandomAccessFile(objectTmpFile.get(), "r")) {
- raf.seek(offset);
- raf.read(contents);
+ try {
+ readContentFromFile(objectTmpFile.get(), contents);
+ readSuccess = true;
} catch (IOException e) {
- throw new RuntimeException(e);
+ LOGGER.error("Error when read tmp object file {}.",
objectTmpFile.get(), e);
+ }
+ if (readSuccess) {
+ break;
}
- } else {
- throw new ObjectFileNotExist(filePath);
}
}
+ ReadWriteIOUtils.write(readSuccess && isEOF, stream);
+ ReadWriteIOUtils.write(offset, stream);
+ ReadWriteIOUtils.write(filePath, stream);
+ ReadWriteIOUtils.write(contentLength, stream);
stream.write(contents);
return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
} catch (IOException e) {
@@ -303,6 +313,13 @@ public class ObjectNode extends SearchNode implements
WALEntryValue {
}
}
+ private void readContentFromFile(File file, byte[] contents) throws
IOException {
+ try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
+ raf.seek(offset);
+ raf.read(contents);
+ }
+ }
+
@Override
public PlanNodeType getType() {
return PlanNodeType.OBJECT_FILE_NODE;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index 7833785599f..ae5bb354bec 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -781,11 +781,11 @@ public class WALNode implements IWALNode {
WALEntry walEntry =
WALEntry.deserialize(
new DataInputStream(new
ByteArrayInputStream(buffer.array())));
- // TODO haonan only be called by leader read from wal
- // only have relativePath, offset, eof, length
+ // only be called by leader read from wal
+ // wal only has relativePath, offset, eof, length
// need to add WALEntryType + memtableId + relativePath,
offset, eof, length +
- // filecontent
- // need to add IoTConsensusRequest instead of FileNode
+ // content
+ // need to add IoTConsensusRequest instead of ObjectNode
tmpNodes
.get()
.add(new IoTConsensusRequest(((ObjectNode)
walEntry.getValue()).serialize()));