This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/object_type by this push:
     new 56b4a71df60 finally iotv1 worked
56b4a71df60 is described below

commit 56b4a71df6064ba10162792192760511f5cafd41
Author: HTHou <[email protected]>
AuthorDate: Tue Jul 8 17:41:43 2025 +0800

    finally iotv1 worked
---
 .../plan/planner/plan/node/PlanNodeType.java       |  6 +-
 .../planner/plan/node/write/InsertTabletNode.java  |  6 --
 .../plan/planner/plan/node/write/ObjectNode.java   | 49 ++++++++++++--
 .../db/storageengine/dataregion/DataRegion.java    |  2 +
 .../dataregion/memtable/TsFileProcessor.java       | 76 ----------------------
 .../storageengine/dataregion/wal/node/WALNode.java |  4 +-
 6 files changed, 51 insertions(+), 92 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 421a5396807..98936da12bb 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -386,9 +386,7 @@ public enum PlanNodeType {
       case 2003:
         return RelationalDeleteDataNode.deserializeFromWAL(buffer);
       case 2004:
-        // TODO haonan only be called by follower, should call normal 
dserialize instead from
-        // deserializeFromWAL
-        return ObjectNode.deserializeFromWAL(buffer);
+        return ObjectNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
@@ -705,6 +703,8 @@ public enum PlanNodeType {
         return RelationalInsertRowsNode.deserialize(buffer);
       case 2003:
         return RelationalDeleteDataNode.deserialize(buffer);
+      case 2004:
+        return ObjectNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index 5b495e6c9e3..be67555ac2e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -220,12 +220,6 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
     final Map<IDeviceID, PartitionSplitInfo> deviceIDSplitInfoMap = 
collectSplitRanges();
     final Map<TRegionReplicaSet, List<Integer>> splitMap =
         splitByReplicaSet(deviceIDSplitInfoMap, analysis);
-
-    // TODO haonan should generate ObjectNode for each object point
-    // TODO haonan normal serialize method should contain relativePath, 
offset, length, eof,
-    // fileContent
-    // ObjectPlanNode(relativePath, offset, eof, length, fileContent), 
ObjectNode, ObjectNode,
-    // InsertNode()
     return doSplit(splitMap);
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
index 6b989ca564a..9b583b56be2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/ObjectNode.java
@@ -22,17 +22,21 @@ package 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.exception.ObjectFileNotExist;
+import 
org.apache.iotdb.commons.exception.runtime.SerializationRunTimeException;
 import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryType;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
 import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
 
+import org.apache.tsfile.utils.PublicBAOS;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataInputStream;
@@ -98,7 +102,6 @@ public class ObjectNode extends SearchNode implements 
WALEntryValue {
 
   @Override
   public void serializeToWAL(IWALByteBufferView buffer) {
-    // TODO haonan only need relativePath, offset, length, eof
     buffer.putShort(getType().getNodeType());
     buffer.putLong(searchIndex);
     buffer.put((byte) (isEOF ? 1 : 0));
@@ -118,17 +121,13 @@ public class ObjectNode extends SearchNode implements 
WALEntryValue {
   }
 
   public static ObjectNode deserializeFromWAL(DataInputStream stream) throws 
IOException {
-    // TODO haonan only be called in recovery, should only deserialize 
relativePath, offset, eof,
-    // length
     long searchIndex = stream.readLong();
     boolean isEOF = stream.readByte() == 1;
     long offset = stream.readLong();
     String filePath = ReadWriteIOUtils.readString(stream);
     int contentLength = stream.readInt();
-
     ObjectNode objectNode = new ObjectNode(isEOF, offset, contentLength, 
filePath);
     objectNode.setSearchIndex(searchIndex);
-
     return objectNode;
   }
 
@@ -141,7 +140,7 @@ public class ObjectNode extends SearchNode implements 
WALEntryValue {
     int contentLength = buffer.getInt();
     byte[] contents = new byte[contentLength];
     if (objectFile.isPresent()) {
-      try (RandomAccessFile raf = new RandomAccessFile(filePath, "r")) {
+      try (RandomAccessFile raf = new RandomAccessFile(objectFile.get(), "r")) 
{
         raf.seek(offset);
         raf.read(contents);
       } catch (IOException e) {
@@ -156,6 +155,15 @@ public class ObjectNode extends SearchNode implements 
WALEntryValue {
     return objectNode;
   }
 
+  public static ObjectNode deserialize(ByteBuffer byteBuffer) {
+    boolean isEoF = ReadWriteIOUtils.readBool(byteBuffer);
+    long offset = ReadWriteIOUtils.readLong(byteBuffer);
+    String filePath = ReadWriteIOUtils.readString(byteBuffer);
+    int contentLength = ReadWriteIOUtils.readInt(byteBuffer);
+    byte[] content = ReadWriteIOUtils.readBytes(byteBuffer, contentLength);
+    return new ObjectNode(isEoF, offset, content, filePath);
+  }
+
   @Override
   public SearchNode merge(List<SearchNode> searchNodes) {
     if (searchNodes.size() == 1) {
@@ -229,6 +237,35 @@ public class ObjectNode extends SearchNode implements 
WALEntryValue {
     stream.write(content);
   }
 
+  public ByteBuffer serialize() {
+    try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
+        DataOutputStream stream = new DataOutputStream(byteArrayOutputStream)) 
{
+      ReadWriteIOUtils.write(WALEntryType.OBJECT_FILE_NODE.getCode(), stream);
+      ReadWriteIOUtils.write((long) TsFileProcessor.MEMTABLE_NOT_EXIST, 
stream);
+      ReadWriteIOUtils.write(getType().getNodeType(), stream);
+      ReadWriteIOUtils.write(isEOF, stream);
+      ReadWriteIOUtils.write(offset, stream);
+      ReadWriteIOUtils.write(filePath, stream);
+      ReadWriteIOUtils.write(contentLength, stream);
+      Optional<File> objectFile = 
TierManager.getInstance().getAbsoluteObjectFilePath(filePath);
+      byte[] contents = new byte[contentLength];
+      if (objectFile.isPresent()) {
+        try (RandomAccessFile raf = new RandomAccessFile(objectFile.get(), 
"r")) {
+          raf.seek(offset);
+          raf.read(contents);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      } else {
+        throw new ObjectFileNotExist(filePath);
+      }
+      stream.write(contents);
+      return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+    } catch (IOException e) {
+      throw new SerializationRunTimeException(e);
+    }
+  }
+
   @Override
   public PlanNodeType getType() {
     return PlanNodeType.OBJECT_FILE_NODE;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index f80a853ad6d..8396941c480 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -3093,6 +3093,8 @@ public class DataRegion implements IDataRegionForQuery {
         Files.move(
             objectTmpFile.toPath(), objectFile.toPath(), 
StandardCopyOption.REPLACE_EXISTING);
       }
+      getWALNode()
+          .ifPresent(walNode -> 
walNode.log(TsFileProcessor.MEMTABLE_NOT_EXIST, objectNode));
     } finally {
       writeUnlock();
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 878eda4d071..b2c96a87f3b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -47,7 +47,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
 import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils;
 import org.apache.iotdb.db.service.metrics.WritingMetrics;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
@@ -71,13 +70,11 @@ import 
org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
 import org.apache.iotdb.db.storageengine.dataregion.wal.node.IWALNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.AbstractResultListener;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
-import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
 import org.apache.iotdb.db.storageengine.rescon.memory.MemTableManager;
 import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager;
 import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
 import org.apache.iotdb.db.utils.MemUtils;
 import org.apache.iotdb.db.utils.ModificationUtils;
-import org.apache.iotdb.db.utils.ObjectWriter;
 import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 import org.apache.iotdb.rpc.RpcUtils;
@@ -90,23 +87,16 @@ import org.apache.tsfile.file.metadata.ChunkMetadata;
 import org.apache.tsfile.file.metadata.IChunkMetadata;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.TableSchema;
-import org.apache.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.tsfile.read.filter.basic.Filter;
 import org.apache.tsfile.utils.Binary;
-import org.apache.tsfile.utils.BytesUtils;
 import org.apache.tsfile.utils.Pair;
-import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.StandardCopyOption;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -2341,70 +2331,4 @@ public class TsFileProcessor {
           "{}: {} release flushQueryLock", dataRegionName, 
tsFileResource.getTsFile().getName());
     }
   }
-
-  private void handleWriteObject(
-      InsertTabletNode insertTabletNode, List<int[]> rangeList, TSStatus[] 
results)
-      throws WriteProcessException {
-    if (insertTabletNode instanceof RelationalInsertTabletNode) {
-      RelationalInsertTabletNode node = (RelationalInsertTabletNode) 
insertTabletNode;
-      for (int j = 0; j < node.getColumns().length; j++) {
-        if (node.getDataType(j) == TSDataType.OBJECT) {
-          Binary[] objectColumn = node.getObjectColumns().get(j);
-          for (int i = 0; i < node.getTimes().length; i++) {
-            ByteBuffer buffer = 
ByteBuffer.wrap(objectColumn[i].getValues()).duplicate();
-            String relativePathString = ReadWriteIOUtils.readString(buffer);
-            boolean isEoF = ReadWriteIOUtils.readBool(buffer);
-            long offset = ReadWriteIOUtils.readLong(buffer);
-            byte[] content = ReadWriteIOUtils.readBytes(buffer, 
buffer.remaining());
-            String relativeTmpPathString = relativePathString + ".tmp";
-            String objectFileDir;
-            File objectTmpFile;
-            try {
-              objectFileDir = 
TierManager.getInstance().getNextFolderForObjectFile();
-              objectTmpFile =
-                  FSFactoryProducer.getFSFactory().getFile(objectFileDir, 
relativeTmpPathString);
-              try (ObjectWriter writer = new ObjectWriter(objectTmpFile)) {
-                writer.write(isEoF, offset, content);
-              }
-            } catch (Exception e) {
-              results[i] = 
RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
-              throw new WriteProcessException(e);
-            }
-            // TODO:[OBJECT] write file node wal
-            if (isEoF) {
-              File objectFile =
-                  FSFactoryProducer.getFSFactory().getFile(objectFileDir, 
relativePathString);
-              try {
-                Files.move(
-                    objectTmpFile.toPath(),
-                    objectFile.toPath(),
-                    StandardCopyOption.REPLACE_EXISTING);
-              } catch (IOException e) {
-                results[i] = 
RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
-                throw new WriteProcessException(e);
-              }
-              byte[] filePathBytes = 
relativePathString.getBytes(StandardCharsets.UTF_8);
-              byte[] valueBytes = new byte[filePathBytes.length + Long.BYTES];
-              System.arraycopy(
-                  BytesUtils.longToBytes(objectFile.length()), 0, valueBytes, 
0, Long.BYTES);
-              System.arraycopy(filePathBytes, 0, valueBytes, Long.BYTES, 
filePathBytes.length);
-            }
-
-            //            WALFlushListener walFlushListener;
-            //            try {
-            //              walFlushListener = 
walNode.log(workMemTable.getMemTableId(), fileNode);
-            //              if (walFlushListener.waitForResult() == 
WALFlushListener.Status.FAILURE)
-            // {
-            //                throw walFlushListener.getCause();
-            //              }
-            //            } catch (Exception e) {
-            //              results[i] = 
RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR,
-            // e.getMessage());
-            //              throw new WriteProcessException(e);
-            //            }
-          }
-        }
-      }
-    }
-  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index c2af1493115..7833785599f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -786,7 +786,9 @@ public class WALNode implements IWALNode {
                   // need to add WALEntryType + memtableId + relativePath, 
offset, eof, length +
                   // filecontent
                   // need to add IoTConsensusRequest instead of FileNode
-                  tmpNodes.get().add((ObjectNode) walEntry.getValue());
+                  tmpNodes
+                      .get()
+                      .add(new IoTConsensusRequest(((ObjectNode) 
walEntry.getValue()).serialize()));
                 } else {
                   tmpNodes.get().add(new IoTConsensusRequest(buffer));
                 }

Reply via email to