This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch adapt_wal_1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a157df0708e0f7c4aaa2d65bbcce2d979aec0d6b Author: HTHou <[email protected]> AuthorDate: Tue Aug 13 17:10:05 2024 +0800 Adapt wal with memtable snapshot from v1.3 or lower version --- .../dataregion/memtable/AbstractMemTable.java | 47 ++++++++++++++++++++++ .../dataregion/wal/buffer/WALEntry.java | 3 ++ .../dataregion/wal/buffer/WALEntryType.java | 7 +++- .../file/UnsealedTsFileRecoverPerformer.java | 1 + 4 files changed, 56 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index 320b3a12efd..b1098f9858e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java @@ -34,6 +34,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; +import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils; @@ -1096,6 +1097,37 @@ public abstract class AbstractMemTable implements IMemTable { } } + public void deserializeFromOldMemTableSnapshot(DataInputStream stream) throws IOException { + seriesNumber = stream.readInt(); + memSize = stream.readLong(); + tvListRamCost = stream.readLong(); + totalPointsNum = stream.readLong(); + totalPointsNumThreshold = stream.readLong(); + maxPlanIndex = stream.readLong(); + minPlanIndex = 
stream.readLong(); + + int memTableMapSize = stream.readInt(); + for (int i = 0; i < memTableMapSize; ++i) { + PartialPath devicePath; + try { + devicePath = + DataNodeDevicePathCache.getInstance() + .getPartialPath(ReadWriteIOUtils.readString(stream)); + } catch (IllegalPathException e) { + throw new IllegalArgumentException("Cannot deserialize OldMemTableSnapshot", e); + } + IDeviceID deviceID = deviceIDFactory.getDeviceID(devicePath); + boolean isAligned = ReadWriteIOUtils.readBool(stream); + IWritableMemChunkGroup memChunkGroup; + if (isAligned) { + memChunkGroup = AlignedWritableMemChunkGroup.deserialize(stream); + } else { + memChunkGroup = WritableMemChunkGroup.deserialize(stream); + } + memTableMap.put(deviceID, memChunkGroup); + } + } + @Override public Map<IDeviceID, Long> getMaxTime() { Map<IDeviceID, Long> latestTimeForEachDevice = new HashMap<>(); @@ -1129,6 +1161,21 @@ public abstract class AbstractMemTable implements IMemTable { } return memTable; } + + public static IMemTable createFromOldMemTableSnapshot(DataInputStream stream) + throws IOException { + boolean isSignal = ReadWriteIOUtils.readBool(stream); + IMemTable memTable; + if (isSignal) { + memTable = new NotifyFlushMemTable(); + } else { + // database will be updated when deserialize + PrimitiveMemTable primitiveMemTable = new PrimitiveMemTable(); + primitiveMemTable.deserializeFromOldMemTableSnapshot(stream); + memTable = primitiveMemTable; + } + return memTable; + } } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java index f9ea8fdb333..67030587922 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntry.java @@ -110,6 +110,9 @@ public abstract 
class WALEntry implements SerializedSize { case MEMORY_TABLE_SNAPSHOT: value = AbstractMemTable.Factory.create(stream); break; + case OLD_MEMORY_TABLE_SNAPSHOT: + value = AbstractMemTable.Factory.createFromOldMemTableSnapshot(stream); + break; case INSERT_ROW_NODE: value = (InsertRowNode) PlanNodeType.deserializeFromWAL(stream); break; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java index 914a2e90663..415bdaeb877 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/buffer/WALEntryType.java @@ -28,8 +28,9 @@ public enum WALEntryType { INSERT_TABLET_PLAN((byte) 1), @Deprecated DELETE_PLAN((byte) 2), - /** {@link org.apache.iotdb.db.engine.memtable.AbstractMemTable} */ - MEMORY_TABLE_SNAPSHOT((byte) 3), + @Deprecated + // memory table snapshot from 1.3 or lower version + OLD_MEMORY_TABLE_SNAPSHOT((byte) 3), /** {@link org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode} */ INSERT_ROW_NODE((byte) 4), /** {@link org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode} */ @@ -41,6 +42,8 @@ public enum WALEntryType { /** {@link org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode} */ INSERT_ROWS_NODE((byte) 8), CONTINUOUS_SAME_SEARCH_INDEX_SEPARATOR_NODE((byte) 9), + /** {@link org.apache.iotdb.db.storageengine.dataregion.memtable.AbstractMemTable} */ + MEMORY_TABLE_SNAPSHOT((byte) 10), // endregion // region signal entry type // signal wal buffer has been closed diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java index 71ff9417e91..a09c473e16e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java @@ -190,6 +190,7 @@ public class UnsealedTsFileRecoverPerformer extends AbstractTsFileRecoverPerform try { switch (walEntry.getType()) { case MEMORY_TABLE_SNAPSHOT: + case OLD_MEMORY_TABLE_SNAPSHOT: IMemTable memTable = (IMemTable) walEntry.getValue(); if (!memTable.isSignalMemTable()) { if (tsFileResource != null) {
