This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch pipe-consensus-index
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/pipe-consensus-index by this 
push:
     new b17b6e4ca5d [IOTDB-5723] Pipe: Consensus Index (#9446)
b17b6e4ca5d is described below

commit b17b6e4ca5dcc240e58109ddb90cb32314556839
Author: yschengzi <[email protected]>
AuthorDate: Thu May 25 22:27:06 2023 +0800

    [IOTDB-5723] Pipe: Consensus Index (#9446)
    
    * finish consensus index
    
    * add test
    
    * spotless
    
    * fix serialize
    
    * finish except report
    
    * change dataRegionId from String to TConsensusGroupId
    
    * fix UT for ConsensusIndex
    
    * create reference count for EnrichedEvent
    
    * finish report progress index
    
    * fix UT
    
    * merge heartbeat
    
    * finish datanode id
    
    * add trace
    
    * fix leader change null and updateToMaximum
    
    * delete printTrace
    
    * revert serialize and deserialize in InsertNode
    
    * revert serialize and deserialize in InsertRowsNode
    
    * well define ConsensusIndex
    
    ---------
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../pipe/runtime/PipeRuntimeCoordinator.java       |   5 +-
 .../confignode/persistence/pipe/PipeTaskInfo.java  |   7 +-
 .../runtime/PipeHandleMetaChangeProcedure.java     |  10 +-
 .../impl/pipe/task/CreatePipeProcedureV2.java      |   4 +-
 .../request/ConfigPhysicalPlanSerDeTest.java       |   9 +-
 .../iotdb/confignode/persistence/PipeInfoTest.java |   3 +-
 .../runtime/PipeHandleMetaChangeProcedureTest.java |   7 +-
 .../index/ComparableConsensusRequest.java          |  26 ++++
 .../commons/consensus/index/ConsensusIndex.java    |  69 ++++++++
 .../consensus/index/ConsensusIndexType.java        |  79 ++++++++++
 .../consensus/index/impl/IoTConsensusIndex.java    | 102 ++++++++++++
 .../index/impl/MinimumConsensusIndex.java          |  78 ++++++++++
 .../iotdb/commons/pipe/task/meta/PipeTaskMeta.java |  59 ++++---
 .../commons/pipe/task/meta/PipeMetaDeSerTest.java  |   7 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |   2 +
 .../db/engine/storagegroup/TsFileResource.java     |  33 ++++
 .../plan/node/write/InsertMultiTabletsNode.java    |   7 +
 .../plan/planner/plan/node/write/InsertNode.java   |  20 ++-
 .../planner/plan/node/write/InsertRowsNode.java    |   7 +
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |   7 +
 .../iotdb/db/pipe/agent/task/PipeTaskAgent.java    |  40 +++--
 .../core/collector/IoTDBDataRegionCollector.java   |   9 +-
 .../PipeHistoricalDataRegionTsFileCollector.java   |  24 ++-
 .../realtime/PipeRealtimeDataRegionCollector.java  |  10 ++
 .../PipeRealtimeDataRegionHybridCollector.java     |   4 +-
 .../realtime/assigner/PipeDataRegionAssigner.java  |   6 +-
 .../iotdb/db/pipe/core/event/EnrichedEvent.java    |  60 ++++++-
 .../core/event/impl/PipeTabletInsertionEvent.java  |  17 +-
 .../core/event/impl/PipeTsFileInsertionEvent.java  |  27 +++-
 .../event/realtime/PipeRealtimeCollectEvent.java   |  27 ++--
 .../org/apache/iotdb/db/pipe/task/PipeBuilder.java |  25 +--
 .../org/apache/iotdb/db/pipe/task/PipeTask.java    |   7 +-
 .../apache/iotdb/db/pipe/task/PipeTaskBuilder.java |  15 +-
 .../db/pipe/task/stage/PipeTaskCollectorStage.java |  11 +-
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |   3 +-
 .../TsFileResourceConsensusIndexTest.java          | 173 +++++++++++++++++++++
 .../collector/CachedSchemaPatternMatcherTest.java  |  14 +-
 .../core/collector/PipeRealtimeCollectTest.java    |  12 +-
 38 files changed, 893 insertions(+), 132 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
index 3455a6c1369..2992f6dd31e 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeRuntimeCoordinator.java
@@ -71,7 +71,10 @@ public class PipeRuntimeCoordinator implements 
IClusterStatusSubscriber {
         .forEach(
             (regionId, pair) -> {
               if (regionId.getType().equals(TConsensusGroupType.DataRegion)) {
-                dataRegionGroupToOldAndNewLeaderPairMap.put(regionId, pair);
+                dataRegionGroupToOldAndNewLeaderPairMap.put(
+                    regionId,
+                    new Pair<>( // null or -1 means empty origin leader
+                        pair.left == null ? -1 : pair.left, pair.right == null 
? -1 : pair.right));
               }
             });
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 76ec9007ffa..5e71d36319f 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.confignode.persistence.pipe;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumConsensusIndex;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMetaKeeper;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
@@ -210,7 +211,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
                             if (newDataRegionLeader != -1) {
                               consensusGroupIdToTaskMetaMap
                                   .get(dataRegionGroupId)
-                                  .setRegionLeader(newDataRegionLeader);
+                                  .setDataNodeId(newDataRegionLeader);
                             } else {
                               
consensusGroupIdToTaskMetaMap.remove(dataRegionGroupId);
                             }
@@ -221,7 +222,9 @@ public class PipeTaskInfo implements SnapshotProcessor {
                               consensusGroupIdToTaskMetaMap.put(
                                   // TODO: the progress index should be passed 
from the leader
                                   // correctly
-                                  dataRegionGroupId, new PipeTaskMeta(0, 
newDataRegionLeader));
+                                  dataRegionGroupId,
+                                  new PipeTaskMeta(
+                                      new MinimumConsensusIndex(), 
newDataRegionLeader));
                             } else {
                               LOGGER.warn(
                                   "The pipe task meta does not contain the 
data region group {} or the data region group has already been removed",
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index db04d909f7d..91e12cda000 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -117,7 +117,7 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
           
pipeMetaFromDataNode.getRuntimeMeta().getConsensusGroupIdToTaskMetaMap();
       for (final Map.Entry<TConsensusGroupId, PipeTaskMeta> 
runtimeMetaOnConfigNode :
           pipeTaskMetaMapOnConfigNode.entrySet()) {
-        if (runtimeMetaOnConfigNode.getValue().getRegionLeader() != 
dataNodeId) {
+        if (runtimeMetaOnConfigNode.getValue().getDataNodeId() != dataNodeId) {
           continue;
         }
 
@@ -132,11 +132,13 @@ public class PipeHandleMetaChangeProcedure extends 
AbstractOperatePipeProcedureV
         }
 
         // update progress index
-        if (runtimeMetaOnConfigNode.getValue().getProgressIndex()
-            < runtimeMetaFromDataNode.getProgressIndex()) {
+        if (!runtimeMetaOnConfigNode
+            .getValue()
+            .getProgressIndex()
+            .isAfter(runtimeMetaFromDataNode.getProgressIndex())) {
           runtimeMetaOnConfigNode
               .getValue()
-              .setProgressIndex(runtimeMetaFromDataNode.getProgressIndex());
+              .updateProgressIndex(runtimeMetaFromDataNode.getProgressIndex());
           needWriteConsensusOnConfigNodes = true;
         }
 
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index e0de835fb07..a21ec8c0f75 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumConsensusIndex;
 import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
@@ -104,7 +105,8 @@ public class CreatePipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
         .forEach(
             (region, leader) ->
                 // TODO: make index configurable
-                consensusGroupIdToTaskMetaMap.put(region, new PipeTaskMeta(0, 
leader)));
+                consensusGroupIdToTaskMetaMap.put(
+                    region, new PipeTaskMeta(new MinimumConsensusIndex(), 
leader)));
     pipeRuntimeMeta = new PipeRuntimeMeta(consensusGroupIdToTaskMetaMap);
   }
 
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
index 9a83e00acd7..302fa209195 100644
--- 
a/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanSerDeTest.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.common.rpc.thrift.TTimedQuota;
 import org.apache.iotdb.common.rpc.thrift.ThrottleType;
 import org.apache.iotdb.commons.auth.AuthException;
 import org.apache.iotdb.commons.auth.entity.PrivilegeType;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumConsensusIndex;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.partition.DataPartitionTable;
 import org.apache.iotdb.commons.partition.SchemaPartitionTable;
@@ -1050,7 +1051,7 @@ public class ConfigPhysicalPlanSerDeTest {
     collectorAttributes.put("collector", 
"org.apache.iotdb.pipe.collector.DefaultCollector");
     processorAttributes.put("processor", 
"org.apache.iotdb.pipe.processor.SDTFilterProcessor");
     connectorAttributes.put("connector", 
"org.apache.iotdb.pipe.protocal.ThriftTransporter");
-    PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(0, 1);
+    PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(new MinimumConsensusIndex(), 
1);
     Map<TConsensusGroupId, PipeTaskMeta> pipeTasks = new HashMap<>();
     pipeTasks.put(new TConsensusGroupId(DataRegion, 1), pipeTaskMeta);
     PipeStaticMeta pipeStaticMeta =
@@ -1163,10 +1164,12 @@ public class ConfigPhysicalPlanSerDeTest {
               {
                 put(
                     new TConsensusGroupId(TConsensusGroupType.DataRegion, 456),
-                    new PipeTaskMeta(789, 987));
+                    new PipeTaskMeta(
+                        new MinimumConsensusIndex(), 987)); // TODO: replace 
with IoTConsensus
                 put(
                     new TConsensusGroupId(TConsensusGroupType.DataRegion, 123),
-                    new PipeTaskMeta(456, 789));
+                    new PipeTaskMeta(
+                        new MinimumConsensusIndex(), 789)); // TODO: replace 
with IoTConsensus
               }
             });
     pipeMetaList.add(new PipeMeta(pipeStaticMeta, pipeRuntimeMeta));
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
index 0246759c98c..eb53c6cc5a3 100644
--- 
a/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.persistence;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumConsensusIndex;
 import org.apache.iotdb.commons.pipe.plugin.meta.PipePluginMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
@@ -72,7 +73,7 @@ public class PipeInfoTest {
     collectorAttributes.put("collector", 
"org.apache.iotdb.pipe.collector.DefaultCollector");
     processorAttributes.put("processor", 
"org.apache.iotdb.pipe.processor.SDTFilterProcessor");
     connectorAttributes.put("connector", 
"org.apache.iotdb.pipe.protocal.ThriftTransporter");
-    PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(0, 1);
+    PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(new MinimumConsensusIndex(), 
1);
     Map<TConsensusGroupId, PipeTaskMeta> pipeTasks = new HashMap<>();
     pipeTasks.put(new TConsensusGroupId(DataRegion, 1), pipeTaskMeta);
     PipeStaticMeta pipeStaticMeta =
diff --git 
a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
 
b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
index 07d74e2d9ff..5674a9a57cf 100644
--- 
a/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
+++ 
b/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedureTest.java
@@ -21,6 +21,7 @@ package 
org.apache.iotdb.confignode.procedure.impl.pipe.runtime;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumConsensusIndex;
 import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
@@ -68,10 +69,12 @@ public class PipeHandleMetaChangeProcedureTest {
               {
                 put(
                     new TConsensusGroupId(TConsensusGroupType.DataRegion, 456),
-                    new PipeTaskMeta(789, 987));
+                    new PipeTaskMeta(
+                        new MinimumConsensusIndex(), 987)); // TODO: replace 
with IoTConsensus
                 put(
                     new TConsensusGroupId(TConsensusGroupType.DataRegion, 123),
-                    new PipeTaskMeta(456, 789));
+                    new PipeTaskMeta(
+                        new MinimumConsensusIndex(), 789)); // TODO: replace 
with IoTConsensus
               }
             });
 
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ComparableConsensusRequest.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ComparableConsensusRequest.java
new file mode 100644
index 00000000000..dd0e5dfe319
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ComparableConsensusRequest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index;
+
+public interface ComparableConsensusRequest {
+  ConsensusIndex getConsensusIndex();
+
+  void setConsensusIndex(ConsensusIndex consensusIndex);
+}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ConsensusIndex.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ConsensusIndex.java
new file mode 100644
index 00000000000..e3fcc0afefa
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ConsensusIndex.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public interface ConsensusIndex {
+
+  /** serialize this consensus index to the given byte buffer */
+  void serialize(ByteBuffer byteBuffer);
+
+  /** serialize this consensus index to the given output stream */
+  void serialize(OutputStream stream) throws IOException;
+
+  /**
+   * A.isAfter(B) is true if and only if A is strictly greater than B
+   *
+   * @param consensusIndex the consensus index to be compared
+   * @return true if and only if this consensus index is strictly greater than the given consensus
+   *     index
+   */
+  boolean isAfter(ConsensusIndex consensusIndex);
+
+  /**
+   * A.equals(B) is true if and only if A is equal to B
+   *
+   * @param consensusIndex the consensus index to be compared
+   * @return true if and only if this consensus index is equal to the given consensus index
+   */
+  boolean equals(ConsensusIndex consensusIndex);
+
+  /**
+   * C = A.updateToMaximum(B) where C should satisfy:
+   *
+   * <p>(C.equals(A) || C.isAfter(A)) is true
+   *
+   * <p>(C.equals(B) || C.isAfter(B)) is true
+   *
+   * <p>There is no D, such that D satisfies the above conditions and C.isAfter(D) is true
+   *
+   * <p>The implementation of this function should be commutative, that is
+   * A.updateToMaximum(B).equals(B.updateToMaximum(A)) is true
+   *
+   * <p>Note: this function may modify the caller.
+   *
+   * @param consensusIndex the consensus index to be compared
+   * @return the maximum of the two consensus indexes
+   */
+  ConsensusIndex updateToMaximum(ConsensusIndex consensusIndex);
+}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ConsensusIndexType.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ConsensusIndexType.java
new file mode 100644
index 00000000000..28a81a243db
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/ConsensusIndexType.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index;
+
+import org.apache.iotdb.commons.consensus.index.impl.IoTConsensusIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumConsensusIndex;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public enum ConsensusIndexType {
+  MINIMUM_CONSENSUS_INDEX((short) 1),
+  IOT_CONSENSUS_INDEX((short) 2),
+  ;
+
+  private final short type;
+
+  ConsensusIndexType(short type) {
+    this.type = type;
+  }
+
+  public short getType() {
+    return type;
+  }
+
+  public void serialize(ByteBuffer byteBuffer) {
+    ReadWriteIOUtils.write(type, byteBuffer);
+  }
+
+  public void serialize(OutputStream stream) throws IOException {
+    ReadWriteIOUtils.write(type, stream);
+  }
+
+  public static ConsensusIndex deserializeFrom(ByteBuffer byteBuffer) {
+    short indexType = byteBuffer.getShort();
+    switch (indexType) {
+      case 1:
+        return MinimumConsensusIndex.deserializeFrom(byteBuffer);
+      case 2:
+        return IoTConsensusIndex.deserializeFrom(byteBuffer);
+      default:
+        throw new UnsupportedOperationException(
+            String.format("Unsupported Consensus Index type %s.", indexType));
+    }
+  }
+
+  public static ConsensusIndex deserializeFrom(InputStream stream) throws 
IOException {
+    short indexType = ReadWriteIOUtils.readShort(stream);
+    switch (indexType) {
+      case 1:
+        return MinimumConsensusIndex.deserializeFrom(stream);
+      case 2:
+        return IoTConsensusIndex.deserializeFrom(stream);
+      default:
+        throw new UnsupportedOperationException(
+            String.format("Unsupported Consensus Index type %s.", indexType));
+    }
+  }
+}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTConsensusIndex.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTConsensusIndex.java
new file mode 100644
index 00000000000..3bd7f11665b
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/IoTConsensusIndex.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index.impl;
+
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
+import org.apache.iotdb.commons.consensus.index.ConsensusIndexType;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+public class IoTConsensusIndex implements ConsensusIndex {
+  private final Map<Integer, Long> peerId2SearchIndex;
+
+  public IoTConsensusIndex() {
+    peerId2SearchIndex = new HashMap<>();
+  }
+
+  @Override
+  public void serialize(ByteBuffer byteBuffer) {
+    ConsensusIndexType.IOT_CONSENSUS_INDEX.serialize(byteBuffer);
+    // TODO: impl it
+  }
+
+  @Override
+  public void serialize(OutputStream stream) throws IOException {
+    ConsensusIndexType.IOT_CONSENSUS_INDEX.serialize(stream);
+    // TODO: impl it
+  }
+
+  @Override
+  public boolean isAfter(ConsensusIndex consensusIndex) {
+    if (!(consensusIndex instanceof IoTConsensusIndex)) {
+      return false;
+    }
+
+    return ((IoTConsensusIndex) consensusIndex)
+        .peerId2SearchIndex.entrySet().stream()
+            .noneMatch(
+                entry ->
+                    !this.peerId2SearchIndex.containsKey(entry.getKey())
+                        || this.peerId2SearchIndex.get(entry.getKey()) <= 
entry.getValue());
+  }
+
+  @Override
+  public boolean equals(ConsensusIndex consensusIndex) {
+    if (!(consensusIndex instanceof IoTConsensusIndex)) {
+      return false;
+    }
+
+    return ((IoTConsensusIndex) consensusIndex)
+        .peerId2SearchIndex.entrySet().stream()
+            .allMatch(
+                entry ->
+                    this.peerId2SearchIndex.containsKey(entry.getKey())
+                        && 
this.peerId2SearchIndex.get(entry.getKey()).equals(entry.getValue()));
+  }
+
+  @Override
+  public ConsensusIndex updateToMaximum(ConsensusIndex consensusIndex) {
+    if (!(consensusIndex instanceof IoTConsensusIndex)) {
+      return this;
+    }
+
+    ((IoTConsensusIndex) consensusIndex)
+        .peerId2SearchIndex.forEach(
+            (thatK, thatV) ->
+                this.peerId2SearchIndex.compute(
+                    thatK, (thisK, thisV) -> (thisV == null ? thatV : 
Math.max(thisV, thatV))));
+    return this;
+  }
+
+  public static IoTConsensusIndex deserializeFrom(ByteBuffer byteBuffer) {
+    // TODO: impl it
+    return new IoTConsensusIndex();
+  }
+
+  public static IoTConsensusIndex deserializeFrom(InputStream stream) {
+    // TODO: impl it
+    return new IoTConsensusIndex();
+  }
+}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumConsensusIndex.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumConsensusIndex.java
new file mode 100644
index 00000000000..03ba2ad2498
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/consensus/index/impl/MinimumConsensusIndex.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.consensus.index.impl;
+
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
+import org.apache.iotdb.commons.consensus.index.ConsensusIndexType;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public class MinimumConsensusIndex implements ConsensusIndex {
+  public MinimumConsensusIndex() {}
+
+  @Override
+  public void serialize(ByteBuffer byteBuffer) {
+    ConsensusIndexType.MINIMUM_CONSENSUS_INDEX.serialize(byteBuffer);
+  }
+
+  @Override
+  public void serialize(OutputStream stream) throws IOException {
+    ConsensusIndexType.MINIMUM_CONSENSUS_INDEX.serialize(stream);
+  }
+
+  @Override
+  public boolean isAfter(ConsensusIndex consensusIndex) {
+    return false;
+  }
+
+  @Override
+  public boolean equals(ConsensusIndex consensusIndex) {
+    return consensusIndex instanceof MinimumConsensusIndex;
+  }
+
+  @Override
+  public ConsensusIndex updateToMaximum(ConsensusIndex consensusIndex) {
+    return consensusIndex == null ? this : consensusIndex;
+  }
+
+  public static MinimumConsensusIndex deserializeFrom(ByteBuffer byteBuffer) {
+    return new MinimumConsensusIndex();
+  }
+
+  public static MinimumConsensusIndex deserializeFrom(InputStream stream) {
+    return new MinimumConsensusIndex();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    return obj != null && getClass() == obj.getClass();
+  }
+
+  @Override
+  public String toString() {
+    return "MinimumConsensusIndex{}";
+  }
+}
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
index bb5149fb1f2..a66254b2f3a 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/task/meta/PipeTaskMeta.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.commons.pipe.task.meta;
 
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
+import org.apache.iotdb.commons.consensus.index.ConsensusIndexType;
 import org.apache.iotdb.pipe.api.exception.PipeRuntimeCriticalException;
 import org.apache.iotdb.pipe.api.exception.PipeRuntimeException;
 import org.apache.iotdb.pipe.api.exception.PipeRuntimeNonCriticalException;
