This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 44d4f6d962e Optimized write performace by reducing separators (#17670)
44d4f6d962e is described below

commit 44d4f6d962ea4e0a03e07e8e3051b6e0c6e0e9b7
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jun 1 18:16:39 2026 +0800

    Optimized write performace by reducing separators (#17670)
---
 .../dataregion/DataExecutionVisitor.java           |   5 -
 .../planner/plan/node/write/InsertRowNode.java     |   6 +-
 .../planner/plan/node/write/InsertRowsNode.java    |   6 +-
 .../planner/plan/node/write/InsertTabletNode.java  |   6 +-
 .../plan/node/write/RelationalInsertRowNode.java   |   4 +-
 .../plan/node/write/RelationalInsertRowsNode.java  |   4 +-
 .../plan/planner/plan/node/write/SearchNode.java   |  45 +++++++
 .../db/storageengine/dataregion/DataRegion.java    |  82 ++++++++++--
 .../storageengine/dataregion/wal/node/WALNode.java |  11 +-
 .../planner/node/write/InsertRowNodeSerdeTest.java |  90 +++++++++++++
 .../node/write/InsertRowsNodeSerdeTest.java        | 141 +++++++++++++++++++++
 .../node/write/InsertTabletNodeSerdeTest.java      |  67 ++++++++++
 .../wal/node/WALNodeWaitForRollFileTest.java       |  29 ++++-
 13 files changed, 463 insertions(+), 33 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
index 88d23360b54..e96ae983e58 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataExecutionVisitor.java
@@ -76,7 +76,6 @@ public class DataExecutionVisitor implements 
PlanVisitor<TSStatus, DataRegion> {
   public TSStatus visitInsertRow(InsertRowNode node, DataRegion dataRegion) {
     try {
       dataRegion.insert(node);
-      dataRegion.insertSeparatorToWAL();
       return StatusUtils.OK;
     } catch (OutOfTTLException e) {
       LOGGER.warn(DataNodeMiscMessages.ERROR_EXECUTING_PLAN_NODE_CAUSED, node, 
e.getMessage());
@@ -100,7 +99,6 @@ public class DataExecutionVisitor implements 
PlanVisitor<TSStatus, DataRegion> {
   public TSStatus visitInsertTablet(final InsertTabletNode node, final 
DataRegion dataRegion) {
     try {
       dataRegion.insertTablet(node);
-      dataRegion.insertSeparatorToWAL();
       return StatusUtils.OK;
     } catch (final OutOfTTLException e) {
       LOGGER.debug(DataNodeMiscMessages.ERROR_EXECUTING_PLAN_NODE_CAUSED, 
node, e.getMessage());
@@ -137,7 +135,6 @@ public class DataExecutionVisitor implements 
PlanVisitor<TSStatus, DataRegion> {
   public TSStatus visitInsertRows(InsertRowsNode node, DataRegion dataRegion) {
     try {
       dataRegion.insert(node);
-      dataRegion.insertSeparatorToWAL();
       return StatusUtils.OK;
     } catch (WriteProcessRejectException e) {
       LOGGER.warn(DataNodeMiscMessages.REJECT_EXECUTING_PLAN_NODE, node, 
e.getMessage());
@@ -174,7 +171,6 @@ public class DataExecutionVisitor implements 
PlanVisitor<TSStatus, DataRegion> {
   public TSStatus visitInsertMultiTablets(InsertMultiTabletsNode node, 
DataRegion dataRegion) {
     try {
       dataRegion.insertTablets(node);
-      dataRegion.insertSeparatorToWAL();
       return StatusUtils.OK;
     } catch (WriteProcessRejectException e) {
       LOGGER.warn(DataNodeMiscMessages.REJECT_EXECUTING_PLAN_NODE, node, 
e.getMessage());
@@ -209,7 +205,6 @@ public class DataExecutionVisitor implements 
PlanVisitor<TSStatus, DataRegion> {
       InsertRowsOfOneDeviceNode node, DataRegion dataRegion) {
     try {
       dataRegion.insert(node);
-      dataRegion.insertSeparatorToWAL();
       return StatusUtils.OK;
     } catch (WriteProcessRejectException e) {
       LOGGER.warn(DataNodeMiscMessages.REJECT_EXECUTING_PLAN_NODE, node, 
e.getMessage());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
index 448c8df4545..708dac1d33a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java
@@ -621,7 +621,7 @@ public class InsertRowNode extends InsertNode implements 
WALEntryValue {
   @Override
   public void serializeToWAL(IWALByteBufferView buffer) {
     buffer.putShort(getType().getNodeType());
-    buffer.putLong(searchIndex);
+    buffer.putLong(getEncodedSearchIndex());
     subSerialize(buffer);
   }
 
@@ -702,7 +702,7 @@ public class InsertRowNode extends InsertNode implements 
WALEntryValue {
   public static InsertRowNode deserializeFromWAL(DataInputStream stream) 
throws IOException {
     long searchIndex = stream.readLong();
     InsertRowNode insertNode = subDeserializeFromWAL(stream);
-    insertNode.setSearchIndex(searchIndex);
+    insertNode.setSearchIndexFromWAL(searchIndex);
     return insertNode;
   }
 
@@ -793,7 +793,7 @@ public class InsertRowNode extends InsertNode implements 
WALEntryValue {
   public static InsertRowNode deserializeFromWAL(ByteBuffer buffer) {
     long searchIndex = buffer.getLong();
     InsertRowNode insertNode = subDeserializeFromWAL(buffer);
-    insertNode.setSearchIndex(searchIndex);
+    insertNode.setSearchIndexFromWAL(searchIndex);
     return insertNode;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index 5571cd25616..957b22cfcb2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -350,7 +350,7 @@ public class InsertRowsNode extends InsertNode implements 
WALEntryValue {
   @Override
   public void serializeToWAL(IWALByteBufferView buffer) {
     buffer.putShort(getType().getNodeType());
-    buffer.putLong(searchIndex);
+    buffer.putLong(getEncodedSearchIndex());
     subSerialize(buffer);
   }
 
@@ -378,7 +378,7 @@ public class InsertRowsNode extends InsertNode implements 
WALEntryValue {
       InsertRowNode insertRowNode = 
InsertRowNode.subDeserializeFromWAL(stream);
       insertRowsNode.addOneInsertRowNode(insertRowNode, i);
     }
-    insertRowsNode.setSearchIndex(searchIndex);
+    insertRowsNode.setSearchIndexFromWAL(searchIndex);
     return insertRowsNode;
   }
 
@@ -398,7 +398,7 @@ public class InsertRowsNode extends InsertNode implements 
WALEntryValue {
       InsertRowNode insertRowNode = 
InsertRowNode.subDeserializeFromWAL(buffer);
       insertRowsNode.addOneInsertRowNode(insertRowNode, i);
     }
-    insertRowsNode.setSearchIndex(searchIndex);
+    insertRowsNode.setSearchIndexFromWAL(searchIndex);
     return insertRowsNode;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index adf14a39381..a00b418da9f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -905,7 +905,7 @@ public class InsertTabletNode extends InsertNode implements 
WALEntryValue {
   }
 
   void subSerialize(IWALByteBufferView buffer, List<int[]> rangeList) {
-    buffer.putLong(searchIndex);
+    buffer.putLong(getEncodedSearchIndex());
     WALWriteUtils.write(targetPath.getFullPath(), buffer);
     // data types are serialized in measurement schemas
     writeMeasurementSchemas(buffer);
@@ -1037,7 +1037,7 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
   }
 
   protected void subDeserializeFromWAL(DataInputStream stream) throws 
IOException {
-    searchIndex = stream.readLong();
+    setSearchIndexFromWAL(stream.readLong());
     try {
       targetPath = readTargetPath(stream);
     } catch (IllegalPathException e) {
@@ -1073,7 +1073,7 @@ public class InsertTabletNode extends InsertNode 
implements WALEntryValue {
   }
 
   protected void subDeserializeFromWAL(ByteBuffer buffer) {
-    searchIndex = buffer.getLong();
+    setSearchIndexFromWAL(buffer.getLong());
     try {
       targetPath = readTargetPath(buffer);
     } catch (IllegalPathException e) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
index e622dba30b9..8ef6802f047 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowNode.java
@@ -128,14 +128,14 @@ public class RelationalInsertRowNode extends 
InsertRowNode {
       throws IOException {
     long searchIndex = stream.readLong();
     RelationalInsertRowNode insertNode = subDeserializeFromWAL(stream);
-    insertNode.setSearchIndex(searchIndex);
+    insertNode.setSearchIndexFromWAL(searchIndex);
     return insertNode;
   }
 
   public static RelationalInsertRowNode deserializeFromWAL(ByteBuffer buffer) {
     long searchIndex = buffer.getLong();
     RelationalInsertRowNode insertNode = subDeserializeFromWAL(buffer);
-    insertNode.setSearchIndex(searchIndex);
+    insertNode.setSearchIndexFromWAL(searchIndex);
     return insertNode;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
index 83498ceefc9..741c45f3256 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertRowsNode.java
@@ -124,7 +124,7 @@ public class RelationalInsertRowsNode extends 
InsertRowsNode {
       RelationalInsertRowNode insertRowNode = 
RelationalInsertRowNode.subDeserializeFromWAL(stream);
       insertRowsNode.addOneInsertRowNode(insertRowNode, i);
     }
-    insertRowsNode.setSearchIndex(searchIndex);
+    insertRowsNode.setSearchIndexFromWAL(searchIndex);
     return insertRowsNode;
   }
 
@@ -144,7 +144,7 @@ public class RelationalInsertRowsNode extends 
InsertRowsNode {
       RelationalInsertRowNode insertRowNode = 
RelationalInsertRowNode.subDeserializeFromWAL(buffer);
       insertRowsNode.addOneInsertRowNode(insertRowNode, i);
     }
-    insertRowsNode.setSearchIndex(searchIndex);
+    insertRowsNode.setSearchIndexFromWAL(searchIndex);
     return insertRowsNode;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
index 5a7e5d9cf78..975ac1f4844 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
@@ -28,15 +28,22 @@ import java.util.List;
 
 public abstract class SearchNode extends WritePlanNode implements 
ComparableConsensusRequest {
 
+  private static final long LAST_FRAGMENT_MASK = Long.MIN_VALUE;
+
   /** this insert node doesn't need to participate in iot consensus */
   public static final long NO_CONSENSUS_INDEX = 
ConsensusReqReader.DEFAULT_SEARCH_INDEX;
 
+  // Preserve last-fragment state for WAL entries that do not have a consensus 
search index.
+  private static final long NO_CONSENSUS_INDEX_WITH_LAST_FRAGMENT = 
Long.MIN_VALUE;
+
   /**
    * this index is used by wal search, its order should be protected by the 
upper layer, and the
    * value should start from 1
    */
   protected long searchIndex = NO_CONSENSUS_INDEX;
 
+  protected boolean isLastFragment = false;
+
   protected SearchNode(PlanNodeId id) {
     super(id);
   }
@@ -51,5 +58,43 @@ public abstract class SearchNode extends WritePlanNode 
implements ComparableCons
     return this;
   }
 
+  public boolean isLastFragment() {
+    return isLastFragment;
+  }
+
+  public SearchNode setLastFragment(boolean lastFragment) {
+    isLastFragment = lastFragment;
+    return this;
+  }
+
+  protected long getEncodedSearchIndex() {
+    if (!isLastFragment) {
+      return searchIndex;
+    }
+    if (searchIndex == NO_CONSENSUS_INDEX) {
+      return NO_CONSENSUS_INDEX_WITH_LAST_FRAGMENT;
+    }
+    return searchIndex | LAST_FRAGMENT_MASK;
+  }
+
+  public static long extractSearchIndex(long encodedSearchIndex) {
+    if (encodedSearchIndex == NO_CONSENSUS_INDEX
+        || encodedSearchIndex == NO_CONSENSUS_INDEX_WITH_LAST_FRAGMENT) {
+      return NO_CONSENSUS_INDEX;
+    }
+    return encodedSearchIndex & ~LAST_FRAGMENT_MASK;
+  }
+
+  public static boolean isLastFragment(long encodedSearchIndex) {
+    return encodedSearchIndex == NO_CONSENSUS_INDEX_WITH_LAST_FRAGMENT
+        || (encodedSearchIndex != NO_CONSENSUS_INDEX
+            && (encodedSearchIndex & LAST_FRAGMENT_MASK) != 0);
+  }
+
+  protected void setSearchIndexFromWAL(long encodedSearchIndex) {
+    this.searchIndex = extractSearchIndex(encodedSearchIndex);
+    this.isLastFragment = isLastFragment(encodedSearchIndex);
+  }
+
   public abstract SearchNode merge(List<SearchNode> searchNodes);
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 784d4fa4771..93a04bf401a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -1205,6 +1205,7 @@ public class DataRegion implements IDataRegionForQuery {
       // init map
       long timePartitionId = 
TimePartitionUtils.getTimePartitionId(insertRowNode.getTime());
       initFlushTimeMap(timePartitionId);
+      insertRowNode.setLastFragment(true);
 
       boolean isSequence =
           config.isEnableSeparateData()
@@ -1333,14 +1334,29 @@ public class DataRegion implements IDataRegionForQuery {
       InsertTabletNode insertTabletNode,
       Map<Long, List<int[]>[]> splitMap,
       TSStatus[] results,
-      long[] infoForMetrics)
+      long[] infoForMetrics,
+      boolean markLastFragmentOnFinalWrite)
       throws DataTypeInconsistentException {
     boolean noFailure = true;
+    int remainingFragmentCount = 0;
+    if (markLastFragmentOnFinalWrite) {
+      for (Entry<Long, List<int[]>[]> entry : splitMap.entrySet()) {
+        List<int[]>[] rangeLists = entry.getValue();
+        if (rangeLists[1] != null) {
+          remainingFragmentCount++;
+        }
+        if (rangeLists[0] != null) {
+          remainingFragmentCount++;
+        }
+      }
+    }
     for (Entry<Long, List<int[]>[]> entry : splitMap.entrySet()) {
       long timePartitionId = entry.getKey();
       List<int[]>[] rangeLists = entry.getValue();
       List<int[]> sequenceRangeList = rangeLists[1];
       if (sequenceRangeList != null) {
+        insertTabletNode.setLastFragment(
+            markLastFragmentOnFinalWrite && remainingFragmentCount == 1);
         noFailure =
             insertTabletToTsFileProcessor(
                     insertTabletNode,
@@ -1351,9 +1367,12 @@ public class DataRegion implements IDataRegionForQuery {
                     noFailure,
                     infoForMetrics)
                 && noFailure;
+        remainingFragmentCount--;
       }
       List<int[]> unSequenceRangeList = rangeLists[0];
       if (unSequenceRangeList != null) {
+        insertTabletNode.setLastFragment(
+            markLastFragmentOnFinalWrite && remainingFragmentCount == 1);
         noFailure =
             insertTabletToTsFileProcessor(
                     insertTabletNode,
@@ -1364,6 +1383,7 @@ public class DataRegion implements IDataRegionForQuery {
                     noFailure,
                     infoForMetrics)
                 && noFailure;
+        remainingFragmentCount--;
       }
     }
     return noFailure;
@@ -1402,7 +1422,7 @@ public class DataRegion implements IDataRegionForQuery {
       // infoForMetrics[2]: ScheduleWalTimeCost
       // infoForMetrics[3]: ScheduleMemTableTimeCost
       // infoForMetrics[4]: InsertedPointsNumber
-      boolean noFailure = executeInsertTablet(insertTabletNode, results, 
infoForMetrics);
+      boolean noFailure = executeInsertTablet(insertTabletNode, results, 
infoForMetrics, true);
       updateTsFileProcessorMetric(insertTabletNode, infoForMetrics);
 
       if (!noFailure) {
@@ -1418,7 +1438,8 @@ public class DataRegion implements IDataRegionForQuery {
       InsertTabletNode insertTabletNode,
       TSStatus[] results,
       long[] infoForMetrics,
-      List<Pair<IDeviceID, Integer>> deviceEndOffsetPairs) {
+      List<Pair<IDeviceID, Integer>> deviceEndOffsetPairs,
+      boolean markLastFragmentOnFinalWrite) {
     final int initialStart = start;
     try {
       Map<Long, List<int[]>[]> splitInfo = new HashMap<>();
@@ -1427,7 +1448,8 @@ public class DataRegion implements IDataRegionForQuery {
         split(insertTabletNode, start, end, splitInfo);
         start = end;
       }
-      return doInsert(insertTabletNode, splitInfo, results, infoForMetrics);
+      return doInsert(
+          insertTabletNode, splitInfo, results, infoForMetrics, 
markLastFragmentOnFinalWrite);
     } catch (DataTypeInconsistentException e) {
       // the exception will trigger a flush, which requires the flush time to 
be recalculated
       start = initialStart;
@@ -1438,7 +1460,8 @@ public class DataRegion implements IDataRegionForQuery {
         start = end;
       }
       try {
-        return doInsert(insertTabletNode, splitInfo, results, infoForMetrics);
+        return doInsert(
+            insertTabletNode, splitInfo, results, infoForMetrics, 
markLastFragmentOnFinalWrite);
       } catch (DataTypeInconsistentException ex) {
         
logger.error(StorageEngineMessages.DATA_INCONSISTENT_NOT_TRIGGER_TWICE, ex);
         return false;
@@ -1447,7 +1470,10 @@ public class DataRegion implements IDataRegionForQuery {
   }
 
   private boolean executeInsertTablet(
-      InsertTabletNode insertTabletNode, TSStatus[] results, long[] 
infoForMetrics)
+      InsertTabletNode insertTabletNode,
+      TSStatus[] results,
+      long[] infoForMetrics,
+      boolean markLastFragmentOnFinalWrite)
       throws OutOfTTLException {
     boolean noFailure;
     int loc =
@@ -1458,7 +1484,13 @@ public class DataRegion implements IDataRegionForQuery {
     List<Pair<IDeviceID, Integer>> deviceEndOffsetPairs =
         insertTabletNode.splitByDevice(loc, insertTabletNode.getRowCount());
     noFailure =
-        splitAndInsert(loc, insertTabletNode, results, infoForMetrics, 
deviceEndOffsetPairs)
+        splitAndInsert(
+                loc,
+                insertTabletNode,
+                results,
+                infoForMetrics,
+                deviceEndOffsetPairs,
+                markLastFragmentOnFinalWrite)
             && noFailure;
 
     if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
@@ -1471,6 +1503,25 @@ public class DataRegion implements IDataRegionForQuery {
     return noFailure;
   }
 
+  private int findLastInsertTabletIndexToMark(final InsertMultiTabletsNode 
insertMultiTabletsNode) {
+    for (int i = insertMultiTabletsNode.getInsertTabletNodeList().size() - 1; 
i >= 0; i--) {
+      final InsertTabletNode insertTabletNode =
+          insertMultiTabletsNode.getInsertTabletNodeList().get(i);
+      if (insertTabletNode.getRowCount() <= 0 || 
insertTabletNode.allMeasurementFailed()) {
+        continue;
+      }
+      if (!insertTabletNode.shouldCheckTTL()) {
+        return i;
+      }
+      final long[] times = insertTabletNode.getTimes();
+      if (times.length > 0
+          && CommonUtils.isAlive(times[times.length - 1], 
getTTL(insertTabletNode))) {
+        return i;
+      }
+    }
+    return -1;
+  }
+
   private void initFlushTimeMap(long timePartitionId) {
     if (config.isEnableSeparateData()
         && 
!lastFlushTimeMap.checkAndCreateFlushedTimePartition(timePartitionId, true)) {
@@ -1752,8 +1803,10 @@ public class DataRegion implements IDataRegionForQuery {
     }
 
     List<InsertRowNode> executedInsertRowNodeList = new ArrayList<>();
+    int remainingFragments = tsFileProcessorMap.size();
     for (Map.Entry<TsFileProcessor, InsertRowsNode> entry : 
tsFileProcessorMap.entrySet()) {
       InsertRowsNode subInsertRowsNode = entry.getValue();
+      subInsertRowsNode.setLastFragment(--remainingFragments == 0);
       try {
         List<TsFileProcessor> insertedProcessors =
             insertRowsWithTypeConsistencyCheck(entry.getKey(), 
subInsertRowsNode, infoForMetrics);
@@ -1865,10 +1918,14 @@ public class DataRegion implements IDataRegionForQuery {
     }
 
     final List<TsFileProcessor> insertedProcessors = new 
ArrayList<>(retriedProcessorMap.size());
+    int remainingRetriedFragments = retriedProcessorMap.size();
     for (Entry<TsFileProcessor, InsertRowsNode> retriedEntry : 
retriedProcessorMap.entrySet()) {
       final TsFileProcessor retriedProcessor = retriedEntry.getKey();
-      registerToTsFile(retriedEntry.getValue(), retriedProcessor);
-      retriedProcessor.insertRows(retriedEntry.getValue(), infoForMetrics);
+      final InsertRowsNode retriedInsertRowsNode = retriedEntry.getValue();
+      retriedInsertRowsNode.setLastFragment(
+          subInsertRowsNode.isLastFragment() && --remainingRetriedFragments == 
0);
+      registerToTsFile(retriedInsertRowsNode, retriedProcessor);
+      retriedProcessor.insertRows(retriedInsertRowsNode, infoForMetrics);
       insertedProcessors.add(retriedProcessor);
     }
     return insertedProcessors;
@@ -4619,8 +4676,10 @@ public class DataRegion implements IDataRegionForQuery {
       // infoForMetrics[2]: ScheduleWalTimeCost
       // infoForMetrics[3]: ScheduleMemTableTimeCost
       // infoForMetrics[4]: InsertedPointsNumber
+      int remainingFragments = tsFileProcessorMap.size();
       for (Map.Entry<TsFileProcessor, InsertRowsNode> entry : 
tsFileProcessorMap.entrySet()) {
         InsertRowsNode subInsertRowsNode = entry.getValue();
+        subInsertRowsNode.setLastFragment(--remainingFragments == 0);
         try {
           List<TsFileProcessor> insertedProcessors =
               insertRowsWithTypeConsistencyCheck(entry.getKey(), 
subInsertRowsNode, infoForMetrics);
@@ -4761,6 +4820,7 @@ public class DataRegion implements IDataRegionForQuery {
       // infoForMetrics[2]: ScheduleWalTimeCost
       // infoForMetrics[3]: ScheduleMemTableTimeCost
       // infoForMetrics[4]: InsertedPointsNumbe
+      final int lastTabletIndexToMark = 
findLastInsertTabletIndexToMark(insertMultiTabletsNode);
       for (int i = 0; i < 
insertMultiTabletsNode.getInsertTabletNodeList().size(); i++) {
         InsertTabletNode insertTabletNode = 
insertMultiTabletsNode.getInsertTabletNodeList().get(i);
         long[] times = insertTabletNode.getTimes();
@@ -4774,7 +4834,9 @@ public class DataRegion implements IDataRegionForQuery {
         Arrays.fill(results, RpcUtils.SUCCESS_STATUS);
         boolean noFailure = false;
         try {
-          noFailure = executeInsertTablet(insertTabletNode, results, 
infoForMetrics);
+          noFailure =
+              executeInsertTablet(
+                  insertTabletNode, results, infoForMetrics, i == 
lastTabletIndexToMark);
         } catch (WriteProcessException e) {
           insertMultiTabletsNode
               .getResults()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
index 4057914f9db..86e2b173027 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNode.java
@@ -36,6 +36,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNo
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ObjectNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode;
 import org.apache.iotdb.db.service.metrics.WritingMetrics;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
@@ -690,7 +691,9 @@ public class WALNode implements IWALNode {
             if (type.needSearch()) {
               // see WALInfoEntry#serialize, entry type + memtable id + plan 
node type
               buffer.position(WALInfoEntry.FIXED_SERIALIZED_SIZE + 
PlanNodeType.BYTES);
-              final long currentWalEntryIndex = buffer.getLong();
+              final long encodedSearchIndex = buffer.getLong();
+              final long currentWalEntryIndex = 
SearchNode.extractSearchIndex(encodedSearchIndex);
+              final boolean isLastFragment = 
SearchNode.isLastFragment(encodedSearchIndex);
               buffer.clear();
               if (currentWalEntryIndex == -1) {
                 // WAL entry of targetIndex has been fully collected, so put 
them into insertNodes
@@ -715,6 +718,9 @@ public class WALNode implements IWALNode {
                   tmpNodes.get().add(new IoTConsensusRequest(buffer));
                   memorySize += buffer.remaining();
                 }
+                if (isLastFragment) {
+                  tryToCollectInsertNodeAndBumpIndex.run();
+                }
               } else {
                 // currentWalEntryIndex > targetIndex
                 // WAL entry of targetIndex has been fully collected, put them 
into insertNodes
@@ -743,6 +749,9 @@ public class WALNode implements IWALNode {
                   tmpNodes.get().add(new IoTConsensusRequest(buffer));
                   memorySize += buffer.remaining();
                 }
+                if (isLastFragment) {
+                  tryToCollectInsertNodeAndBumpIndex.run();
+                }
               }
             } else {
               tryToCollectInsertNodeAndBumpIndex.run();
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowNodeSerdeTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowNodeSerdeTest.java
index dbe69da2462..21bc9e0d120 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowNodeSerdeTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowNodeSerdeTest.java
@@ -23,10 +23,13 @@ import 
org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALByteBufferForTest;
 
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.junit.Assert;
 import org.junit.Test;
@@ -35,6 +38,7 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 
 public class InsertRowNodeSerdeTest {
 
@@ -72,6 +76,7 @@ public class InsertRowNodeSerdeTest {
   @Test
   public void TestSerializeAndDeserializeForWAL() throws IllegalPathException, 
IOException {
     InsertRowNode insertRowNode = getInsertRowNodeWithMeasurementSchemas();
+    insertRowNode.setLastFragment(true);
 
     int serializedSize = insertRowNode.serializedSize();
 
@@ -96,6 +101,67 @@ public class InsertRowNodeSerdeTest {
           new MeasurementSchema("s5", TSDataType.BOOLEAN)
         });
     Assert.assertEquals(insertRowNode, tmpNode);
+    Assert.assertTrue(tmpNode.isLastFragment());
+  }
+
+  @Test
+  public void testDeserializeLegacyWAL() throws IllegalPathException, 
IOException {
+    InsertRowNode insertRowNode = getInsertRowNodeWithMeasurementSchemas();
+    insertRowNode.setSearchIndex(123L);
+
+    byte[] bytes = new byte[insertRowNode.serializedSize()];
+    WALByteBufferForTest walBuffer = new 
WALByteBufferForTest(ByteBuffer.wrap(bytes));
+    insertRowNode.serializeToWAL(walBuffer);
+
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    Assert.assertEquals(PlanNodeType.INSERT_ROW.getNodeType(), 
byteBuffer.getShort());
+    Assert.assertEquals(123L, byteBuffer.getLong());
+
+    DataInputStream dataInputStream = new DataInputStream(new 
ByteArrayInputStream(bytes));
+    dataInputStream.readShort();
+
+    InsertRowNode tmpNode = InsertRowNode.deserializeFromWAL(dataInputStream);
+    tmpNode.setPlanNodeId(insertRowNode.getPlanNodeId());
+    tmpNode.setMeasurementSchemas(
+        new MeasurementSchema[] {
+          new MeasurementSchema("s1", TSDataType.DOUBLE),
+          new MeasurementSchema("s2", TSDataType.FLOAT),
+          new MeasurementSchema("s3", TSDataType.INT64),
+          new MeasurementSchema("s4", TSDataType.INT32),
+          new MeasurementSchema("s5", TSDataType.BOOLEAN)
+        });
+    Assert.assertEquals(insertRowNode, tmpNode);
+    Assert.assertEquals(123L, tmpNode.getSearchIndex());
+    Assert.assertFalse(tmpNode.isLastFragment());
+  }
+
+  @Test
+  public void testDeserializeLegacyWALRelational() throws IOException {
+    RelationalInsertRowNode insertRowNode = 
getRelationalInsertRowNodeWithMeasurementSchemas();
+    insertRowNode.setSearchIndex(123L);
+
+    byte[] bytes = new byte[insertRowNode.serializedSize()];
+    WALByteBufferForTest walBuffer = new 
WALByteBufferForTest(ByteBuffer.wrap(bytes));
+    insertRowNode.serializeToWAL(walBuffer);
+
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    Assert.assertEquals(PlanNodeType.RELATIONAL_INSERT_ROW.getNodeType(), 
byteBuffer.getShort());
+    Assert.assertEquals(123L, byteBuffer.getLong());
+
+    DataInputStream dataInputStream = new DataInputStream(new 
ByteArrayInputStream(bytes));
+    dataInputStream.readShort();
+
+    RelationalInsertRowNode tmpNode = 
RelationalInsertRowNode.deserializeFromWAL(dataInputStream);
+    tmpNode.setPlanNodeId(insertRowNode.getPlanNodeId());
+    tmpNode.setMeasurementSchemas(
+        new MeasurementSchema[] {
+          new MeasurementSchema("id", TSDataType.STRING),
+          new MeasurementSchema("attr", TSDataType.TEXT),
+          new MeasurementSchema("value", TSDataType.INT64)
+        });
+    Assert.assertEquals(insertRowNode, tmpNode);
+    Assert.assertEquals(123L, tmpNode.getSearchIndex());
+    Assert.assertFalse(tmpNode.isLastFragment());
   }
 
   private InsertRowNode getInsertRowNode() throws IllegalPathException {
@@ -199,4 +265,28 @@ public class InsertRowNodeSerdeTest {
     insertRowNode.setNeedInferType(true);
     return insertRowNode;
   }
+
+  private RelationalInsertRowNode 
getRelationalInsertRowNodeWithMeasurementSchemas() {
+    return new RelationalInsertRowNode(
+        new PlanNodeId("plannode 3"),
+        new PartialPath("table1", false),
+        false,
+        new String[] {"id", "attr", "value"},
+        new TSDataType[] {TSDataType.STRING, TSDataType.TEXT, 
TSDataType.INT64},
+        new MeasurementSchema[] {
+          new MeasurementSchema("id", TSDataType.STRING),
+          new MeasurementSchema("attr", TSDataType.TEXT),
+          new MeasurementSchema("value", TSDataType.INT64)
+        },
+        90L,
+        new Object[] {
+          new Binary("d1".getBytes(StandardCharsets.UTF_8)),
+          new Binary("v1".getBytes(StandardCharsets.UTF_8)),
+          1L
+        },
+        false,
+        new TsTableColumnCategory[] {
+          TsTableColumnCategory.TAG, TsTableColumnCategory.ATTRIBUTE, 
TsTableColumnCategory.FIELD
+        });
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowsNodeSerdeTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowsNodeSerdeTest.java
index 08819830557..034d4cb57de 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowsNodeSerdeTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowsNodeSerdeTest.java
@@ -129,6 +129,7 @@ public class InsertRowsNodeSerdeTest {
             new Object[] {2.0, false},
             false),
         1);
+    insertRowsNode.setLastFragment(true);
 
     int serializedSize = insertRowsNode.serializedSize();
 
@@ -145,6 +146,70 @@ public class InsertRowsNodeSerdeTest {
     InsertRowsNode tmpNode = 
InsertRowsNode.deserializeFromWAL(dataInputStream);
     tmpNode.setPlanNodeId(insertRowsNode.getPlanNodeId());
     Assert.assertEquals(insertRowsNode, tmpNode);
+    Assert.assertTrue(tmpNode.isLastFragment());
+  }
+
+  @Test
+  public void testDeserializeLegacyWAL() throws IllegalPathException, 
IOException {
+    InsertRowsNode insertRowsNode = new InsertRowsNode(new PlanNodeId("plan 
node 1"));
+    insertRowsNode.addOneInsertRowNode(
+        new InsertRowNode(
+            new PlanNodeId(""),
+            new PartialPath("root.sg.d1"),
+            false,
+            new String[] {"s1", "s2", "s3", "s4", "s5"},
+            new TSDataType[] {
+              TSDataType.DOUBLE,
+              TSDataType.FLOAT,
+              TSDataType.INT64,
+              TSDataType.TEXT,
+              TSDataType.STRING
+            },
+            new MeasurementSchema[] {
+              new MeasurementSchema("s1", TSDataType.DOUBLE),
+              new MeasurementSchema("s2", TSDataType.FLOAT),
+              new MeasurementSchema("s3", TSDataType.INT64),
+              new MeasurementSchema("s4", TSDataType.TEXT),
+              new MeasurementSchema("s5", TSDataType.STRING)
+            },
+            1000L,
+            new Object[] {1.0, 2f, 300L, new 
Binary("444".getBytes(StandardCharsets.UTF_8)), null},
+            false),
+        0);
+
+    insertRowsNode.addOneInsertRowNode(
+        new InsertRowNode(
+            new PlanNodeId(""),
+            new PartialPath("root.sg.d2"),
+            false,
+            new String[] {"s1", "s4"},
+            new TSDataType[] {TSDataType.DOUBLE, TSDataType.BOOLEAN},
+            new MeasurementSchema[] {
+              new MeasurementSchema("s1", TSDataType.DOUBLE),
+              new MeasurementSchema("s4", TSDataType.BOOLEAN),
+            },
+            2000L,
+            new Object[] {2.0, false},
+            false),
+        1);
+    insertRowsNode.setSearchIndex(123L);
+
+    byte[] bytes = new byte[insertRowsNode.serializedSize()];
+    WALByteBufferForTest walBuffer = new 
WALByteBufferForTest(ByteBuffer.wrap(bytes));
+    insertRowsNode.serializeToWAL(walBuffer);
+
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    Assert.assertEquals(PlanNodeType.INSERT_ROWS.getNodeType(), 
byteBuffer.getShort());
+    Assert.assertEquals(123L, byteBuffer.getLong());
+
+    DataInputStream dataInputStream = new DataInputStream(new 
ByteArrayInputStream(bytes));
+    dataInputStream.readShort();
+
+    InsertRowsNode tmpNode = 
InsertRowsNode.deserializeFromWAL(dataInputStream);
+    tmpNode.setPlanNodeId(insertRowsNode.getPlanNodeId());
+    Assert.assertEquals(insertRowsNode, tmpNode);
+    Assert.assertEquals(123L, tmpNode.getSearchIndex());
+    Assert.assertFalse(tmpNode.isLastFragment());
   }
 
   @Test
@@ -262,6 +327,7 @@ public class InsertRowsNodeSerdeTest {
                 TsTableColumnCategory.TAG, TsTableColumnCategory.ATTRIBUTE
               }),
           1);
+      insertRowsNode.setLastFragment(true);
 
       int serializedSize = insertRowsNode.serializedSize();
 
@@ -280,6 +346,81 @@ public class InsertRowsNodeSerdeTest {
           RelationalInsertRowsNode.deserializeFromWAL(dataInputStream);
       tmpNode.setPlanNodeId(insertRowsNode.getPlanNodeId());
       Assert.assertEquals(insertRowsNode, tmpNode);
+      Assert.assertTrue(tmpNode.isLastFragment());
     }
   }
+
+  @Test
+  public void testDeserializeLegacyWALRelational() throws IOException {
+    RelationalInsertRowsNode insertRowsNode =
+        new RelationalInsertRowsNode(new PlanNodeId("plan node 1"));
+    insertRowsNode.addOneInsertRowNode(
+        new RelationalInsertRowNode(
+            new PlanNodeId(""),
+            new PartialPath("table1", false),
+            false,
+            new String[] {"s1", "s2", "s3", "s4", "s5"},
+            new TSDataType[] {
+              TSDataType.DOUBLE,
+              TSDataType.FLOAT,
+              TSDataType.INT64,
+              TSDataType.TEXT,
+              TSDataType.STRING
+            },
+            new MeasurementSchema[] {
+              new MeasurementSchema("s1", TSDataType.DOUBLE),
+              new MeasurementSchema("s2", TSDataType.FLOAT),
+              new MeasurementSchema("s3", TSDataType.INT64),
+              new MeasurementSchema("s4", TSDataType.TEXT),
+              new MeasurementSchema("s5", TSDataType.STRING)
+            },
+            1000L,
+            new Object[] {1.0, 2f, 300L, new 
Binary("444".getBytes(StandardCharsets.UTF_8)), null},
+            false,
+            new TsTableColumnCategory[] {
+              TsTableColumnCategory.TAG,
+              TsTableColumnCategory.ATTRIBUTE,
+              TsTableColumnCategory.FIELD,
+              TsTableColumnCategory.FIELD,
+              TsTableColumnCategory.FIELD
+            }),
+        0);
+
+    insertRowsNode.addOneInsertRowNode(
+        new RelationalInsertRowNode(
+            new PlanNodeId(""),
+            new PartialPath("table1", false),
+            false,
+            new String[] {"s1", "s4"},
+            new TSDataType[] {TSDataType.DOUBLE, TSDataType.BOOLEAN},
+            new MeasurementSchema[] {
+              new MeasurementSchema("s1", TSDataType.DOUBLE),
+              new MeasurementSchema("s4", TSDataType.BOOLEAN),
+            },
+            2000L,
+            new Object[] {2.0, false},
+            false,
+            new TsTableColumnCategory[] {
+              TsTableColumnCategory.TAG, TsTableColumnCategory.ATTRIBUTE
+            }),
+        1);
+    insertRowsNode.setSearchIndex(123L);
+
+    byte[] bytes = new byte[insertRowsNode.serializedSize()];
+    WALByteBufferForTest walBuffer = new 
WALByteBufferForTest(ByteBuffer.wrap(bytes));
+    insertRowsNode.serializeToWAL(walBuffer);
+
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    Assert.assertEquals(PlanNodeType.RELATIONAL_INSERT_ROWS.getNodeType(), 
byteBuffer.getShort());
+    Assert.assertEquals(123L, byteBuffer.getLong());
+
+    DataInputStream dataInputStream = new DataInputStream(new 
ByteArrayInputStream(bytes));
+    dataInputStream.readShort();
+
+    RelationalInsertRowsNode tmpNode = 
RelationalInsertRowsNode.deserializeFromWAL(dataInputStream);
+    tmpNode.setPlanNodeId(insertRowsNode.getPlanNodeId());
+    Assert.assertEquals(insertRowsNode, tmpNode);
+    Assert.assertEquals(123L, tmpNode.getSearchIndex());
+    Assert.assertFalse(tmpNode.isLastFragment());
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertTabletNodeSerdeTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertTabletNodeSerdeTest.java
index 4d8f5f6e4a8..ddc35e1eda9 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertTabletNodeSerdeTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertTabletNodeSerdeTest.java
@@ -68,6 +68,7 @@ public class InsertTabletNodeSerdeTest {
   @Test
   public void testSerializeAndDeserializeForWAL() throws IllegalPathException, 
IOException {
     InsertTabletNode insertTabletNode = getInsertTabletNodeWithSchema();
+    insertTabletNode.setLastFragment(true);
 
     int serializedSize = insertTabletNode.serializedSize();
 
@@ -93,6 +94,38 @@ public class InsertTabletNodeSerdeTest {
           new MeasurementSchema("s5", TSDataType.BOOLEAN)
         });
     Assert.assertEquals(insertTabletNode, tmpNode);
+    Assert.assertTrue(tmpNode.isLastFragment());
+  }
+
+  @Test
+  public void testDeserializeLegacyWAL() throws IllegalPathException, 
IOException {
+    InsertTabletNode insertTabletNode = getInsertTabletNodeWithSchema();
+    insertTabletNode.setSearchIndex(123L);
+
+    byte[] bytes = new byte[insertTabletNode.serializedSize()];
+    WALByteBufferForTest walBuffer = new 
WALByteBufferForTest(ByteBuffer.wrap(bytes));
+    insertTabletNode.serializeToWAL(walBuffer);
+
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    Assert.assertEquals(PlanNodeType.INSERT_TABLET.getNodeType(), 
byteBuffer.getShort());
+    Assert.assertEquals(123L, byteBuffer.getLong());
+
+    DataInputStream dataInputStream = new DataInputStream(new 
ByteArrayInputStream(bytes));
+    dataInputStream.readShort();
+
+    InsertTabletNode tmpNode = 
InsertTabletNode.deserializeFromWAL(dataInputStream);
+    tmpNode.setPlanNodeId(insertTabletNode.getPlanNodeId());
+    tmpNode.setMeasurementSchemas(
+        new MeasurementSchema[] {
+          new MeasurementSchema("s1", TSDataType.DOUBLE),
+          new MeasurementSchema("s2", TSDataType.FLOAT),
+          new MeasurementSchema("s3", TSDataType.INT64),
+          new MeasurementSchema("s4", TSDataType.INT32),
+          new MeasurementSchema("s5", TSDataType.BOOLEAN)
+        });
+    Assert.assertEquals(insertTabletNode, tmpNode);
+    Assert.assertEquals(123L, tmpNode.getSearchIndex());
+    Assert.assertFalse(tmpNode.isLastFragment());
   }
 
   @Test
@@ -126,6 +159,7 @@ public class InsertTabletNodeSerdeTest {
     for (String tableName : new String[] {"table1", "ta`ble1", "root.table1"}) 
{
       RelationalInsertTabletNode insertTabletNode =
           getRelationalInsertTabletNodeWithSchema(tableName);
+      insertTabletNode.setLastFragment(true);
 
       int serializedSize = insertTabletNode.serializedSize();
 
@@ -153,9 +187,42 @@ public class InsertTabletNodeSerdeTest {
             new MeasurementSchema("s5", TSDataType.BOOLEAN)
           });
       Assert.assertEquals(insertTabletNode, tmpNode);
+      Assert.assertTrue(tmpNode.isLastFragment());
     }
   }
 
+  @Test
+  public void testDeserializeLegacyWALRelational() throws IOException {
+    RelationalInsertTabletNode insertTabletNode = 
getRelationalInsertTabletNodeWithSchema("table1");
+    insertTabletNode.setSearchIndex(123L);
+
+    byte[] bytes = new byte[insertTabletNode.serializedSize()];
+    WALByteBufferForTest walBuffer = new 
WALByteBufferForTest(ByteBuffer.wrap(bytes));
+    insertTabletNode.serializeToWAL(walBuffer);
+
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    Assert.assertEquals(PlanNodeType.RELATIONAL_INSERT_TABLET.getNodeType(), 
byteBuffer.getShort());
+    Assert.assertEquals(123L, byteBuffer.getLong());
+
+    DataInputStream dataInputStream = new DataInputStream(new 
ByteArrayInputStream(bytes));
+    dataInputStream.readShort();
+
+    RelationalInsertTabletNode tmpNode =
+        RelationalInsertTabletNode.deserializeFromWAL(dataInputStream);
+    tmpNode.setPlanNodeId(insertTabletNode.getPlanNodeId());
+    tmpNode.setMeasurementSchemas(
+        new MeasurementSchema[] {
+          new MeasurementSchema("s1", TSDataType.DOUBLE),
+          new MeasurementSchema("s2", TSDataType.FLOAT),
+          new MeasurementSchema("s3", TSDataType.INT64),
+          new MeasurementSchema("s4", TSDataType.INT32),
+          new MeasurementSchema("s5", TSDataType.BOOLEAN)
+        });
+    Assert.assertEquals(insertTabletNode, tmpNode);
+    Assert.assertEquals(123L, tmpNode.getSearchIndex());
+    Assert.assertFalse(tmpNode.isLastFragment());
+  }
+
   @Test
   public void testInitTabletValuesWithAllTypes()
       throws NoSuchMethodException, InvocationTargetException, 
IllegalAccessException {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java
index 2977841d5e6..b24a8cd29cf 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java
@@ -178,6 +178,29 @@ public class WALNodeWaitForRollFileTest {
     assertNotNull(iterator.next());
   }
 
+  @Test
+  public void testLegacySeparatorStillWorksAfterRollFile() throws Exception {
+    IMemTable memTable = new PrimitiveMemTable(databasePath, dataRegionId);
+    walNode.onMemTableCreated(memTable, logDirectory + File.separator + 
"test.tsfile");
+
+    InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new 
long[] {1});
+    insertTabletNode.setSearchIndex(1);
+    walNode.log(
+        memTable.getMemTableId(),
+        insertTabletNode,
+        Collections.singletonList(new int[] {0, 
insertTabletNode.getRowCount()}));
+    walNode.log(memTable.getMemTableId(), new 
ContinuousSameSearchIndexSeparatorNode());
+
+    Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> 
walNode.isAllWALEntriesConsumed());
+
+    walNode.rollWALFile();
+    Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> 
walNode.isAllWALEntriesConsumed());
+
+    ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
+    assertTrue(iterator.hasNext());
+    assertNotNull(iterator.next());
+  }
+
   /**
    * Verifies that waitForNextReady wakes up when a WAL file roll is triggered 
concurrently. A
    * background thread rolls the WAL file while the main thread waits on the 
iterator.
@@ -190,12 +213,11 @@ public class WALNodeWaitForRollFileTest {
     // write data with search index
     InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new 
long[] {1});
     insertTabletNode.setSearchIndex(1);
+    insertTabletNode.setLastFragment(true);
     walNode.log(
         memTable.getMemTableId(),
         insertTabletNode,
         Collections.singletonList(new int[] {0, 
insertTabletNode.getRowCount()}));
-    walNode.log(
-        memTable.getMemTableId(), new 
ContinuousSameSearchIndexSeparatorNode(new PlanNodeId("")));
 
     Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> 
walNode.isAllWALEntriesConsumed());
 
@@ -318,12 +340,11 @@ public class WALNodeWaitForRollFileTest {
     // write data with search index — stays in the current (active) WAL file
     InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new 
long[] {1});
     insertTabletNode.setSearchIndex(1);
+    insertTabletNode.setLastFragment(true);
     walNode.log(
         memTable.getMemTableId(),
         insertTabletNode,
         Collections.singletonList(new int[] {0, 
insertTabletNode.getRowCount()}));
-    walNode.log(
-        memTable.getMemTableId(), new 
ContinuousSameSearchIndexSeparatorNode(new PlanNodeId("")));
 
     Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> 
walNode.isAllWALEntriesConsumed());
 

Reply via email to