This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a6897264a66 IoTConsensusV2: Support Table Model Replicate (#14169)
a6897264a66 is described below

commit a6897264a666a4240ac65e63f1a53e042aea66af
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Nov 22 22:09:30 2024 +0800

    IoTConsensusV2: Support Table Model Replicate (#14169)
---
 .../pipe/consensuspipe/ConsensusPipeManager.java   |  6 ++
 .../request/PipeConsensusDeleteNodeReq.java        |  7 +-
 .../pipe/consensus/deletion/DeletionResource.java  | 15 ++--
 .../deletion/DeletionResourceManager.java          |  8 +--
 .../common/deletion/PipeDeleteDataNodeEvent.java   |  9 +--
 .../event/realtime/PipeRealtimeEventFactory.java   |  4 +-
 .../listener/PipeInsertionDataNodeListener.java    | 17 +----
 .../pipeconsensus/PipeConsensusReceiver.java       |  4 +-
 .../plan/node/pipe/PipeEnrichedDeleteDataNode.java |  2 +
 .../plan/node/write/AbstractDeleteDataNode.java    | 81 ++++++++++++++++++++++
 .../planner/plan/node/write/DeleteDataNode.java    | 36 +---------
 .../planner/plan/node/write/DeleteNodeType.java    | 68 ++++++++++++++++++
 .../plan/node/write/RelationalDeleteDataNode.java  | 40 ++---------
 .../db/pipe/consensus/DeletionRecoverTest.java     | 48 +++++++++----
 .../db/pipe/consensus/DeletionResourceTest.java    | 70 ++++++++++++++++---
 .../node/write/RelationalDeleteDataNodeTest.java   |  2 +-
 16 files changed, 292 insertions(+), 125 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
index f4bb5ca8a35..aed0155e9c3 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
@@ -37,6 +37,8 @@ import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CAPTURE_TABLE_KEY;
+import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CAPTURE_TREE_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_GROUP_ID_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_RESTORE_PROGRESS_PIPE_TASK_NAME_KEY;
@@ -103,6 +105,8 @@ public class ConsensusPipeManager {
                   EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY,
                   String.valueOf(consensusPipeName.getReceiverDataNodeId()))
               .put(EXTRACTOR_REALTIME_MODE_KEY, replicateMode.getValue())
+              .put(EXTRACTOR_CAPTURE_TABLE_KEY, String.valueOf(true))
+              .put(EXTRACTOR_CAPTURE_TREE_KEY, String.valueOf(true))
               .build();
     } else {
       extractorParams =
@@ -119,6 +123,8 @@ public class ConsensusPipeManager {
                   EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY,
                   String.valueOf(consensusPipeName.getReceiverDataNodeId()))
               .put(EXTRACTOR_REALTIME_MODE_KEY, replicateMode.getValue())
+              .put(EXTRACTOR_CAPTURE_TABLE_KEY, String.valueOf(true))
+              .put(EXTRACTOR_CAPTURE_TREE_KEY, String.valueOf(true))
               .put(
                   EXTRACTOR_CONSENSUS_RESTORE_PROGRESS_PIPE_TASK_NAME_KEY,
                   String.valueOf(new ConsensusPipeName(senderPeer, 
regionMigrationCoordinatorPeer)))
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/request/PipeConsensusDeleteNodeReq.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/request/PipeConsensusDeleteNodeReq.java
index dab135f9282..c8bc33bb52a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/request/PipeConsensusDeleteNodeReq.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/request/PipeConsensusDeleteNodeReq.java
@@ -26,6 +26,7 @@ import 
org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.request.Pip
 import org.apache.iotdb.consensus.pipe.thrift.TCommitId;
 import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 
 import org.apache.tsfile.utils.PublicBAOS;
@@ -39,20 +40,20 @@ import java.util.Objects;
 
 public class PipeConsensusDeleteNodeReq extends TPipeConsensusTransferReq {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeConsensusDeleteNodeReq.class);
-  private transient DeleteDataNode deleteDataNode;
+  private transient AbstractDeleteDataNode deleteDataNode;
 
   private PipeConsensusDeleteNodeReq() {
     // Do nothing
   }
 
-  public DeleteDataNode getDeleteDataNode() {
+  public AbstractDeleteDataNode getDeleteDataNode() {
     return deleteDataNode;
   }
 
   /////////////////////////////// Thrift ///////////////////////////////
 
   public static PipeConsensusDeleteNodeReq toTPipeConsensusTransferReq(
-      DeleteDataNode deleteDataNode,
+      AbstractDeleteDataNode deleteDataNode,
       TCommitId commitId,
       TConsensusGroupId consensusGroupId,
       ProgressIndex progressIndex,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java
index 2230f630322..afb34bfff6b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java
@@ -27,7 +27,8 @@ import 
org.apache.iotdb.commons.pipe.datastructure.PersistentResource;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteNodeType;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,7 +48,7 @@ public class DeletionResource implements PersistentResource {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(DeletionResource.class);
   private final Consumer<DeletionResource> removeHook;
   private final AtomicInteger pipeTaskReferenceCount;
-  private final DeleteDataNode deleteDataNode;
+  private final AbstractDeleteDataNode deleteDataNode;
   private final ConsensusGroupId consensusGroupId;
   private volatile Status currentStatus;
 
@@ -56,7 +57,9 @@ public class DeletionResource implements PersistentResource {
   private volatile Exception cause;
 
   public DeletionResource(
-      DeleteDataNode deleteDataNode, Consumer<DeletionResource> removeHook, 
String regionId) {
+      AbstractDeleteDataNode deleteDataNode,
+      Consumer<DeletionResource> removeHook,
+      String regionId) {
     this.deleteDataNode = deleteDataNode;
     this.removeHook = removeHook;
     this.currentStatus = Status.RUNNING;
@@ -135,7 +138,7 @@ public class DeletionResource implements PersistentResource 
{
     return 0;
   }
 
-  public DeleteDataNode getDeleteDataNode() {
+  public AbstractDeleteDataNode getDeleteDataNode() {
     return deleteDataNode;
   }
 
@@ -149,11 +152,11 @@ public class DeletionResource implements 
PersistentResource {
   public static DeletionResource deserialize(
       final ByteBuffer buffer, final String regionId, final 
Consumer<DeletionResource> removeHook)
       throws IOException {
-    DeleteDataNode node = DeleteDataNode.deserializeFromDAL(buffer);
+    AbstractDeleteDataNode node = DeleteNodeType.deserializeFromDAL(buffer);
     return new DeletionResource(node, removeHook, regionId);
   }
 
-  public static boolean isDeleteNodeGeneratedInLocalByIoTV2(DeleteDataNode 
node) {
+  public static boolean 
isDeleteNodeGeneratedInLocalByIoTV2(AbstractDeleteDataNode node) {
     if (node.getProgressIndex() instanceof RecoverProgressIndex) {
       RecoverProgressIndex recoverProgressIndex = (RecoverProgressIndex) 
node.getProgressIndex();
       return recoverProgressIndex
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java
index 8459b3d3f1e..0f6ff3c6315 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java
@@ -30,7 +30,7 @@ import 
org.apache.iotdb.db.pipe.consensus.ProgressIndexDataNodeManager;
 import org.apache.iotdb.db.pipe.consensus.deletion.persist.DeletionBuffer;
 import 
org.apache.iotdb.db.pipe.consensus.deletion.persist.PageCacheDeletionBuffer;
 import org.apache.iotdb.db.pipe.consensus.deletion.recover.DeletionReader;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
 
 import com.google.common.collect.ImmutableList;
 import org.slf4j.Logger;
@@ -64,7 +64,7 @@ public class DeletionResourceManager implements AutoCloseable 
{
   private final String dataRegionId;
   private final DeletionBuffer deletionBuffer;
   private final File storageDir;
-  private final Map<DeleteDataNode, DeletionResource> deleteNode2ResourcesMap =
+  private final Map<AbstractDeleteDataNode, DeletionResource> 
deleteNode2ResourcesMap =
       new ConcurrentHashMap<>();
   private final Lock recoverLock = new ReentrantLock();
   private final Condition recoveryReadyCondition = recoverLock.newCondition();
@@ -133,7 +133,7 @@ public class DeletionResourceManager implements 
AutoCloseable {
     LOGGER.info("Deletion resource manager for {} has been successfully 
closed!", dataRegionId);
   }
 
-  public DeletionResource registerDeletionResource(DeleteDataNode 
deleteDataNode) {
+  public DeletionResource registerDeletionResource(AbstractDeleteDataNode 
deleteDataNode) {
     DeletionResource deletionResource =
         deleteNode2ResourcesMap.computeIfAbsent(
             deleteDataNode,
@@ -144,7 +144,7 @@ public class DeletionResourceManager implements 
AutoCloseable {
     return deletionResource;
   }
 
-  public DeletionResource getDeletionResource(DeleteDataNode deleteDataNode) {
+  public DeletionResource getDeletionResource(AbstractDeleteDataNode 
deleteDataNode) {
     return deleteNode2ResourcesMap.get(deleteDataNode);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java
index 00a8259a900..27bcb3f9143 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.SerializableEvent;
 import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 
 import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -36,7 +37,7 @@ import java.nio.ByteBuffer;
 import java.util.Optional;
 
 public class PipeDeleteDataNodeEvent extends EnrichedEvent implements 
SerializableEvent {
-  private DeleteDataNode deleteDataNode;
+  private AbstractDeleteDataNode deleteDataNode;
   private DeletionResource deletionResource;
   private boolean isGeneratedByPipe;
   private ProgressIndex progressIndex;
@@ -47,12 +48,12 @@ public class PipeDeleteDataNodeEvent extends EnrichedEvent 
implements Serializab
   }
 
   public PipeDeleteDataNodeEvent(
-      final DeleteDataNode deleteDataNode, final boolean isGeneratedByPipe) {
+      final AbstractDeleteDataNode deleteDataNode, final boolean 
isGeneratedByPipe) {
     this(deleteDataNode, null, 0, null, null, null, isGeneratedByPipe);
   }
 
   public PipeDeleteDataNodeEvent(
-      final DeleteDataNode deleteDataNode,
+      final AbstractDeleteDataNode deleteDataNode,
       final String pipeName,
       final long creationTime,
       final PipeTaskMeta pipeTaskMeta,
@@ -73,7 +74,7 @@ public class PipeDeleteDataNodeEvent extends EnrichedEvent 
implements Serializab
         .ifPresent(node -> this.progressIndex = 
deleteDataNode.getProgressIndex());
   }
 
-  public DeleteDataNode getDeleteDataNode() {
+  public AbstractDeleteDataNode getDeleteDataNode() {
     return deleteDataNode;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
index 1e3716b6304..b7841150dbe 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
@@ -25,7 +25,7 @@ import 
org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpochManager;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
@@ -67,7 +67,7 @@ public class PipeRealtimeEventFactory {
         new PipeHeartbeatEvent(dataRegionId, shouldPrintMessage), null, null, 
null, null);
   }
 
-  public static PipeRealtimeEvent createRealtimeEvent(final DeleteDataNode 
node) {
+  public static PipeRealtimeEvent createRealtimeEvent(final 
AbstractDeleteDataNode node) {
     return new PipeRealtimeEvent(
         new PipeDeleteDataNodeEvent(node, node.isGeneratedByPipe()), null, 
null, null, null);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index 1a48a5267ed..b4c511bb44c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -26,9 +26,8 @@ import 
org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEventFactory;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeDataRegionAssigner;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
 
@@ -142,7 +141,8 @@ public class PipeInsertionDataNodeListener {
   }
 
   // TODO: record database name in enriched events?
-  public DeletionResource listenToDeleteData(final String regionId, final 
DeleteDataNode node) {
+  public DeletionResource listenToDeleteData(
+      final String regionId, final AbstractDeleteDataNode node) {
     final PipeDataRegionAssigner assigner = 
dataRegionId2Assigner.get(regionId);
     // only events from registered data region will be extracted
     if (assigner == null) {
@@ -169,12 +169,6 @@ public class PipeInsertionDataNodeListener {
     return deletionResource;
   }
 
-  public DeletionResource listenToDeleteData(
-      final String regionId, final RelationalDeleteDataNode node) {
-    // TODO: implement
-    return null;
-  }
-
   public void listenToHeartbeat(boolean shouldPrintMessage) {
     dataRegionId2Assigner.forEach(
         (key, value) ->
@@ -182,11 +176,6 @@ public class PipeInsertionDataNodeListener {
                 PipeRealtimeEventFactory.createRealtimeEvent(key, 
shouldPrintMessage)));
   }
 
-  public void listenToDeleteData(DeleteDataNode node) {
-    dataRegionId2Assigner.forEach(
-        (key, value) -> 
value.publishToAssign(PipeRealtimeEventFactory.createRealtimeEvent(node)));
-  }
-
   /////////////////////////////// singleton ///////////////////////////////
 
   private PipeInsertionDataNodeListener() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 1e1ae84f3a2..20164ff2293 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -50,7 +50,7 @@ import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request
 import 
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealWithModReq;
 import org.apache.iotdb.db.pipe.consensus.metric.PipeConsensusReceiverMetrics;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.aggregator.TsFileInsertionPointCounter;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
@@ -314,7 +314,7 @@ public class PipeConsensusReceiver {
     PipeConsensusServerImpl impl =
         Optional.ofNullable(pipeConsensus.getImpl(consensusGroupId))
             .orElseThrow(() -> new 
ConsensusGroupNotExistException(consensusGroupId));
-    final DeleteDataNode planNode = req.getDeleteDataNode();
+    final AbstractDeleteDataNode planNode = req.getDeleteDataNode();
     planNode.markAsGeneratedByRemoteConsensusLeader();
     planNode.setProgressIndex(
         
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex())));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
index 57450a3cd06..3c4043f2518 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
@@ -44,6 +44,8 @@ import java.util.stream.Collectors;
  *
  * <p>2.{@link DataExecutionVisitor}, to actually write data on data region 
and mark it as received
  * from pipe.
+ *
+ * <p>TODO: support relational deleteNode
  */
 public class PipeEnrichedDeleteDataNode extends DeleteDataNode {
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/AbstractDeleteDataNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/AbstractDeleteDataNode.java
new file mode 100644
index 00000000000..ff08fbd8dbd
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/AbstractDeleteDataNode.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+@SuppressWarnings({"java:S1854", "unused"})
+public abstract class AbstractDeleteDataNode extends SearchNode implements 
WALEntryValue {
+  protected TRegionReplicaSet regionReplicaSet;
+  protected ProgressIndex progressIndex;
+
+  protected AbstractDeleteDataNode(PlanNodeId id) {
+    super(id);
+  }
+
+  public abstract ByteBuffer serializeToDAL();
+
+  public static AbstractDeleteDataNode deserializeFromDAL(
+      ByteBuffer byteBuffer, boolean isRelational) {
+    if (isRelational) {
+      return RelationalDeleteDataNode.deserializeFromDAL(byteBuffer);
+    } else {
+      return DeleteDataNode.deserializeFromDAL(byteBuffer);
+    }
+  }
+
+  @Override
+  public ProgressIndex getProgressIndex() {
+    return progressIndex;
+  }
+
+  @Override
+  public void setProgressIndex(ProgressIndex progressIndex) {
+    this.progressIndex = progressIndex;
+  }
+
+  @Override
+  public List<PlanNode> getChildren() {
+    return new ArrayList<>();
+  }
+
+  @Override
+  public void addChild(PlanNode child) {
+    throw new UnsupportedOperationException("Not supported.");
+  }
+
+  @Override
+  public int allowedChildCount() {
+    return NO_CHILD_ALLOWED;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return null;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
index 1e73c13de5a..7374e44cbf3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
@@ -38,7 +38,6 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
-import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
 
 import org.apache.tsfile.read.filter.factory.TimeFilterApi;
@@ -60,7 +59,7 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
 
-public class DeleteDataNode extends SearchNode implements WALEntryValue {
+public class DeleteDataNode extends AbstractDeleteDataNode {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(DeleteDataNode.class);
 
   /** byte: type, integer: pathList.size(), long: deleteStartTime, 
deleteEndTime, searchIndex */
@@ -70,9 +69,6 @@ public class DeleteDataNode extends SearchNode implements 
WALEntryValue {
   private final long deleteStartTime;
   private final long deleteEndTime;
 
-  private TRegionReplicaSet regionReplicaSet;
-  private ProgressIndex progressIndex;
-
   public DeleteDataNode(
       PlanNodeId id, List<MeasurementPath> pathList, long deleteStartTime, 
long deleteEndTime) {
     super(id);
@@ -182,9 +178,11 @@ public class DeleteDataNode extends SearchNode implements 
WALEntryValue {
         planNodeId, pathList, deleteStartTime, deleteEndTime, 
deserializedIndex);
   }
 
+  @Override
   public ByteBuffer serializeToDAL() {
     try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
         DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      DeleteNodeType.TREE_DELETE_NODE.serialize(outputStream);
       serializeAttributes(outputStream);
       progressIndex.serialize(outputStream);
       id.serialize(outputStream);
@@ -209,24 +207,6 @@ public class DeleteDataNode extends SearchNode implements 
WALEntryValue {
     return deleteEndTime;
   }
 
-  @Override
-  public ProgressIndex getProgressIndex() {
-    return progressIndex;
-  }
-
-  @Override
-  public void setProgressIndex(ProgressIndex progressIndex) {
-    this.progressIndex = progressIndex;
-  }
-
-  @Override
-  public List<PlanNode> getChildren() {
-    return new ArrayList<>();
-  }
-
-  @Override
-  public void addChild(PlanNode child) {}
-
   @Override
   public PlanNodeType getType() {
     return PlanNodeType.DELETE_DATA;
@@ -237,16 +217,6 @@ public class DeleteDataNode extends SearchNode implements 
WALEntryValue {
     return new DeleteDataNode(getPlanNodeId(), pathList, deleteStartTime, 
deleteEndTime);
   }
 
-  @Override
-  public int allowedChildCount() {
-    return NO_CHILD_ALLOWED;
-  }
-
-  @Override
-  public List<String> getOutputColumnNames() {
-    return null;
-  }
-
   @Override
   public int serializedSize() {
     int size = FIXED_SERIALIZED_SIZE;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteNodeType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteNodeType.java
new file mode 100644
index 00000000000..a0942b03015
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteNodeType.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
+
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Similar to <code>PlanNodeType</code>, this class is used to provide 
serialization and
+ * deserialization methods for the Delete Ahead Log (DAL), which is used by 
IoTConsensusV2 and pipe.
+ */
+public enum DeleteNodeType {
+  TREE_DELETE_NODE((short) 1),
+  RELATIONAL_DELETE_NODE((short) 2),
+  ;
+
+  public static final int BYTES = Short.BYTES;
+
+  private final short nodeType;
+
+  DeleteNodeType(short nodeType) {
+    this.nodeType = nodeType;
+  }
+
+  public short getNodeType() {
+    return nodeType;
+  }
+
+  public void serialize(ByteBuffer buffer) {
+    ReadWriteIOUtils.write(nodeType, buffer);
+  }
+
+  public void serialize(DataOutputStream stream) throws IOException {
+    ReadWriteIOUtils.write(nodeType, stream);
+  }
+
+  public static AbstractDeleteDataNode deserializeFromDAL(ByteBuffer buffer) {
+    short nodeType = buffer.getShort();
+    switch (nodeType) {
+      case 1:
+        return DeleteDataNode.deserializeFromDAL(buffer);
+      case 2:
+        return RelationalDeleteDataNode.deserializeFromDAL(buffer);
+      default:
+        throw new IllegalArgumentException("Invalid node type: " + nodeType);
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java
index 6bd1db45273..3bda6d51443 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java
@@ -33,7 +33,6 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Delete;
 import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
-import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
 
 import org.apache.tsfile.utils.PublicBAOS;
 import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
@@ -53,7 +52,7 @@ import java.util.Objects;
 import java.util.stream.Collectors;
 
 @SuppressWarnings({"java:S1854", "unused"})
-public class RelationalDeleteDataNode extends SearchNode implements 
WALEntryValue {
+public class RelationalDeleteDataNode extends AbstractDeleteDataNode {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RelationalDeleteDataNode.class);
 
   /** byte: type */
@@ -63,9 +62,6 @@ public class RelationalDeleteDataNode extends SearchNode 
implements WALEntryValu
 
   private Collection<TRegionReplicaSet> replicaSets;
 
-  private TRegionReplicaSet regionReplicaSet;
-  private ProgressIndex progressIndex;
-
   public RelationalDeleteDataNode(PlanNodeId id, Delete delete) {
     super(id);
     this.modEntries = delete.getTableDeletionEntries();
@@ -165,9 +161,11 @@ public class RelationalDeleteDataNode extends SearchNode 
implements WALEntryValu
     return relationalDeleteDataNode;
   }
 
+  @Override
   public ByteBuffer serializeToDAL() {
     try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
         DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream)) {
+      DeleteNodeType.RELATIONAL_DELETE_NODE.serialize(outputStream);
       serializeAttributes(outputStream);
       progressIndex.serialize(outputStream);
       id.serialize(outputStream);
@@ -179,26 +177,6 @@ public class RelationalDeleteDataNode extends SearchNode 
implements WALEntryValu
     }
   }
 
-  @Override
-  public ProgressIndex getProgressIndex() {
-    return progressIndex;
-  }
-
-  @Override
-  public void setProgressIndex(ProgressIndex progressIndex) {
-    this.progressIndex = progressIndex;
-  }
-
-  @Override
-  public List<PlanNode> getChildren() {
-    return new ArrayList<>();
-  }
-
-  @Override
-  public void addChild(PlanNode child) {
-    throw new UnsupportedOperationException("Not supported.");
-  }
-
   @Override
   public PlanNodeType getType() {
     return PlanNodeType.RELATIONAL_DELETE_DATA;
@@ -210,16 +188,6 @@ public class RelationalDeleteDataNode extends SearchNode 
implements WALEntryValu
     return new RelationalDeleteDataNode(getPlanNodeId(), modEntries);
   }
 
-  @Override
-  public int allowedChildCount() {
-    return NO_CHILD_ALLOWED;
-  }
-
-  @Override
-  public List<String> getOutputColumnNames() {
-    return null;
-  }
-
   @Override
   public int serializedSize() {
     int size = FIXED_SERIALIZED_SIZE + 
ReadWriteForEncodingUtils.varIntSize(modEntries.size());
@@ -288,7 +256,7 @@ public class RelationalDeleteDataNode extends SearchNode 
implements WALEntryValu
 
   @Override
   public int hashCode() {
-    return Objects.hash(getPlanNodeId(), modEntries);
+    return Objects.hash(getPlanNodeId(), modEntries, progressIndex);
   }
 
   public String toString() {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java
index cf23f763d20..f2add749724 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java
@@ -26,29 +26,32 @@ import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.NOP;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
 
+import org.apache.tsfile.read.common.TimeRange;
 import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
 import java.util.Collections;
 
 public class DeletionRecoverTest {
-  private static final String FAKE_DATA_REGION_ID = "1";
+  private static final String[] FAKE_DATA_REGION_IDS = {"2", "3"};
   private static final int THIS_DATANODE_ID =
       IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
   private static final String DELETION_BASE_DIR =
       
IoTDBDescriptor.getInstance().getConfig().getIotConsensusV2DeletionFileDir();
-  private static final String BASE_PATH = DELETION_BASE_DIR + File.separator + 
FAKE_DATA_REGION_ID;
   private final int deletionCount = 10;
   private DeletionResourceManager deletionResourceManager;
 
-  @Before
-  public void setUp() throws Exception {
-    File baseDir = new File(BASE_PATH);
+  public void setUp(boolean isRelational, String FAKE_DATA_REGION_ID) throws 
Exception {
+    File baseDir = new File(DELETION_BASE_DIR + File.separator + 
FAKE_DATA_REGION_ID);
     if (baseDir.exists()) {
       FileUtils.deleteFileOrDirectory(baseDir);
     }
@@ -58,8 +61,18 @@ public class DeletionRecoverTest {
     int rebootTimes = 0;
     MeasurementPath path = new MeasurementPath("root.vehicle.d2.s0");
     for (int i = 0; i < deletionCount; i++) {
-      DeleteDataNode deleteDataNode =
-          new DeleteDataNode(new PlanNodeId("1"), 
Collections.singletonList(path), 50, 150);
+      AbstractDeleteDataNode deleteDataNode;
+      if (isRelational) {
+        deleteDataNode =
+            new RelationalDeleteDataNode(
+                new PlanNodeId("testPlan"),
+                Collections.singletonList(
+                    new TableDeletionEntry(
+                        new DeletionPredicate("table1", new NOP()), new 
TimeRange(0, 10))));
+      } else {
+        deleteDataNode =
+            new DeleteDataNode(new PlanNodeId("1"), 
Collections.singletonList(path), 50, 150);
+      }
       deleteDataNode.setProgressIndex(
           new RecoverProgressIndex(THIS_DATANODE_ID, new 
SimpleProgressIndex(rebootTimes, i)));
       deletionResourceManager.registerDeletionResource(deleteDataNode);
@@ -70,14 +83,25 @@ public class DeletionRecoverTest {
 
   @After
   public void tearDown() throws Exception {
-    File baseDir = new File(DELETION_BASE_DIR + File.separator + 
FAKE_DATA_REGION_ID);
-    if (baseDir.exists()) {
-      FileUtils.deleteFileOrDirectory(baseDir);
+    for (String FAKE_DATA_REGION_ID : FAKE_DATA_REGION_IDS) {
+      File baseDir = new File(DELETION_BASE_DIR + File.separator + 
FAKE_DATA_REGION_ID);
+      if (baseDir.exists()) {
+        FileUtils.deleteFileOrDirectory(baseDir);
+      }
     }
   }
 
   @Test
-  public void testDeletionRecover() throws Exception {
+  public void testDeletionRecoverTreeModel() throws Exception {
+    setUp(false, FAKE_DATA_REGION_IDS[0]);
+    Assert.assertEquals(0, 
deletionResourceManager.getAllDeletionResources().size());
+    deletionResourceManager.recoverForTest();
+    Assert.assertEquals(deletionCount, 
deletionResourceManager.getAllDeletionResources().size());
+  }
+
+  @Test
+  public void testDeletionRecoverTableModel() throws Exception {
+    setUp(true, FAKE_DATA_REGION_IDS[1]);
     Assert.assertEquals(0, 
deletionResourceManager.getAllDeletionResources().size());
     deletionResourceManager.recoverForTest();
     Assert.assertEquals(deletionCount, 
deletionResourceManager.getAllDeletionResources().size());
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
index 1b62d4b63a1..39ab2f8e889 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
@@ -36,9 +36,15 @@ import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRe
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.NOP;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
 
+import org.apache.tsfile.read.common.TimeRange;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -91,14 +97,30 @@ public class DeletionResourceTest {
 
   @Test
   public void testAddBatchDeletionResource()
+      throws IllegalPathException, IOException, InterruptedException {
+    addBatchDeletionResource(true);
+    addBatchDeletionResource(false);
+  }
+
+  public void addBatchDeletionResource(boolean isRelational)
       throws IllegalPathException, InterruptedException, IOException {
     deletionResourceManager = 
DeletionResourceManager.getInstance(FAKE_DATA_REGION_IDS[1]);
     int deletionCount = 10;
     int rebootTimes = 0;
     MeasurementPath path = new MeasurementPath("root.vehicle.d2.s0");
     for (int i = 0; i < deletionCount; i++) {
-      DeleteDataNode deleteDataNode =
-          new DeleteDataNode(new PlanNodeId("1"), 
Collections.singletonList(path), 50, 150);
+      AbstractDeleteDataNode deleteDataNode;
+      if (isRelational) {
+        deleteDataNode =
+            new RelationalDeleteDataNode(
+                new PlanNodeId("testPlan"),
+                Collections.singletonList(
+                    new TableDeletionEntry(
+                        new DeletionPredicate("table1", new NOP()), new 
TimeRange(0, 10))));
+      } else {
+        deleteDataNode =
+            new DeleteDataNode(new PlanNodeId("1"), 
Collections.singletonList(path), 50, 150);
+      }
       deleteDataNode.setProgressIndex(
           new RecoverProgressIndex(THIS_DATANODE_ID, new 
SimpleProgressIndex(rebootTimes, i)));
       deletionResourceManager.registerDeletionResource(deleteDataNode);
@@ -111,12 +133,28 @@ public class DeletionResourceTest {
 
   @Test
   public void testAddDeletionResourceTimeout()
+      throws IllegalPathException, IOException, InterruptedException {
+    addDeletionResourceTimeout(true);
+    addDeletionResourceTimeout(false);
+  }
+
+  public void addDeletionResourceTimeout(boolean isRelational)
       throws IllegalPathException, InterruptedException, IOException {
     deletionResourceManager = 
DeletionResourceManager.getInstance(FAKE_DATA_REGION_IDS[2]);
     int rebootTimes = 0;
     MeasurementPath path = new MeasurementPath("root.vehicle.d2.s0");
-    DeleteDataNode deleteDataNode =
-        new DeleteDataNode(new PlanNodeId("1"), 
Collections.singletonList(path), 50, 150);
+    AbstractDeleteDataNode deleteDataNode;
+    if (isRelational) {
+      deleteDataNode =
+          new RelationalDeleteDataNode(
+              new PlanNodeId("testPlan"),
+              Collections.singletonList(
+                  new TableDeletionEntry(
+                      new DeletionPredicate("table1", new NOP()), new 
TimeRange(0, 10))));
+    } else {
+      deleteDataNode =
+          new DeleteDataNode(new PlanNodeId("1"), 
Collections.singletonList(path), 50, 150);
+    }
     deleteDataNode.setProgressIndex(
         new RecoverProgressIndex(THIS_DATANODE_ID, new 
SimpleProgressIndex(rebootTimes, 1)));
     // Only register one deletionResource
@@ -132,15 +170,31 @@ public class DeletionResourceTest {
 
   @Test
   public void testDeletionRemove() throws IllegalPathException, 
InterruptedException, IOException {
+    deletionRemove(true);
+    deletionRemove(false);
+  }
+
+  public void deletionRemove(boolean isRelational)
+      throws IllegalPathException, InterruptedException, IOException {
     deletionResourceManager = 
DeletionResourceManager.getInstance(FAKE_DATA_REGION_IDS[3]);
     // new a deletion
     int rebootTimes = 0;
-    int deletionCount = 100;
+    int deletionCount = 20;
     MeasurementPath path = new MeasurementPath("root.vehicle.d2.s0");
     List<PipeDeleteDataNodeEvent> deletionEvents = new ArrayList<>();
     for (int i = 0; i < deletionCount; i++) {
-      DeleteDataNode deleteDataNode =
-          new DeleteDataNode(new PlanNodeId("1"), 
Collections.singletonList(path), 50, 150);
+      AbstractDeleteDataNode deleteDataNode;
+      if (isRelational) {
+        deleteDataNode =
+            new RelationalDeleteDataNode(
+                new PlanNodeId("testPlan"),
+                Collections.singletonList(
+                    new TableDeletionEntry(
+                        new DeletionPredicate("table1", new NOP()), new 
TimeRange(0, 10))));
+      } else {
+        deleteDataNode =
+            new DeleteDataNode(new PlanNodeId("1"), 
Collections.singletonList(path), 50, 150);
+      }
       deleteDataNode.setProgressIndex(
           new RecoverProgressIndex(THIS_DATANODE_ID, new 
SimpleProgressIndex(rebootTimes, i)));
       PipeDeleteDataNodeEvent deletionEvent = new 
PipeDeleteDataNodeEvent(deleteDataNode, true);
@@ -196,7 +250,7 @@ public class DeletionResourceTest {
     deletionResourceManager = 
DeletionResourceManager.getInstance(FAKE_DATA_REGION_IDS[4]);
     int rebootTimes = 0;
     MeasurementPath path = new MeasurementPath("root.vehicle.d2.s0");
-    DeleteDataNode deleteDataNode =
+    AbstractDeleteDataNode deleteDataNode =
         new DeleteDataNode(new PlanNodeId("1"), 
Collections.singletonList(path), 50, 150);
     deleteDataNode.setProgressIndex(
         new RecoverProgressIndex(THIS_DATANODE_ID, new 
SimpleProgressIndex(rebootTimes, 1)));
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNodeTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNodeTest.java
index 6a2220e4adb..7ba8d2c5544 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNodeTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNodeTest.java
@@ -96,7 +96,7 @@ public class RelationalDeleteDataNodeTest {
     assertEquals(relationalDeleteDataNode, planNode);
 
     buffer = relationalDeleteDataNode.serializeToDAL();
-    RelationalDeleteDataNode deserialized = 
RelationalDeleteDataNode.deserializeFromDAL(buffer);
+    AbstractDeleteDataNode deserialized = 
DeleteNodeType.deserializeFromDAL(buffer);
     // plan node id is not serialized to DAL, manually set it to pass 
comparison
     deserialized.setPlanNodeId(relationalDeleteDataNode.getPlanNodeId());
     assertEquals(relationalDeleteDataNode, deserialized);


Reply via email to