@@ -34,28 +36,25 @@ import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class PipeTaskMeta {
 
-  // TODO: replace it with consensus index
-  private final AtomicLong progressIndex = new AtomicLong(0L);
-  private final AtomicInteger regionLeader = new AtomicInteger(0);
+  private final AtomicReference<ConsensusIndex> progressIndex = new 
AtomicReference<>();
+  private final AtomicInteger dataNodeId = new AtomicInteger(0);
   private final Queue<PipeRuntimeException> exceptionMessages = new 
ConcurrentLinkedQueue<>();
 
-  private PipeTaskMeta() {}
-
-  public PipeTaskMeta(long progressIndex, int regionLeader) {
+  public PipeTaskMeta(ConsensusIndex progressIndex, int dataNodeId) {
     this.progressIndex.set(progressIndex);
-    this.regionLeader.set(regionLeader);
+    this.dataNodeId.set(dataNodeId);
   }
 
-  public long getProgressIndex() {
+  public ConsensusIndex getProgressIndex() {
     return progressIndex.get();
   }
 
-  public int getRegionLeader() {
-    return regionLeader.get();
+  public int getDataNodeId() {
+    return dataNodeId.get();
   }
 
   public Iterable<PipeRuntimeException> getExceptionMessages() {
@@ -70,17 +69,17 @@ public class PipeTaskMeta {
     exceptionMessages.clear();
   }
 
-  public void setProgressIndex(long progressIndex) {
-    this.progressIndex.set(progressIndex);
+  public void updateProgressIndex(ConsensusIndex updateIndex) {
+    progressIndex.updateAndGet(index -> index.updateToMaximum(updateIndex));
   }
 
-  public void setRegionLeader(int regionLeader) {
-    this.regionLeader.set(regionLeader);
+  public void setDataNodeId(int dataNodeId) {
+    this.dataNodeId.set(dataNodeId);
   }
 
   public void serialize(DataOutputStream outputStream) throws IOException {
-    ReadWriteIOUtils.write(progressIndex.get(), outputStream);
-    ReadWriteIOUtils.write(regionLeader.get(), outputStream);
+    progressIndex.get().serialize(outputStream);
+    ReadWriteIOUtils.write(dataNodeId.get(), outputStream);
     ReadWriteIOUtils.write(exceptionMessages.size(), outputStream);
     for (final PipeRuntimeException exceptionMessage : exceptionMessages) {
       ReadWriteIOUtils.write(
@@ -90,8 +89,8 @@ public class PipeTaskMeta {
   }
 
   public void serialize(FileOutputStream outputStream) throws IOException {
-    ReadWriteIOUtils.write(progressIndex.get(), outputStream);
-    ReadWriteIOUtils.write(regionLeader.get(), outputStream);
+    progressIndex.get().serialize(outputStream);
+    ReadWriteIOUtils.write(dataNodeId.get(), outputStream);
     ReadWriteIOUtils.write(exceptionMessages.size(), outputStream);
     for (final PipeRuntimeException exceptionMessage : exceptionMessages) {
       ReadWriteIOUtils.write(
@@ -101,9 +100,9 @@ public class PipeTaskMeta {
   }
 
   public static PipeTaskMeta deserialize(ByteBuffer byteBuffer) {
-    final PipeTaskMeta PipeTaskMeta = new PipeTaskMeta();
-    PipeTaskMeta.progressIndex.set(ReadWriteIOUtils.readLong(byteBuffer));
-    PipeTaskMeta.regionLeader.set(ReadWriteIOUtils.readInt(byteBuffer));
+    final PipeTaskMeta PipeTaskMeta =
+        new PipeTaskMeta(
+            ConsensusIndexType.deserializeFrom(byteBuffer), 
ReadWriteIOUtils.readInt(byteBuffer));
     final int size = ReadWriteIOUtils.readInt(byteBuffer);
     for (int i = 0; i < size; ++i) {
       final boolean critical = ReadWriteIOUtils.readBool(byteBuffer);
@@ -117,9 +116,9 @@ public class PipeTaskMeta {
   }
 
   public static PipeTaskMeta deserialize(InputStream inputStream) throws 
IOException {
-    final PipeTaskMeta PipeTaskMeta = new PipeTaskMeta();
-    PipeTaskMeta.progressIndex.set(ReadWriteIOUtils.readLong(inputStream));
-    PipeTaskMeta.regionLeader.set(ReadWriteIOUtils.readInt(inputStream));
+    final PipeTaskMeta PipeTaskMeta =
+        new PipeTaskMeta(
+            ConsensusIndexType.deserializeFrom(inputStream), 
ReadWriteIOUtils.readInt(inputStream));
     final int size = ReadWriteIOUtils.readInt(inputStream);
     for (int i = 0; i < size; ++i) {
       final boolean critical = ReadWriteIOUtils.readBool(inputStream);
@@ -141,14 +140,14 @@ public class PipeTaskMeta {
       return false;
     }
     PipeTaskMeta that = (PipeTaskMeta) obj;
-    return progressIndex.get() == that.progressIndex.get()
-        && regionLeader.get() == that.regionLeader.get()
+    return progressIndex.get().equals(that.progressIndex.get())
+        && dataNodeId.get() == that.dataNodeId.get()
         && Arrays.equals(exceptionMessages.toArray(), 
that.exceptionMessages.toArray());
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(progressIndex, regionLeader, exceptionMessages);
+    return Objects.hash(progressIndex.get(), dataNodeId.get(), 
exceptionMessages.size());
   }
 
   @Override
@@ -157,8 +156,8 @@ public class PipeTaskMeta {
         + "progressIndex='"
         + progressIndex
         + '\''
-        + ", regionLeader='"
-        + regionLeader
+        + ", dataNodeId='"
+        + dataNodeId
         + '\''
         + ", exceptionMessages="
         + exceptionMessages
diff --git 
a/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaDeSerTest.java
 
b/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaDeSerTest.java
index 08e7f86007c..2ec09c7fe1f 100644
--- 
a/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaDeSerTest.java
+++ 
b/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/meta/PipeMetaDeSerTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.pipe.task.meta;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumConsensusIndex;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -59,10 +60,12 @@ public class PipeMetaDeSerTest {
               {
                 put(
                     new TConsensusGroupId(TConsensusGroupType.DataRegion, 456),
-                    new PipeTaskMeta(789, 987));
+                    new PipeTaskMeta(
+                        new MinimumConsensusIndex(), 987)); // TODO: replace 
with IoTConsensusIndex;
                 put(
                     new TConsensusGroupId(TConsensusGroupType.DataRegion, 123),
-                    new PipeTaskMeta(456, 789));
+                    new PipeTaskMeta(
+                        new MinimumConsensusIndex(), 789)); // TODO: replace 
with IoTConsensusIndex;
               }
             });
     ByteBuffer runtimeByteBuffer = pipeRuntimeMeta.serialize();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 55f06629258..6cce0662917 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -297,6 +297,7 @@ public class TsFileProcessor {
       tsFileResource.updateEndTime(
           insertRowNode.getDeviceID().toStringID(), insertRowNode.getTime());
     }
+    tsFileResource.updateConsensusIndex(insertRowNode.getConsensusIndex());
     PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(System.nanoTime() 
- startTime);
   }
 
@@ -406,6 +407,7 @@ public class TsFileProcessor {
       tsFileResource.updateEndTime(
           insertTabletNode.getDeviceID().toStringID(), 
insertTabletNode.getTimes()[end - 1]);
     }
+    tsFileResource.updateConsensusIndex(insertTabletNode.getConsensusIndex());
     PERFORMANCE_OVERVIEW_METRICS.recordScheduleMemTableCost(System.nanoTime() 
- startTime);
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 0e5949f41b4..5b1a3de846a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.db.engine.storagegroup;
 
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
+import org.apache.iotdb.commons.consensus.index.ConsensusIndexType;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -153,6 +155,8 @@ public class TsFileResource {
    */
   private TsFileResource originTsFileResource;
 
+  private ConsensusIndex maxConsensusIndex;
+
   public TsFileResource() {}
 
   public TsFileResource(TsFileResource other) throws IOException {
@@ -170,6 +174,7 @@ public class TsFileResource {
     this.minPlanIndex = other.minPlanIndex;
     this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName());
     this.tsFileSize = other.tsFileSize;
+    this.maxConsensusIndex = other.maxConsensusIndex;
   }
 
   /** for sealed TsFile, call setClosed to close TsFileResource */
@@ -243,6 +248,9 @@ public class TsFileResource {
         String modFileName = new File(modFile.getFilePath()).getName();
         ReadWriteIOUtils.write(modFileName, outputStream);
       }
+      if (maxConsensusIndex != null) {
+        maxConsensusIndex.serialize(outputStream);
+      }
     }
     File src = fsFactory.getFile(file + RESOURCE_SUFFIX + TEMP_SUFFIX);
     File dest = fsFactory.getFile(file + RESOURCE_SUFFIX);
@@ -265,6 +273,9 @@ public class TsFileResource {
           modFile = new ModificationFile(modF.getPath());
         }
       }
+      if (inputStream.available() > 0) {
+        maxConsensusIndex = ConsensusIndexType.deserializeFrom(inputStream);
+      }
     }
 
     // upgrade from v0.12 to v0.13, we need to rewrite the TsFileResource if 
the previous time index
@@ -310,6 +321,9 @@ public class TsFileResource {
           modFile = new ModificationFile(modF.getPath());
         }
       }
+      if (inputStream.available() > 0) {
+        maxConsensusIndex = ConsensusIndexType.deserializeFrom(inputStream);
+      }
     }
   }
 
@@ -1122,4 +1136,23 @@ public class TsFileResource {
   public boolean isFileInList() {
     return prev != null || next != null;
   }
+
+  public void updateConsensusIndex(ConsensusIndex consensusIndex) {
+    if (consensusIndex == null) {
+      return;
+    }
+
+    maxConsensusIndex =
+        (maxConsensusIndex == null
+            ? consensusIndex
+            : maxConsensusIndex.updateToMaximum(consensusIndex));
+  }
+
+  public ConsensusIndex getMaxConsensusIndexAfterClose() throws 
IllegalStateException {
+    if (status.equals(TsFileResourceStatus.UNCLOSED)) {
+      throw new IllegalStateException(
+          "Should not get consensus index from a unclosing TsFileResource.");
+    }
+    return maxConsensusIndex;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 5a4e198b77c..e4b9817da9d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
@@ -263,4 +264,10 @@ public class InsertMultiTabletsNode extends InsertNode {
   public long getMinTime() {
     throw new NotImplementedException();
   }
+
+  @Override
+  public void setConsensusIndex(ConsensusIndex consensusIndex) {
+    this.consensusIndex = consensusIndex;
+    insertTabletNodeList.forEach(node -> 
node.setConsensusIndex(consensusIndex));
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index c191f03837a..c60057e7194 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -19,6 +19,8 @@
 package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.consensus.index.ComparableConsensusRequest;
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader;
@@ -40,7 +42,7 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Objects;
 
-public abstract class InsertNode extends WritePlanNode {
+public abstract class InsertNode extends WritePlanNode implements 
ComparableConsensusRequest {
 
   /** this insert node doesn't need to participate in iot consensus */
   public static final long NO_CONSENSUS_INDEX = 
ConsensusReqReader.DEFAULT_SEARCH_INDEX;
@@ -73,6 +75,8 @@ public abstract class InsertNode extends WritePlanNode {
   /** Physical address of data region after splitting */
   protected TRegionReplicaSet dataRegionReplicaSet;
 
+  protected ConsensusIndex consensusIndex;
+
   protected InsertNode(PlanNodeId id) {
     super(id);
   }
@@ -267,6 +271,20 @@ public abstract class InsertNode extends WritePlanNode {
   }
   // endregion
 
+  // region consensus index
+
+  @Override
+  public ConsensusIndex getConsensusIndex() {
+    return consensusIndex;
+  }
+
+  @Override
+  public void setConsensusIndex(ConsensusIndex consensusIndex) {
+    this.consensusIndex = consensusIndex;
+  }
+
+  // endregion
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
index 050839f6428..dcfe98e72ae 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
@@ -240,4 +241,10 @@ public class InsertRowsNode extends InsertNode {
   public long getMinTime() {
     throw new NotImplementedException();
   }
+
+  @Override
+  public void setConsensusIndex(ConsensusIndex consensusIndex) {
+    this.consensusIndex = consensusIndex;
+    insertRowNodeList.forEach(insertRowNode -> 
insertRowNode.setConsensusIndex(consensusIndex));
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index dd788ea4db0..45d28ab5005 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.StatusUtils;
@@ -292,4 +293,10 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
   public long getMinTime() {
     throw new NotImplementedException();
   }
+
+  @Override
+  public void setConsensusIndex(ConsensusIndex consensusIndex) {
+    this.consensusIndex = consensusIndex;
+    insertRowNodeList.forEach(insertRowNode -> 
insertRowNode.setConsensusIndex(consensusIndex));
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index 2ff33b450c7..9206d5cd474 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -26,6 +26,8 @@ import 
org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.task.PipeBuilder;
 import org.apache.iotdb.db.pipe.task.PipeTask;
@@ -65,6 +67,7 @@ import java.util.stream.Collectors;
 public class PipeTaskAgent {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTaskAgent.class);
+  private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
 
   private final PipeMetaKeeper pipeMetaKeeper;
   private final PipeTaskManager pipeTaskManager;
@@ -156,11 +159,7 @@ public class PipeTaskAgent {
 
       // if task meta does not exist on data node, create a new task
       if (taskMetaOnDataNode == null) {
-        createPipeTask(
-            consensusGroupIdFromConfigNode,
-            pipeStaticMeta,
-            taskMetaFromConfigNode.getProgressIndex(),
-            taskMetaFromConfigNode.getRegionLeader());
+        createPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta, 
taskMetaFromConfigNode);
         // we keep the new created task's status consistent with the status 
recorded in data node's
         // pipe runtime meta. please note that the status recorded in data 
node's pipe runtime meta
         // is not reliable, but we will have a check later to make sure the 
status is correct.
@@ -171,16 +170,12 @@ public class PipeTaskAgent {
       }
 
       // if task meta exists on data node, check if it has changed
-      final int regionLeaderFromConfigNode = 
taskMetaFromConfigNode.getRegionLeader();
-      final int regionLeaderOnDataNode = taskMetaOnDataNode.getRegionLeader();
+      final int dataNodeIdFromConfigNode = 
taskMetaFromConfigNode.getDataNodeId();
+      final int dataNodeIdOnDataNode = taskMetaOnDataNode.getDataNodeId();
 
-      if (regionLeaderFromConfigNode != regionLeaderOnDataNode) {
+      if (dataNodeIdFromConfigNode != dataNodeIdOnDataNode) {
         dropPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta);
-        createPipeTask(
-            consensusGroupIdFromConfigNode,
-            pipeStaticMeta,
-            taskMetaFromConfigNode.getProgressIndex(),
-            taskMetaFromConfigNode.getRegionLeader());
+        createPipeTask(consensusGroupIdFromConfigNode, pipeStaticMeta, 
taskMetaFromConfigNode);
         // we keep the new created task's status consistent with the status 
recorded in data node's
         // pipe runtime meta. please note that the status recorded in data 
node's pipe runtime meta
         // is not reliable, but we will have a check later to make sure the 
status is correct.
@@ -511,19 +506,22 @@ public class PipeTaskAgent {
   ///////////////////////// Manage by dataRegionGroupId 
/////////////////////////
 
   private void createPipeTask(
-      TConsensusGroupId dataRegionGroupId,
+      TConsensusGroupId consensusGroupId,
       PipeStaticMeta pipeStaticMeta,
-      long progressIndex,
-      int dataRegionId) {
-    final PipeTask pipeTask =
-        new PipeTaskBuilder(Integer.toString(dataRegionId), 
pipeStaticMeta).build();
-    pipeTask.create();
-    pipeTaskManager.addPipeTask(pipeStaticMeta, dataRegionGroupId, pipeTask);
+      PipeTaskMeta pipeTaskMeta) {
+    if (pipeTaskMeta.getDataNodeId() == CONFIG.getDataNodeId()) {
+      final PipeTask pipeTask =
+          new PipeTaskBuilder(consensusGroupId, pipeTaskMeta, 
pipeStaticMeta).build();
+      pipeTask.create();
+      pipeTaskManager.addPipeTask(pipeStaticMeta, consensusGroupId, pipeTask);
+    }
     pipeMetaKeeper
         .getPipeMeta(pipeStaticMeta.getPipeName())
         .getRuntimeMeta()
         .getConsensusGroupIdToTaskMetaMap()
-        .put(dataRegionGroupId, new PipeTaskMeta(progressIndex, dataRegionId));
+        .put(
+            consensusGroupId,
+            new PipeTaskMeta(pipeTaskMeta.getProgressIndex(), 
pipeTaskMeta.getDataNodeId()));
   }
 
   private void dropPipeTask(TConsensusGroupId dataRegionGroupId, 
PipeStaticMeta pipeStaticMeta) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
index fe1c92638be..fa7b07c182b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/IoTDBDataRegionCollector.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.core.collector;
 
 import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import 
org.apache.iotdb.db.pipe.core.collector.historical.PipeHistoricalDataRegionTsFileCollector;
@@ -44,10 +45,12 @@ public class IoTDBDataRegionCollector implements 
PipeCollector {
 
   private int dataRegionId;
 
-  public IoTDBDataRegionCollector(ListenableUnblockingPendingQueue<Event> 
collectorPendingQueue) {
+  public IoTDBDataRegionCollector(
+      PipeTaskMeta pipeTaskMeta, ListenableUnblockingPendingQueue<Event> 
collectorPendingQueue) {
     hasBeenStarted = new AtomicBoolean(false);
-    realtimeCollector = new 
PipeRealtimeDataRegionHybridCollector(collectorPendingQueue);
-    historicalCollector = new PipeHistoricalDataRegionTsFileCollector();
+    realtimeCollector =
+        new PipeRealtimeDataRegionHybridCollector(pipeTaskMeta, 
collectorPendingQueue);
+    historicalCollector = new 
PipeHistoricalDataRegionTsFileCollector(pipeTaskMeta);
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
index ed0d37c6c14..1235f010b8b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/historical/PipeHistoricalDataRegionTsFileCollector.java
@@ -20,6 +20,8 @@
 package org.apache.iotdb.db.pipe.core.collector.historical;
 
 import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
@@ -37,10 +39,17 @@ import java.util.stream.Collectors;
 
 public class PipeHistoricalDataRegionTsFileCollector implements PipeCollector {
 
+  private final PipeTaskMeta pipeTaskMeta;
+  private final ConsensusIndex startIndex;
   private int dataRegionId;
 
   private Queue<PipeTsFileInsertionEvent> pendingQueue;
 
+  public PipeHistoricalDataRegionTsFileCollector(PipeTaskMeta pipeTaskMeta) {
+    this.pipeTaskMeta = pipeTaskMeta;
+    this.startIndex = pipeTaskMeta.getProgressIndex();
+  }
+
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
     validator.validateRequiredAttribute(PipeCollectorConstant.DATA_REGION_KEY);
@@ -71,16 +80,25 @@ public class PipeHistoricalDataRegionTsFileCollector 
implements PipeCollector {
         pendingQueue = new ArrayDeque<>(tsFileManager.size(true) + 
tsFileManager.size(false));
         pendingQueue.addAll(
             tsFileManager.getTsFileList(true).stream()
+                .filter(
+                    resource ->
+                        resource.getMaxConsensusIndexAfterClose() == null
+                            || 
!startIndex.isAfter(resource.getMaxConsensusIndexAfterClose()))
                 .map(PipeTsFileInsertionEvent::new)
                 .collect(Collectors.toList()));
         pendingQueue.addAll(
             tsFileManager.getTsFileList(false).stream()
+                .filter(
+                    resource ->
+                        resource.getMaxConsensusIndexAfterClose() == null
+                            || 
!startIndex.isAfter(resource.getMaxConsensusIndexAfterClose()))
                 .map(PipeTsFileInsertionEvent::new)
                 .collect(Collectors.toList()));
         pendingQueue.forEach(
-            event ->
-                event.increaseReferenceCount(
-                    PipeHistoricalDataRegionTsFileCollector.class.getName()));
+            event -> {
+              event.reportProgressIndexToPipeTaskMetaWhenFinish(pipeTaskMeta);
+              
event.increaseReferenceCount(PipeHistoricalDataRegionTsFileCollector.class.getName());
+            });
       } finally {
         tsFileManager.readUnlock();
       }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
index 41e9f5d76fb..b84f86040fb 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionCollector.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.core.collector.realtime;
 
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import 
org.apache.iotdb.db.pipe.core.collector.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
@@ -31,6 +32,11 @@ public abstract class PipeRealtimeDataRegionCollector 
implements PipeCollector {
 
   protected String pattern;
   protected String dataRegionId;
+  protected final PipeTaskMeta pipeTaskMeta;
+
+  public PipeRealtimeDataRegionCollector(PipeTaskMeta pipeTaskMeta) {
+    this.pipeTaskMeta = pipeTaskMeta;
+  }
 
   @Override
   public void validate(PipeParameterValidator validator) throws Exception {
@@ -64,6 +70,10 @@ public abstract class PipeRealtimeDataRegionCollector 
implements PipeCollector {
     return pattern;
   }
 
+  public final PipeTaskMeta getPipeTaskMeta() {
+    return pipeTaskMeta;
+  }
+
   @Override
   public String toString() {
     return "PipeRealtimeDataRegionCollector{"
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index 898b93cb22d..4ec7ed92261 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.core.collector.realtime;
 
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
@@ -44,7 +45,8 @@ public class PipeRealtimeDataRegionHybridCollector extends 
PipeRealtimeDataRegio
   private final ListenableUnblockingPendingQueue<Event> pendingQueue;
 
   public PipeRealtimeDataRegionHybridCollector(
-      ListenableUnblockingPendingQueue<Event> pendingQueue) {
+      PipeTaskMeta pipeTaskMeta, ListenableUnblockingPendingQueue<Event> 
pendingQueue) {
+    super(pipeTaskMeta);
     this.pendingQueue = pendingQueue;
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
index 2d805310f30..49509f0b961 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/PipeDataRegionAssigner.java
@@ -53,8 +53,10 @@ public class PipeDataRegionAssigner {
         .match(event)
         .forEach(
             collector -> {
-              
event.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
-              collector.collect(event);
+              PipeRealtimeCollectEvent copiedEvent = event.shallowCopySelf();
+              
copiedEvent.reportProgressIndexToPipeTaskMetaWhenFinish(collector.getPipeTaskMeta());
+              
copiedEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName());
+              collector.collect(copiedEvent);
             });
     event.gcSchemaInfo();
     event.decreaseReferenceCount(PipeDataRegionAssigner.class.getName());
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
index fa9e765ca25..7ffa1caf4ef 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/EnrichedEvent.java
@@ -19,11 +19,34 @@
 
 package org.apache.iotdb.db.pipe.core.event;
 
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * EnrichedEvent is an event that can be enriched with additional runtime 
information. The
  * additional information mainly includes the reference count of the event.
  */
-public interface EnrichedEvent {
+public abstract class EnrichedEvent implements Event {
+  private final AtomicInteger referenceCount = new AtomicInteger(0);
+  private PipeTaskMeta pipeTaskMeta;
+
+  public EnrichedEvent() {}
+
+  public boolean increaseReferenceCount(String holderMessage) {
+    final AtomicBoolean success = new AtomicBoolean(true);
+    // Locked: the hook has side effects and must run exactly once.
+    synchronized (referenceCount) {
+      if (referenceCount.get() == 0) {
+        success.set(increaseResourceReferenceCount(holderMessage));
+      }
+      referenceCount.incrementAndGet();
+    }
+    return success.get();
+  }
 
   /**
    * Increase the reference count of this event.
@@ -32,7 +55,20 @@ public interface EnrichedEvent {
    * @return true if the reference count is increased successfully, false if 
the event is not
    *     controlled by the invoker, which means the data stored in the event 
is not safe to use
    */
-  boolean increaseReferenceCount(String holderMessage);
+  public abstract boolean increaseResourceReferenceCount(String holderMessage);
+
+  public boolean decreaseReferenceCount(String holderMessage) {
+    final AtomicBoolean success = new AtomicBoolean(true);
+    // Locked: release and progress report have side effects; run them once.
+    synchronized (referenceCount) {
+      if (referenceCount.get() == 1) {
+        success.set(decreaseResourceReferenceCount(holderMessage));
+        reportProgress();
+      }
+      referenceCount.decrementAndGet();
+    }
+    return success.get();
+  }
 
   /**
    * Decrease the reference count of this event. If the reference count is 
decreased to 0, the event
@@ -41,14 +77,28 @@ public interface EnrichedEvent {
    * @param holderMessage the message of the invoker
    * @return true if the reference count is decreased successfully, false 
otherwise
    */
-  boolean decreaseReferenceCount(String holderMessage);
+  public abstract boolean decreaseResourceReferenceCount(String holderMessage);
+
+  /**
+   * Push this event's consensus index to the registered {@code PipeTaskMeta}. Does nothing when
+   * no {@code PipeTaskMeta} has been registered on this event.
+   */
+  private void reportProgress() {
+    if (pipeTaskMeta == null) {
+      return;
+    }
+    pipeTaskMeta.updateProgressIndex(getConsensusIndex());
+  }
 
   /**
    * Get the reference count of this event.
    *
    * @return the reference count
    */
-  int getReferenceCount();
+  public int getReferenceCount() {
+    return referenceCount.get();
+  }
+
+  public abstract ConsensusIndex getConsensusIndex();
+
+  /**
+   * Register the {@code PipeTaskMeta} that should receive this event's consensus index. The index
+   * is reported from {@code decreaseReferenceCount} when it observes a reference count of 1; if
+   * this method is never called, nothing is reported.
+   *
+   * @param pipeTaskMeta the task meta whose progress index is updated; stored as-is, no null check
+   */
+  public void reportProgressIndexToPipeTaskMetaWhenFinish(PipeTaskMeta 
pipeTaskMeta) {
+    this.pipeTaskMeta = pipeTaskMeta;
+  }
 
-  // TODO: ConsensusIndex getConsensusIndex();
+  public abstract EnrichedEvent shallowCopySelf();
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
index 62c599b66c1..b6018b2cc6f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTabletInsertionEvent.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.core.event.impl;
 
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
 import org.apache.iotdb.pipe.api.access.Row;
@@ -30,7 +31,7 @@ import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiConsumer;
 
-public class PipeTabletInsertionEvent implements TabletInsertionEvent, 
EnrichedEvent {
+public class PipeTabletInsertionEvent extends EnrichedEvent implements 
TabletInsertionEvent {
 
   private final InsertNode insertNode;
 
@@ -61,23 +62,27 @@ public class PipeTabletInsertionEvent implements 
TabletInsertionEvent, EnrichedE
   }
 
   @Override
-  public boolean increaseReferenceCount(String holderMessage) {
+  public boolean increaseResourceReferenceCount(String holderMessage) {
     // TODO: use WALPipeHandler pinMemtable
     referenceCount.incrementAndGet();
     return true;
   }
 
   @Override
-  public boolean decreaseReferenceCount(String holderMessage) {
+  public boolean decreaseResourceReferenceCount(String holderMessage) {
     // TODO: use WALPipeHandler unpinMemetable
     referenceCount.decrementAndGet();
     return true;
   }
 
   @Override
-  public int getReferenceCount() {
-    // TODO: use WALPipeHandler unpinMemetable
-    return referenceCount.get();
+  public ConsensusIndex getConsensusIndex() {
+    return insertNode.getConsensusIndex();
+  }
+
+  @Override
+  public PipeTabletInsertionEvent shallowCopySelf() {
+    return new PipeTabletInsertionEvent(this.insertNode);
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
index 65f78cebac2..93e4232d0b4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/impl/PipeTsFileInsertionEvent.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.pipe.core.event.impl;
 
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumConsensusIndex;
 import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
@@ -32,14 +34,16 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public class PipeTsFileInsertionEvent implements TsFileInsertionEvent, 
EnrichedEvent {
+public class PipeTsFileInsertionEvent extends EnrichedEvent implements 
TsFileInsertionEvent {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTsFileInsertionEvent.class);
 
+  private final TsFileResource resource;
   private File tsFile;
   private final AtomicBoolean isClosed;
 
   public PipeTsFileInsertionEvent(TsFileResource resource) {
+    this.resource = resource;
     tsFile = resource.getTsFile();
 
     isClosed = new AtomicBoolean(resource.isClosed());
@@ -83,7 +87,7 @@ public class PipeTsFileInsertionEvent implements 
TsFileInsertionEvent, EnrichedE
   }
 
   @Override
-  public boolean increaseReferenceCount(String holderMessage) {
+  public boolean increaseResourceReferenceCount(String holderMessage) {
     try {
       // TODO: increase reference count for mods & resource files
       tsFile = PipeResourceManager.file().increaseFileReference(tsFile, true);
@@ -99,7 +103,7 @@ public class PipeTsFileInsertionEvent implements 
TsFileInsertionEvent, EnrichedE
   }
 
   @Override
-  public boolean decreaseReferenceCount(String holderMessage) {
+  public boolean decreaseResourceReferenceCount(String holderMessage) {
     try {
       PipeResourceManager.file().decreaseFileReference(tsFile);
       return true;
@@ -114,8 +118,21 @@ public class PipeTsFileInsertionEvent implements 
TsFileInsertionEvent, EnrichedE
   }
 
   @Override
-  public int getReferenceCount() {
-    return PipeResourceManager.file().getFileReferenceCount(tsFile);
+  public ConsensusIndex getConsensusIndex() {
+    try {
+      // The max consensus index is read only after waiting for the TsFile to close.
+      waitForTsFileClose();
+      return resource.getMaxConsensusIndexAfterClose();
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so code further up the stack can still observe it.
+      Thread.currentThread().interrupt();
+      LOGGER.warn(
+          "Interrupted when waiting for closing TsFile {}.", resource.getTsFilePath(), e);
+      // Fall back to the minimum index when the wait was interrupted.
+      return new MinimumConsensusIndex();
+    }
+  }
+
+  @Override
+  public PipeTsFileInsertionEvent shallowCopySelf() {
+    return new PipeTsFileInsertionEvent(this.resource);
   }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
index 0c63131e6e9..09a1ff0bdd1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/core/event/realtime/PipeRealtimeCollectEvent.java
@@ -19,20 +19,21 @@
 
 package org.apache.iotdb.db.pipe.core.event.realtime;
 
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
 import org.apache.iotdb.db.pipe.core.event.EnrichedEvent;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import java.util.Map;
 
-public class PipeRealtimeCollectEvent implements Event, EnrichedEvent {
+public class PipeRealtimeCollectEvent extends EnrichedEvent {
 
-  private final Event event;
+  private final EnrichedEvent event;
   private final TsFileEpoch tsFileEpoch;
 
   private Map<String, String[]> device2Measurements;
 
   public PipeRealtimeCollectEvent(
-      Event event, TsFileEpoch tsFileEpoch, Map<String, String[]> 
device2Measurements) {
+      EnrichedEvent event, TsFileEpoch tsFileEpoch, Map<String, String[]> 
device2Measurements) {
     this.event = event;
     this.tsFileEpoch = tsFileEpoch;
     this.device2Measurements = device2Measurements;
@@ -55,20 +56,24 @@ public class PipeRealtimeCollectEvent implements Event, 
EnrichedEvent {
   }
 
   @Override
-  public boolean increaseReferenceCount(String holderMessage) {
-    return !(event instanceof EnrichedEvent)
-        || ((EnrichedEvent) event).increaseReferenceCount(holderMessage);
+  public boolean increaseResourceReferenceCount(String holderMessage) {
+    return event.increaseResourceReferenceCount(holderMessage);
   }
 
   @Override
-  public boolean decreaseReferenceCount(String holderMessage) {
-    return !(event instanceof EnrichedEvent)
-        || ((EnrichedEvent) event).decreaseReferenceCount(holderMessage);
+  public boolean decreaseResourceReferenceCount(String holderMessage) {
+    return event.decreaseResourceReferenceCount(holderMessage);
   }
 
   @Override
-  public int getReferenceCount() {
-    return event instanceof EnrichedEvent ? ((EnrichedEvent) 
event).getReferenceCount() : 0;
+  public ConsensusIndex getConsensusIndex() {
+    return event.getConsensusIndex();
+  }
+
+  @Override
+  public PipeRealtimeCollectEvent shallowCopySelf() {
+    // The wrapped event is copied through its own shallowCopySelf(); tsFileEpoch and
+    // device2Measurements are passed on by reference.
+    final EnrichedEvent copiedEvent = event.shallowCopySelf();
+    return new PipeRealtimeCollectEvent(copiedEvent, this.tsFileEpoch, this.device2Measurements);
+  }
 
   @Override
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
index 447ae5bdf09..d9dbe4dc1f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeBuilder.java
@@ -24,12 +24,15 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeRuntimeMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 
 import java.util.HashMap;
 import java.util.Map;
 
 public class PipeBuilder {
+  private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
 
   private final PipeMeta pipeMeta;
 
@@ -49,16 +52,18 @@ public class PipeBuilder {
     final PipeRuntimeMeta pipeRuntimeMeta = pipeMeta.getRuntimeMeta();
     for (Map.Entry<TConsensusGroupId, PipeTaskMeta> 
consensusGroupIdToPipeTaskMeta :
         pipeRuntimeMeta.getConsensusGroupIdToTaskMetaMap().entrySet()) {
-      consensusGroupIdToPipeTaskMap.put(
-          consensusGroupIdToPipeTaskMeta.getKey(),
-          new PipeTaskBuilder(
-                  pipeName,
-                  
Integer.toString(consensusGroupIdToPipeTaskMeta.getKey().getId()),
-                  // TODO: 
consensusGroupIdToPipeTaskMeta.getValue().getProgressIndex() is not used
-                  collectorParameters,
-                  processorParameters,
-                  connectorParameters)
-              .build());
+      if (consensusGroupIdToPipeTaskMeta.getValue().getDataNodeId() == 
CONFIG.getDataNodeId()) {
+        consensusGroupIdToPipeTaskMap.put(
+            consensusGroupIdToPipeTaskMeta.getKey(),
+            new PipeTaskBuilder(
+                    pipeName,
+                    consensusGroupIdToPipeTaskMeta.getKey(),
+                    consensusGroupIdToPipeTaskMeta.getValue(),
+                    collectorParameters,
+                    processorParameters,
+                    connectorParameters)
+                .build());
+      }
     }
 
     return consensusGroupIdToPipeTaskMap;
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
index 26475c5c4b4..887b31a6e3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTask.java
@@ -19,12 +19,13 @@
 
 package org.apache.iotdb.db.pipe.task;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.db.pipe.task.stage.PipeTaskStage;
 
 public class PipeTask {
 
   private final String pipeName;
-  private final String dataRegionId;
+  private final TConsensusGroupId dataRegionId;
 
   private final PipeTaskStage collectorStage;
   private final PipeTaskStage processorStage;
@@ -32,7 +33,7 @@ public class PipeTask {
 
   PipeTask(
       String pipeName,
-      String dataRegionId,
+      TConsensusGroupId dataRegionId,
       PipeTaskStage collectorStage,
       PipeTaskStage processorStage,
       PipeTaskStage connectorStage) {
@@ -68,7 +69,7 @@ public class PipeTask {
     connectorStage.stop();
   }
 
-  public String getDataRegionId() {
+  public TConsensusGroupId getDataRegionId() {
     return dataRegionId;
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
index 51e429b0455..d42d62e5f98 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskBuilder.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.db.pipe.task;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.task.stage.PipeTaskCollectorStage;
 import org.apache.iotdb.db.pipe.task.stage.PipeTaskConnectorStage;
 import org.apache.iotdb.db.pipe.task.stage.PipeTaskProcessorStage;
@@ -28,28 +30,33 @@ import org.apache.iotdb.pipe.api.customizer.PipeParameters;
 public class PipeTaskBuilder {
 
   private final String pipeName;
-  private final String dataRegionId;
+  private final TConsensusGroupId dataRegionId;
+  private final PipeTaskMeta pipeTaskMeta;
   private final PipeParameters pipeCollectorParameters;
   private final PipeParameters pipeProcessorParameters;
   private final PipeParameters pipeConnectorParameters;
 
   PipeTaskBuilder(
       String pipeName,
-      String dataRegionId,
+      TConsensusGroupId dataRegionId,
+      PipeTaskMeta pipeTaskMeta,
       PipeParameters pipeCollectorParameters,
       PipeParameters pipeProcessorParameters,
       PipeParameters pipeConnectorParameters) {
     this.pipeName = pipeName;
     this.dataRegionId = dataRegionId;
+    this.pipeTaskMeta = pipeTaskMeta;
     this.pipeCollectorParameters = pipeCollectorParameters;
     this.pipeProcessorParameters = pipeProcessorParameters;
     this.pipeConnectorParameters = pipeConnectorParameters;
   }
 
-  public PipeTaskBuilder(String dataRegionId, PipeStaticMeta pipeStaticMeta) {
+  public PipeTaskBuilder(
+      TConsensusGroupId dataRegionId, PipeTaskMeta pipeTaskMeta, 
PipeStaticMeta pipeStaticMeta) {
     this(
         pipeStaticMeta.getPipeName(),
         dataRegionId,
+        pipeTaskMeta,
         pipeStaticMeta.getCollectorParameters(),
         pipeStaticMeta.getProcessorParameters(),
         pipeStaticMeta.getConnectorParameters());
@@ -60,7 +67,7 @@ public class PipeTaskBuilder {
 
     // we first build the collector and connector, then build the processor.
     final PipeTaskCollectorStage collectorStage =
-        new PipeTaskCollectorStage(dataRegionId, pipeCollectorParameters);
+        new PipeTaskCollectorStage(dataRegionId, pipeTaskMeta, 
pipeCollectorParameters);
     final PipeTaskConnectorStage connectorStage =
         new PipeTaskConnectorStage(pipeConnectorParameters);
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
index a4866e87e8e..3dd445c7489 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskCollectorStage.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.db.pipe.task.stage;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import org.apache.iotdb.db.pipe.core.collector.IoTDBDataRegionCollector;
@@ -53,7 +55,10 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
 
   private final PipeCollector pipeCollector;
 
-  public PipeTaskCollectorStage(String dataRegionId, PipeParameters 
collectorParameters) {
+  public PipeTaskCollectorStage(
+      TConsensusGroupId dataRegionId,
+      PipeTaskMeta pipeTaskMeta,
+      PipeParameters collectorParameters) {
     // TODO: avoid if-else, use reflection to create collector all the time
     if (collectorParameters
         .getStringOrDefault(
@@ -69,10 +74,10 @@ public class PipeTaskCollectorStage extends PipeTaskStage {
       // collector
       this.collectorParameters
           .getAttribute()
-          .put(PipeCollectorConstant.DATA_REGION_KEY, dataRegionId);
+          .put(PipeCollectorConstant.DATA_REGION_KEY, 
String.valueOf(dataRegionId.getId()));
 
       collectorPendingQueue = new ListenableUnblockingPendingQueue<>();
-      this.pipeCollector = new IoTDBDataRegionCollector(collectorPendingQueue);
+      this.pipeCollector = new IoTDBDataRegionCollector(pipeTaskMeta, 
collectorPendingQueue);
     } else {
       this.collectorParameters = collectorParameters;
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index 6bbec4480c0..5542b943d59 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.task.stage;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.core.event.view.collector.PipeEventCollector;
@@ -60,7 +61,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
    */
   public PipeTaskProcessorStage(
       String pipeName,
-      String dataRegionId,
+      TConsensusGroupId dataRegionId,
       EventSupplier pipeCollectorInputEventSupplier,
       @Nullable ListenablePendingQueue<Event> pipeCollectorInputPendingQueue,
       PipeParameters pipeProcessorParameters,
diff --git 
a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceConsensusIndexTest.java
 
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceConsensusIndexTest.java
new file mode 100644
index 00000000000..f1a676b9c24
--- /dev/null
+++ 
b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileResourceConsensusIndexTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.storagegroup;
+
+import org.apache.iotdb.commons.consensus.index.ConsensusIndex;
+import org.apache.iotdb.db.constant.TestConstant;
+import org.apache.iotdb.db.engine.storagegroup.timeindex.DeviceTimeIndex;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+public class TsFileResourceConsensusIndexTest {
+  private final File file =
+      new File(
+          
TsFileNameGenerator.generateNewTsFilePath(TestConstant.BASE_OUTPUT_PATH, 1, 1, 
1, 1));
+  private final TsFileResource tsFileResource = new TsFileResource(file);
+  private final Map<String, Integer> deviceToIndex = new HashMap<>();
+  private final long[] startTimes = new long[DEVICE_NUM];
+  private final long[] endTimes = new long[DEVICE_NUM];
+  private static final int DEVICE_NUM = 100;
+
+  private final List<ConsensusIndex> indexList = new ArrayList<>();
+  private static final int INDEX_NUM = 1000;
+
+  @Before
+  public void setUp() {
+    IntStream.range(0, DEVICE_NUM).forEach(i -> deviceToIndex.put("root.sg.d" 
+ i, i));
+    DeviceTimeIndex deviceTimeIndex = new DeviceTimeIndex(deviceToIndex, 
startTimes, endTimes);
+    IntStream.range(0, DEVICE_NUM)
+        .forEach(
+            i -> {
+              deviceTimeIndex.updateStartTime("root.sg.d" + i, i);
+              deviceTimeIndex.updateEndTime("root.sg.d" + i, i + 1);
+            });
+    tsFileResource.setTimeIndex(deviceTimeIndex);
+    tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+
+    IntStream.range(0, INDEX_NUM).forEach(i -> indexList.add(new 
MockConsensusIndex(i)));
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    // clean fake file
+    if (file.exists()) {
+      FileUtils.delete(file);
+    }
+    File resourceFile = new File(file.getName() + 
TsFileResource.RESOURCE_SUFFIX);
+    if (resourceFile.exists()) {
+      FileUtils.delete(resourceFile);
+    }
+  }
+
+  @Test
+  public void testConsensusIndexRecorder() {
+    Assert.assertTrue(
+        new 
MockConsensusIndex(0).isAfter(tsFileResource.getMaxConsensusIndexAfterClose()));
+
+    indexList.forEach(tsFileResource::updateConsensusIndex);
+
+    Assert.assertFalse(
+        new 
MockConsensusIndex(-1).isAfter(tsFileResource.getMaxConsensusIndexAfterClose()));
+    Assert.assertFalse(
+        new 
MockConsensusIndex(0).isAfter(tsFileResource.getMaxConsensusIndexAfterClose()));
+    Assert.assertFalse(
+        new 
MockConsensusIndex(1).isAfter(tsFileResource.getMaxConsensusIndexAfterClose()));
+    Assert.assertFalse(
+        new MockConsensusIndex(INDEX_NUM - 1)
+            .isAfter(tsFileResource.getMaxConsensusIndexAfterClose()));
+
+    Assert.assertTrue(
+        new 
MockConsensusIndex(INDEX_NUM).isAfter(tsFileResource.getMaxConsensusIndexAfterClose()));
+    Assert.assertTrue(
+        new MockConsensusIndex(Integer.MAX_VALUE)
+            .isAfter(tsFileResource.getMaxConsensusIndexAfterClose()));
+
+    Assert.assertFalse(
+        new MockConsensusIndex(1, INDEX_NUM - 1)
+            .isAfter(tsFileResource.getMaxConsensusIndexAfterClose()));
+  }
+
+  @Test
+  public void testConsensusIndexRecorderSerialize() {
+    // TODO: wait for implements of ConsensusIndex.deserializeFrom
+  }
+
+  /**
+   * Minimal {@link ConsensusIndex} used by this test. An index is a {@code (type, val)} pair and
+   * two mock indexes are only ordered against each other when their types match.
+   */
+  public static class MockConsensusIndex implements ConsensusIndex {
+    private final int type;
+    private int val;
+
+    /** Create an index of type 0 with the given value. */
+    public MockConsensusIndex(int val) {
+      this(0, val);
+    }
+
+    public MockConsensusIndex(int type, int val) {
+      this.type = type;
+      this.val = val;
+    }
+
+    /** Writes {@code val} only; {@code type} is not serialized. */
+    @Override
+    public void serialize(ByteBuffer byteBuffer) {
+      ReadWriteIOUtils.write(val, byteBuffer);
+    }
+
+    /** Writes {@code val} only; {@code type} is not serialized. */
+    @Override
+    public void serialize(OutputStream stream) throws IOException {
+      ReadWriteIOUtils.write(val, stream);
+    }
+
+    /**
+     * @return true if the argument is not a mock index, or if it has the same type and a strictly
+     *     smaller value than this index
+     */
+    @Override
+    public boolean isAfter(ConsensusIndex consensusIndex) {
+      if (consensusIndex instanceof MockConsensusIndex) {
+        final MockConsensusIndex other = (MockConsensusIndex) consensusIndex;
+        return type == other.type && val > other.val;
+      }
+      // Any non-mock index is considered to be before this one.
+      return true;
+    }
+
+    /** @return true only for a mock index with the same type and the same value */
+    @Override
+    public boolean equals(ConsensusIndex consensusIndex) {
+      if (consensusIndex instanceof MockConsensusIndex) {
+        final MockConsensusIndex other = (MockConsensusIndex) consensusIndex;
+        return type == other.type && val == other.val;
+      }
+      return false;
+    }
+
+    /**
+     * Raise this index in place to the larger of the two values when the types match; an index of
+     * another type leaves this one untouched.
+     *
+     * @return this instance
+     * @throws IllegalStateException if the argument is not a mock index
+     */
+    @Override
+    public ConsensusIndex updateToMaximum(ConsensusIndex consensusIndex) {
+      if (!(consensusIndex instanceof MockConsensusIndex)) {
+        throw new IllegalStateException("Mock update error.");
+      }
+
+      final MockConsensusIndex other = (MockConsensusIndex) consensusIndex;
+      if (other.type == type && other.val > val) {
+        val = other.val;
+      }
+      return this;
+    }
+  }
+}
diff --git 
a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
 
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
index d833e2761a0..198761eee3b 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/CachedSchemaPatternMatcherTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.core.collector;
 
+import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.config.PipeCollectorConstant;
 import 
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
 import 
org.apache.iotdb.db.pipe.core.collector.realtime.matcher.CachedSchemaPatternMatcher;
@@ -64,7 +65,8 @@ public class CachedSchemaPatternMatcherTest {
 
   @Test
   public void testCachedMatcher() throws ExecutionException, 
InterruptedException {
-    PipeRealtimeDataRegionCollector databaseCollector = new 
PipeRealtimeDataRegionFakeCollector();
+    PipeRealtimeDataRegionCollector databaseCollector =
+        new PipeRealtimeDataRegionFakeCollector(null);
     databaseCollector.customize(
         new PipeParameters(
             new HashMap<String, String>() {
@@ -79,7 +81,8 @@ public class CachedSchemaPatternMatcherTest {
     int deviceCollectorNum = 10;
     int seriesCollectorNum = 10;
     for (int i = 0; i < deviceCollectorNum; i++) {
-      PipeRealtimeDataRegionCollector deviceCollector = new 
PipeRealtimeDataRegionFakeCollector();
+      PipeRealtimeDataRegionCollector deviceCollector =
+          new PipeRealtimeDataRegionFakeCollector(null);
       int finalI1 = i;
       deviceCollector.customize(
           new PipeParameters(
@@ -92,7 +95,8 @@ public class CachedSchemaPatternMatcherTest {
           null);
       collectorList.add(deviceCollector);
       for (int j = 0; j < seriesCollectorNum; j++) {
-        PipeRealtimeDataRegionCollector seriesCollector = new 
PipeRealtimeDataRegionFakeCollector();
+        PipeRealtimeDataRegionCollector seriesCollector =
+            new PipeRealtimeDataRegionFakeCollector(null);
         int finalI = i;
         int finalJ = j;
         seriesCollector.customize(
@@ -149,6 +153,10 @@ public class CachedSchemaPatternMatcherTest {
 
   public static class PipeRealtimeDataRegionFakeCollector extends 
PipeRealtimeDataRegionCollector {
 
+    public PipeRealtimeDataRegionFakeCollector(PipeTaskMeta pipeTaskMeta) {
+      super(pipeTaskMeta);
+    }
+
     @Override
     public Event supply() {
       return null;
diff --git 
a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
 
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
index 7cd705af588..09ee2a13a8b 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/pipe/core/collector/PipeRealtimeCollectTest.java
@@ -99,13 +99,17 @@ public class PipeRealtimeCollectTest {
     // set up realtime collector
 
     try (PipeRealtimeDataRegionHybridCollector collector1 =
-            new PipeRealtimeDataRegionHybridCollector(new 
ListenableUnblockingPendingQueue<>());
+            new PipeRealtimeDataRegionHybridCollector(
+                null, new ListenableUnblockingPendingQueue<>());
         PipeRealtimeDataRegionHybridCollector collector2 =
-            new PipeRealtimeDataRegionHybridCollector(new 
ListenableUnblockingPendingQueue<>());
+            new PipeRealtimeDataRegionHybridCollector(
+                null, new ListenableUnblockingPendingQueue<>());
         PipeRealtimeDataRegionHybridCollector collector3 =
-            new PipeRealtimeDataRegionHybridCollector(new 
ListenableUnblockingPendingQueue<>());
+            new PipeRealtimeDataRegionHybridCollector(
+                null, new ListenableUnblockingPendingQueue<>());
         PipeRealtimeDataRegionHybridCollector collector4 =
-            new PipeRealtimeDataRegionHybridCollector(new 
ListenableUnblockingPendingQueue<>())) {
+            new PipeRealtimeDataRegionHybridCollector(
+                null, new ListenableUnblockingPendingQueue<>())) {
 
       collector1.customize(
           new PipeParameters(


Reply via email to