This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/object_type by this push:
new 56b4a71df60 finally iotv1 worked
56b4a71df60 is described below
commit 56b4a71df6064ba10162792192760511f5cafd41
Author: HTHou <[email protected]>
AuthorDate: Tue Jul 8 17:41:43 2025 +0800
finally iotv1 worked
---
.../plan/planner/plan/node/PlanNodeType.java | 6 +-
.../planner/plan/node/write/InsertTabletNode.java | 6 --
.../plan/planner/plan/node/write/ObjectNode.java | 49 ++++++++++++--
.../db/storageengine/dataregion/DataRegion.java | 2 +
.../dataregion/memtable/TsFileProcessor.java | 76 ----------------------
.../storageengine/dataregion/wal/node/WALNode.java | 4 +-
6 files changed, 51 insertions(+), 92 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 421a5396807..98936da12bb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -386,9 +386,7 @@ public enum PlanNodeType {
case 2003:
return RelationalDeleteDataNode.deserializeFromWAL(buffer);
case 2004:
- // TODO haonan only be called by follower, should call normal dserialize instead from
- // deserializeFromWAL
- return ObjectNode.deserializeFromWAL(buffer);
+ return ObjectNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
@@ -705,6 +703,8 @@ public enum PlanNodeType {
return RelationalInsertRowsNode.deserialize(buffer);
case 2003:
return RelationalDeleteDataNode.deserialize(buffer);
+ case 2004:
+ return ObjectNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index 5b495e6c9e3..be67555ac2e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -220,12 +220,6 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue {
final Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap =
collectSplitRanges();
final Map<TRegionReplicaSet, List<Integer>> splitMap =
splitByReplicaSet(deviceIDSplitInfoMap, analysis);
-
- // TODO haonan should generate ObjectNode for each object point
- // TODO haonan normal serialize method should contain relativePath, offset, length, eof,
- // fileContent
- // ObjectPlanNode(relativePath, offset, eof, length, fileContent), ObjectNode, ObjectNode,
- // InsertNode()
return doSplit(splitMap);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
index 6b989ca564a..9b583b56be2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
@@ -22,17 +22,21 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.exception.ObjectFileNotExist;
+import
org.apache.iotdb.commons.exception.runtime.SerializationRunTimeException;
import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
+import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataInputStream;
@@ -98,7 +102,6 @@ public class ObjectNode extends SearchNode implements WALEntryValue {
@Override
public void serializeToWAL(IWALByteBufferView buffer) {
- // TODO haonan only need relativePath, offset, length, eof
buffer.putShort(getType().getNodeType());
buffer.putLong(searchIndex);
buffer.put((byte) (isEOF ? 1 : 0));
@@ -118,17 +121,13 @@ public class ObjectNode extends SearchNode implements WALEntryValue {
}
public static ObjectNode deserializeFromWAL(DataInputStream stream) throws
IOException {
- // TODO haonan only be called in recovery, should only deserialize
relativePath, offset, eof,
- // length
long searchIndex = stream.readLong();
boolean isEOF = stream.readByte() == 1;
long offset = stream.readLong();
String filePath = ReadWriteIOUtils.readString(stream);
int contentLength = stream.readInt();
-
ObjectNode objectNode = new ObjectNode(isEOF, offset, contentLength,
filePath);
objectNode.setSearchIndex(searchIndex);
-
return objectNode;
}
@@ -141,7 +140,7 @@ public class ObjectNode extends SearchNode implements WALEntryValue {
int contentLength = buffer.getInt();
byte[] contents = new byte[contentLength];
if (objectFile.isPresent()) {
- try (RandomAccessFile raf = new RandomAccessFile(filePath, "r")) {
+ try (RandomAccessFile raf = new RandomAccessFile(objectFile.get(), "r"))
{
raf.seek(offset);
raf.read(contents);
} catch (IOException e) {
@@ -156,6 +155,15 @@ public class ObjectNode extends SearchNode implements WALEntryValue {
return objectNode;
}
+ public static ObjectNode deserialize(ByteBuffer byteBuffer) {
+ boolean isEoF = ReadWriteIOUtils.readBool(byteBuffer);
+ long offset = ReadWriteIOUtils.readLong(byteBuffer);
+ String filePath = ReadWriteIOUtils.readString(byteBuffer);
+ int contentLength = ReadWriteIOUtils.readInt(byteBuffer);
+ byte[] content = ReadWriteIOUtils.readBytes(byteBuffer, contentLength);
+ return new ObjectNode(isEoF, offset, content, filePath);
+ }
+
@Override
public SearchNode merge(List<SearchNode> searchNodes) {
if (searchNodes.size() == 1) {
@@ -229,6 +237,35 @@ public class ObjectNode extends SearchNode implements WALEntryValue {
stream.write(content);
}
+ public ByteBuffer serialize() {
+ try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ DataOutputStream stream = new DataOutputStream(byteArrayOutputStream))
{
+ ReadWriteIOUtils.write(WALEntryType.OBJECT_FILE_NODE.getCode(), stream);
+ ReadWriteIOUtils.write((long) TsFileProcessor.MEMTABLE_NOT_EXIST,
stream);
+ ReadWriteIOUtils.write(getType().getNodeType(), stream);
+ ReadWriteIOUtils.write(isEOF, stream);
+ ReadWriteIOUtils.write(offset, stream);
+ ReadWriteIOUtils.write(filePath, stream);
+ ReadWriteIOUtils.write(contentLength, stream);
+ Optional<File> objectFile =
TierManager.getInstance().getAbsoluteObjectFilePath(filePath);
+ byte[] contents = new byte[contentLength];
+ if (objectFile.isPresent()) {
+ try (RandomAccessFile raf = new RandomAccessFile(objectFile.get(),
"r")) {
+ raf.seek(offset);
+ raf.read(contents);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ throw new ObjectFileNotExist(filePath);
+ }
+ stream.write(contents);
+ return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ } catch (IOException e) {
+ throw new SerializationRunTimeException(e);
+ }
+ }
+
@Override
public PlanNodeType getType() {
return PlanNodeType.OBJECT_FILE_NODE;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index f80a853ad6d..8396941c480 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -3093,6 +3093,8 @@ public class DataRegion implements IDataRegionForQuery {
Files.move(
objectTmpFile.toPath(), objectFile.toPath(),
StandardCopyOption.REPLACE_EXISTING);
}
+ getWALNode()
+ .ifPresent(walNode ->
walNode.log(TsFileProcessor.MEMTABLE_NOT_EXIST, objectNode));
} finally {
writeUnlock();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 878eda4d071..b2c96a87f3b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -47,7 +47,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils;
import org.apache.iotdb.db.service.metrics.WritingMetrics;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
@@ -71,13 +70,11 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.node.IWALNode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.AbstractResultListener;
import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
-import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
import org.apache.iotdb.db.storageengine.rescon.memory.MemTableManager;
import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
import org.apache.iotdb.db.utils.MemUtils;
import org.apache.iotdb.db.utils.ModificationUtils;
-import org.apache.iotdb.db.utils.ObjectWriter;
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
import org.apache.iotdb.db.utils.datastructure.TVList;
import org.apache.iotdb.rpc.RpcUtils;
@@ -90,23 +87,16 @@ import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.TableSchema;
-import org.apache.tsfile.fileSystem.FSFactoryProducer;
import org.apache.tsfile.read.filter.basic.Filter;
import org.apache.tsfile.utils.Binary;
-import org.apache.tsfile.utils.BytesUtils;
import org.apache.tsfile.utils.Pair;
-import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -2341,70 +2331,4 @@ public class TsFileProcessor {
"{}: {} release flushQueryLock", dataRegionName,
tsFileResource.getTsFile().getName());
}
}
-
- private void handleWriteObject(
- InsertTabletNode insertTabletNode, List<int[]> rangeList, TSStatus[]
results)
- throws WriteProcessException {
- if (insertTabletNode instanceof RelationalInsertTabletNode) {
- RelationalInsertTabletNode node = (RelationalInsertTabletNode)
insertTabletNode;
- for (int j = 0; j < node.getColumns().length; j++) {
- if (node.getDataType(j) == TSDataType.OBJECT) {
- Binary[] objectColumn = node.getObjectColumns().get(j);
- for (int i = 0; i < node.getTimes().length; i++) {
- ByteBuffer buffer =
ByteBuffer.wrap(objectColumn[i].getValues()).duplicate();
- String relativePathString = ReadWriteIOUtils.readString(buffer);
- boolean isEoF = ReadWriteIOUtils.readBool(buffer);
- long offset = ReadWriteIOUtils.readLong(buffer);
- byte[] content = ReadWriteIOUtils.readBytes(buffer,
buffer.remaining());
- String relativeTmpPathString = relativePathString + ".tmp";
- String objectFileDir;
- File objectTmpFile;
- try {
- objectFileDir =
TierManager.getInstance().getNextFolderForObjectFile();
- objectTmpFile =
- FSFactoryProducer.getFSFactory().getFile(objectFileDir,
relativeTmpPathString);
- try (ObjectWriter writer = new ObjectWriter(objectTmpFile)) {
- writer.write(isEoF, offset, content);
- }
- } catch (Exception e) {
- results[i] =
RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
- throw new WriteProcessException(e);
- }
- // TODO:[OBJECT] write file node wal
- if (isEoF) {
- File objectFile =
- FSFactoryProducer.getFSFactory().getFile(objectFileDir,
relativePathString);
- try {
- Files.move(
- objectTmpFile.toPath(),
- objectFile.toPath(),
- StandardCopyOption.REPLACE_EXISTING);
- } catch (IOException e) {
- results[i] =
RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
- throw new WriteProcessException(e);
- }
- byte[] filePathBytes =
relativePathString.getBytes(StandardCharsets.UTF_8);
- byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES];
- System.arraycopy(
- BytesUtils.longToBytes(objectFile.length()), 0, valueBytes,
0, Long.BYTES);
- System.arraycopy(filePathBytes, 0, valueBytes, Long.BYTES,
filePathBytes.length);
- }
-
- // WALFlushListener walFlushListener;
- // try {
- // walFlushListener =
walNode.log(workMemTable.getMemTableId(), fileNode);
- // if (walFlushListener.waitForResult() ==
WALFlushListener.Status.FAILURE)
- // {
- // throw walFlushListener.getCause();
- // }
- // } catch (Exception e) {
- // results[i] =
RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR,
- // e.getMessage());
- // throw new WriteProcessException(e);
- // }
- }
- }
- }
- }
- }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index c2af1493115..7833785599f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -786,7 +786,9 @@ public class WALNode implements IWALNode {
// need to add WALEntryType + memtableId + relativePath, offset, eof, length +
// filecontent
// need to add IoTConsensusRequest instead of FileNode
- tmpNodes.get().add((ObjectNode) walEntry.getValue());
+ tmpNodes
+ .get()
+ .add(new IoTConsensusRequest(((ObjectNode)
walEntry.getValue()).serialize()));
} else {
tmpNodes.get().add(new IoTConsensusRequest(buffer));
}