This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch adapt_wal_1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a157df0708e0f7c4aaa2d65bbcce2d979aec0d6b
Author: HTHou <[email protected]>
AuthorDate: Tue Aug 13 17:10:05 2024 +0800

    Adapt wal with memtable snapshot from v1.3 or lower version
---
 .../dataregion/memtable/AbstractMemTable.java      | 47 ++++++++++++++++++++++
 .../dataregion/wal/buffer/WALEntry.java            |  3 ++
 .../dataregion/wal/buffer/WALEntryType.java        |  7 +++-
 .../file/UnsealedTsFileRecoverPerformer.java       |  1 +
 4 files changed, 56 insertions(+), 2 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 320b3a12efd..b1098f9858e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
+import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils;
@@ -1096,6 +1097,37 @@ public abstract class AbstractMemTable implements 
IMemTable {
     }
   }
 
+  public void deserializeFromOldMemTableSnapshot(DataInputStream stream) 
throws IOException {
+    seriesNumber = stream.readInt();
+    memSize = stream.readLong();
+    tvListRamCost = stream.readLong();
+    totalPointsNum = stream.readLong();
+    totalPointsNumThreshold = stream.readLong();
+    maxPlanIndex = stream.readLong();
+    minPlanIndex = stream.readLong();
+
+    int memTableMapSize = stream.readInt();
+    for (int i = 0; i < memTableMapSize; ++i) {
+      PartialPath devicePath;
+      try {
+        devicePath =
+            DataNodeDevicePathCache.getInstance()
+                .getPartialPath(ReadWriteIOUtils.readString(stream));
+      } catch (IllegalPathException e) {
+        throw new IllegalArgumentException("Cannot deserialize 
OldMemTableSnapshot", e);
+      }
+      IDeviceID deviceID = deviceIDFactory.getDeviceID(devicePath);
+      boolean isAligned = ReadWriteIOUtils.readBool(stream);
+      IWritableMemChunkGroup memChunkGroup;
+      if (isAligned) {
+        memChunkGroup = AlignedWritableMemChunkGroup.deserialize(stream);
+      } else {
+        memChunkGroup = WritableMemChunkGroup.deserialize(stream);
+      }
+      memTableMap.put(deviceID, memChunkGroup);
+    }
+  }
+
   @Override
   public Map<IDeviceID, Long> getMaxTime() {
     Map<IDeviceID, Long> latestTimeForEachDevice = new HashMap<>();
@@ -1129,6 +1161,21 @@ public abstract class AbstractMemTable implements 
IMemTable {
       }
       return memTable;
     }
+
+    public static IMemTable createFromOldMemTableSnapshot(DataInputStream 
stream)
+        throws IOException {
+      boolean isSignal = ReadWriteIOUtils.readBool(stream);
+      IMemTable memTable;
+      if (isSignal) {
+        memTable = new NotifyFlushMemTable();
+      } else {
+        // database will be updated when deserialize
+        PrimitiveMemTable primitiveMemTable = new PrimitiveMemTable();
+        primitiveMemTable.deserializeFromOldMemTableSnapshot(stream);
+        memTable = primitiveMemTable;
+      }
+      return memTable;
+    }
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java
index f9ea8fdb333..67030587922 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java
@@ -110,6 +110,9 @@ public abstract class WALEntry implements SerializedSize {
       case MEMORY_TABLE_SNAPSHOT:
         value = AbstractMemTable.Factory.create(stream);
         break;
+      case OLD_MEMORY_TABLE_SNAPSHOT:
+        value = AbstractMemTable.Factory.createFromOldMemTableSnapshot(stream);
+        break;
       case INSERT_ROW_NODE:
         value = (InsertRowNode) PlanNodeType.deserializeFromWAL(stream);
         break;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java
index 914a2e90663..415bdaeb877 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java
@@ -28,8 +28,9 @@ public enum WALEntryType {
   INSERT_TABLET_PLAN((byte) 1),
   @Deprecated
   DELETE_PLAN((byte) 2),
-  /** {@link org.apache.iotdb.db.engine.memtable.AbstractMemTable} */
-  MEMORY_TABLE_SNAPSHOT((byte) 3),
+  @Deprecated
+  // memory tablet snapshot from 1.3 or lower version
+  OLD_MEMORY_TABLE_SNAPSHOT((byte) 3),
   /** {@link 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode} */
   INSERT_ROW_NODE((byte) 4),
   /** {@link 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode} 
*/
@@ -41,6 +42,8 @@ public enum WALEntryType {
   /** {@link 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode} */
   INSERT_ROWS_NODE((byte) 8),
   CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR_NODE((byte) 9),
+  /** {@link 
org.apache.iotdb.db.storageengine.dataregion.memtable.AbstractMemTable} */
+  MEMORY_TABLE_SNAPSHOT((byte) 10),
   // endregion
   // region signal entry type
   // signal wal buffer has been closed
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
index 71ff9417e91..a09c473e16e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
@@ -190,6 +190,7 @@ public class UnsealedTsFileRecoverPerformer extends 
AbstractTsFileRecoverPerform
     try {
       switch (walEntry.getType()) {
         case MEMORY_TABLE_SNAPSHOT:
+        case OLD_MEMORY_TABLE_SNAPSHOT:
           IMemTable memTable = (IMemTable) walEntry.getValue();
           if (!memTable.isSignalMemTable()) {
             if (tsFileResource != null) {

Reply via email to