This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a6897264a66 IoTConsensusV2: Support Table Model Replicate (#14169)
a6897264a66 is described below
commit a6897264a666a4240ac65e63f1a53e042aea66af
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Nov 22 22:09:30 2024 +0800
IoTConsensusV2: Support Table Model Replicate (#14169)
---
.../pipe/consensuspipe/ConsensusPipeManager.java | 6 ++
.../request/PipeConsensusDeleteNodeReq.java | 7 +-
.../pipe/consensus/deletion/DeletionResource.java | 15 ++--
.../deletion/DeletionResourceManager.java | 8 +--
.../common/deletion/PipeDeleteDataNodeEvent.java | 9 +--
.../event/realtime/PipeRealtimeEventFactory.java | 4 +-
.../listener/PipeInsertionDataNodeListener.java | 17 +----
.../pipeconsensus/PipeConsensusReceiver.java | 4 +-
.../plan/node/pipe/PipeEnrichedDeleteDataNode.java | 2 +
.../plan/node/write/AbstractDeleteDataNode.java | 81 ++++++++++++++++++++++
.../planner/plan/node/write/DeleteDataNode.java | 36 +---------
.../planner/plan/node/write/DeleteNodeType.java | 68 ++++++++++++++++++
.../plan/node/write/RelationalDeleteDataNode.java | 40 ++---------
.../db/pipe/consensus/DeletionRecoverTest.java | 48 +++++++++----
.../db/pipe/consensus/DeletionResourceTest.java | 70 ++++++++++++++++---
.../node/write/RelationalDeleteDataNodeTest.java | 2 +-
16 files changed, 292 insertions(+), 125 deletions(-)
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
index f4bb5ca8a35..aed0155e9c3 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeManager.java
@@ -37,6 +37,8 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstan
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PORT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CAPTURE_TABLE_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CAPTURE_TREE_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_GROUP_ID_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeExtractorConstant.EXTRACTOR_CONSENSUS_RESTORE_PROGRESS_PIPE_TASK_NAME_KEY;
@@ -103,6 +105,8 @@ public class ConsensusPipeManager {
EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY,
String.valueOf(consensusPipeName.getReceiverDataNodeId()))
.put(EXTRACTOR_REALTIME_MODE_KEY, replicateMode.getValue())
+ .put(EXTRACTOR_CAPTURE_TABLE_KEY, String.valueOf(true))
+ .put(EXTRACTOR_CAPTURE_TREE_KEY, String.valueOf(true))
.build();
} else {
extractorParams =
@@ -119,6 +123,8 @@ public class ConsensusPipeManager {
EXTRACTOR_CONSENSUS_RECEIVER_DATANODE_ID_KEY,
String.valueOf(consensusPipeName.getReceiverDataNodeId()))
.put(EXTRACTOR_REALTIME_MODE_KEY, replicateMode.getValue())
+ .put(EXTRACTOR_CAPTURE_TABLE_KEY, String.valueOf(true))
+ .put(EXTRACTOR_CAPTURE_TREE_KEY, String.valueOf(true))
.put(
EXTRACTOR_CONSENSUS_RESTORE_PROGRESS_PIPE_TASK_NAME_KEY,
String.valueOf(new ConsensusPipeName(senderPeer,
regionMigrationCoordinatorPeer)))
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/request/PipeConsensusDeleteNodeReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/request/PipeConsensusDeleteNodeReq.java
index dab135f9282..c8bc33bb52a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/request/PipeConsensusDeleteNodeReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/payload/request/PipeConsensusDeleteNodeReq.java
@@ -26,6 +26,7 @@ import
org.apache.iotdb.commons.pipe.connector.payload.pipeconsensus.request.Pip
import org.apache.iotdb.consensus.pipe.thrift.TCommitId;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.tsfile.utils.PublicBAOS;
@@ -39,20 +40,20 @@ import java.util.Objects;
public class PipeConsensusDeleteNodeReq extends TPipeConsensusTransferReq {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConsensusDeleteNodeReq.class);
- private transient DeleteDataNode deleteDataNode;
+ private transient AbstractDeleteDataNode deleteDataNode;
private PipeConsensusDeleteNodeReq() {
// Do nothing
}
- public DeleteDataNode getDeleteDataNode() {
+ public AbstractDeleteDataNode getDeleteDataNode() {
return deleteDataNode;
}
/////////////////////////////// Thrift ///////////////////////////////
public static PipeConsensusDeleteNodeReq toTPipeConsensusTransferReq(
- DeleteDataNode deleteDataNode,
+ AbstractDeleteDataNode deleteDataNode,
TCommitId commitId,
TConsensusGroupId consensusGroupId,
ProgressIndex progressIndex,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java
index 2230f630322..afb34bfff6b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResource.java
@@ -27,7 +27,8 @@ import
org.apache.iotdb.commons.pipe.datastructure.PersistentResource;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteNodeType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +48,7 @@ public class DeletionResource implements PersistentResource {
private static final Logger LOGGER =
LoggerFactory.getLogger(DeletionResource.class);
private final Consumer<DeletionResource> removeHook;
private final AtomicInteger pipeTaskReferenceCount;
- private final DeleteDataNode deleteDataNode;
+ private final AbstractDeleteDataNode deleteDataNode;
private final ConsensusGroupId consensusGroupId;
private volatile Status currentStatus;
@@ -56,7 +57,9 @@ public class DeletionResource implements PersistentResource {
private volatile Exception cause;
public DeletionResource(
- DeleteDataNode deleteDataNode, Consumer<DeletionResource> removeHook,
String regionId) {
+ AbstractDeleteDataNode deleteDataNode,
+ Consumer<DeletionResource> removeHook,
+ String regionId) {
this.deleteDataNode = deleteDataNode;
this.removeHook = removeHook;
this.currentStatus = Status.RUNNING;
@@ -135,7 +138,7 @@ public class DeletionResource implements PersistentResource
{
return 0;
}
- public DeleteDataNode getDeleteDataNode() {
+ public AbstractDeleteDataNode getDeleteDataNode() {
return deleteDataNode;
}
@@ -149,11 +152,11 @@ public class DeletionResource implements
PersistentResource {
public static DeletionResource deserialize(
final ByteBuffer buffer, final String regionId, final
Consumer<DeletionResource> removeHook)
throws IOException {
- DeleteDataNode node = DeleteDataNode.deserializeFromDAL(buffer);
+ AbstractDeleteDataNode node = DeleteNodeType.deserializeFromDAL(buffer);
return new DeletionResource(node, removeHook, regionId);
}
- public static boolean isDeleteNodeGeneratedInLocalByIoTV2(DeleteDataNode
node) {
+ public static boolean
isDeleteNodeGeneratedInLocalByIoTV2(AbstractDeleteDataNode node) {
if (node.getProgressIndex() instanceof RecoverProgressIndex) {
RecoverProgressIndex recoverProgressIndex = (RecoverProgressIndex)
node.getProgressIndex();
return recoverProgressIndex
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java
index 8459b3d3f1e..0f6ff3c6315 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/DeletionResourceManager.java
@@ -30,7 +30,7 @@ import
org.apache.iotdb.db.pipe.consensus.ProgressIndexDataNodeManager;
import org.apache.iotdb.db.pipe.consensus.deletion.persist.DeletionBuffer;
import
org.apache.iotdb.db.pipe.consensus.deletion.persist.PageCacheDeletionBuffer;
import org.apache.iotdb.db.pipe.consensus.deletion.recover.DeletionReader;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
@@ -64,7 +64,7 @@ public class DeletionResourceManager implements AutoCloseable
{
private final String dataRegionId;
private final DeletionBuffer deletionBuffer;
private final File storageDir;
- private final Map<DeleteDataNode, DeletionResource> deleteNode2ResourcesMap =
+ private final Map<AbstractDeleteDataNode, DeletionResource>
deleteNode2ResourcesMap =
new ConcurrentHashMap<>();
private final Lock recoverLock = new ReentrantLock();
private final Condition recoveryReadyCondition = recoverLock.newCondition();
@@ -133,7 +133,7 @@ public class DeletionResourceManager implements
AutoCloseable {
LOGGER.info("Deletion resource manager for {} has been successfully
closed!", dataRegionId);
}
- public DeletionResource registerDeletionResource(DeleteDataNode
deleteDataNode) {
+ public DeletionResource registerDeletionResource(AbstractDeleteDataNode
deleteDataNode) {
DeletionResource deletionResource =
deleteNode2ResourcesMap.computeIfAbsent(
deleteDataNode,
@@ -144,7 +144,7 @@ public class DeletionResourceManager implements
AutoCloseable {
return deletionResource;
}
- public DeletionResource getDeletionResource(DeleteDataNode deleteDataNode) {
+ public DeletionResource getDeletionResource(AbstractDeleteDataNode
deleteDataNode) {
return deleteNode2ResourcesMap.get(deleteDataNode);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java
index 00a8259a900..27bcb3f9143 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/deletion/PipeDeleteDataNodeEvent.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.SerializableEvent;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.tsfile.utils.ReadWriteIOUtils;
@@ -36,7 +37,7 @@ import java.nio.ByteBuffer;
import java.util.Optional;
public class PipeDeleteDataNodeEvent extends EnrichedEvent implements
SerializableEvent {
- private DeleteDataNode deleteDataNode;
+ private AbstractDeleteDataNode deleteDataNode;
private DeletionResource deletionResource;
private boolean isGeneratedByPipe;
private ProgressIndex progressIndex;
@@ -47,12 +48,12 @@ public class PipeDeleteDataNodeEvent extends EnrichedEvent
implements Serializab
}
public PipeDeleteDataNodeEvent(
- final DeleteDataNode deleteDataNode, final boolean isGeneratedByPipe) {
+ final AbstractDeleteDataNode deleteDataNode, final boolean
isGeneratedByPipe) {
this(deleteDataNode, null, 0, null, null, null, isGeneratedByPipe);
}
public PipeDeleteDataNodeEvent(
- final DeleteDataNode deleteDataNode,
+ final AbstractDeleteDataNode deleteDataNode,
final String pipeName,
final long creationTime,
final PipeTaskMeta pipeTaskMeta,
@@ -73,7 +74,7 @@ public class PipeDeleteDataNodeEvent extends EnrichedEvent
implements Serializab
.ifPresent(node -> this.progressIndex =
deleteDataNode.getProgressIndex());
}
- public DeleteDataNode getDeleteDataNode() {
+ public AbstractDeleteDataNode getDeleteDataNode() {
return deleteDataNode;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
index 1e3716b6304..b7841150dbe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
@@ -25,7 +25,7 @@ import
org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpochManager;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
@@ -67,7 +67,7 @@ public class PipeRealtimeEventFactory {
new PipeHeartbeatEvent(dataRegionId, shouldPrintMessage), null, null,
null, null);
}
- public static PipeRealtimeEvent createRealtimeEvent(final DeleteDataNode
node) {
+ public static PipeRealtimeEvent createRealtimeEvent(final
AbstractDeleteDataNode node) {
return new PipeRealtimeEvent(
new PipeDeleteDataNodeEvent(node, node.isGeneratedByPipe()), null,
null, null, null);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index 1a48a5267ed..b4c511bb44c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -26,9 +26,8 @@ import
org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEventFactory;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeDataRegionAssigner;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
@@ -142,7 +141,8 @@ public class PipeInsertionDataNodeListener {
}
// TODO: record database name in enriched events?
- public DeletionResource listenToDeleteData(final String regionId, final
DeleteDataNode node) {
+ public DeletionResource listenToDeleteData(
+ final String regionId, final AbstractDeleteDataNode node) {
final PipeDataRegionAssigner assigner =
dataRegionId2Assigner.get(regionId);
// only events from registered data region will be extracted
if (assigner == null) {
@@ -169,12 +169,6 @@ public class PipeInsertionDataNodeListener {
return deletionResource;
}
- public DeletionResource listenToDeleteData(
- final String regionId, final RelationalDeleteDataNode node) {
- // TODO: implement
- return null;
- }
-
public void listenToHeartbeat(boolean shouldPrintMessage) {
dataRegionId2Assigner.forEach(
(key, value) ->
@@ -182,11 +176,6 @@ public class PipeInsertionDataNodeListener {
PipeRealtimeEventFactory.createRealtimeEvent(key,
shouldPrintMessage)));
}
- public void listenToDeleteData(DeleteDataNode node) {
- dataRegionId2Assigner.forEach(
- (key, value) ->
value.publishToAssign(PipeRealtimeEventFactory.createRealtimeEvent(node)));
- }
-
/////////////////////////////// singleton ///////////////////////////////
private PipeInsertionDataNodeListener() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 1e1ae84f3a2..20164ff2293 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -50,7 +50,7 @@ import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTsFileSealWithModReq;
import org.apache.iotdb.db.pipe.consensus.metric.PipeConsensusReceiverMetrics;
import
org.apache.iotdb.db.pipe.event.common.tsfile.aggregator.TsFileInsertionPointCounter;
-import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
@@ -314,7 +314,7 @@ public class PipeConsensusReceiver {
PipeConsensusServerImpl impl =
Optional.ofNullable(pipeConsensus.getImpl(consensusGroupId))
.orElseThrow(() -> new
ConsensusGroupNotExistException(consensusGroupId));
- final DeleteDataNode planNode = req.getDeleteDataNode();
+ final AbstractDeleteDataNode planNode = req.getDeleteDataNode();
planNode.markAsGeneratedByRemoteConsensusLeader();
planNode.setProgressIndex(
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex())));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
index 57450a3cd06..3c4043f2518 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedDeleteDataNode.java
@@ -44,6 +44,8 @@ import java.util.stream.Collectors;
*
* <p>2.{@link DataExecutionVisitor}, to actually write data on data region
and mark it as received
* from pipe.
+ *
+ * <p>TODO: support relational deleteNode
*/
public class PipeEnrichedDeleteDataNode extends DeleteDataNode {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/AbstractDeleteDataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/AbstractDeleteDataNode.java
new file mode 100644
index 00000000000..ff08fbd8dbd
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/AbstractDeleteDataNode.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
+
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+@SuppressWarnings({"java:S1854", "unused"})
+public abstract class AbstractDeleteDataNode extends SearchNode implements
WALEntryValue {
+ protected TRegionReplicaSet regionReplicaSet;
+ protected ProgressIndex progressIndex;
+
+ protected AbstractDeleteDataNode(PlanNodeId id) {
+ super(id);
+ }
+
+ public abstract ByteBuffer serializeToDAL();
+
+ public static AbstractDeleteDataNode deserializeFromDAL(
+ ByteBuffer byteBuffer, boolean isRelational) {
+ if (isRelational) {
+ return RelationalDeleteDataNode.deserializeFromDAL(byteBuffer);
+ } else {
+ return DeleteDataNode.deserializeFromDAL(byteBuffer);
+ }
+ }
+
+ @Override
+ public ProgressIndex getProgressIndex() {
+ return progressIndex;
+ }
+
+ @Override
+ public void setProgressIndex(ProgressIndex progressIndex) {
+ this.progressIndex = progressIndex;
+ }
+
+ @Override
+ public List<PlanNode> getChildren() {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public void addChild(PlanNode child) {
+ throw new UnsupportedOperationException("Not supported.");
+ }
+
+ @Override
+ public int allowedChildCount() {
+ return NO_CHILD_ALLOWED;
+ }
+
+ @Override
+ public List<String> getOutputColumnNames() {
+ return null;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
index 1e73c13de5a..7374e44cbf3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
@@ -38,7 +38,6 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
-import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
import org.apache.tsfile.read.filter.factory.TimeFilterApi;
@@ -60,7 +59,7 @@ import java.util.stream.Collectors;
import static
org.apache.iotdb.commons.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
-public class DeleteDataNode extends SearchNode implements WALEntryValue {
+public class DeleteDataNode extends AbstractDeleteDataNode {
private static final Logger LOGGER =
LoggerFactory.getLogger(DeleteDataNode.class);
/** byte: type, integer: pathList.size(), long: deleteStartTime,
deleteEndTime, searchIndex */
@@ -70,9 +69,6 @@ public class DeleteDataNode extends SearchNode implements
WALEntryValue {
private final long deleteStartTime;
private final long deleteEndTime;
- private TRegionReplicaSet regionReplicaSet;
- private ProgressIndex progressIndex;
-
public DeleteDataNode(
PlanNodeId id, List<MeasurementPath> pathList, long deleteStartTime,
long deleteEndTime) {
super(id);
@@ -182,9 +178,11 @@ public class DeleteDataNode extends SearchNode implements
WALEntryValue {
planNodeId, pathList, deleteStartTime, deleteEndTime,
deserializedIndex);
}
+ @Override
public ByteBuffer serializeToDAL() {
try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ DeleteNodeType.TREE_DELETE_NODE.serialize(outputStream);
serializeAttributes(outputStream);
progressIndex.serialize(outputStream);
id.serialize(outputStream);
@@ -209,24 +207,6 @@ public class DeleteDataNode extends SearchNode implements
WALEntryValue {
return deleteEndTime;
}
- @Override
- public ProgressIndex getProgressIndex() {
- return progressIndex;
- }
-
- @Override
- public void setProgressIndex(ProgressIndex progressIndex) {
- this.progressIndex = progressIndex;
- }
-
- @Override
- public List<PlanNode> getChildren() {
- return new ArrayList<>();
- }
-
- @Override
- public void addChild(PlanNode child) {}
-
@Override
public PlanNodeType getType() {
return PlanNodeType.DELETE_DATA;
@@ -237,16 +217,6 @@ public class DeleteDataNode extends SearchNode implements
WALEntryValue {
return new DeleteDataNode(getPlanNodeId(), pathList, deleteStartTime,
deleteEndTime);
}
- @Override
- public int allowedChildCount() {
- return NO_CHILD_ALLOWED;
- }
-
- @Override
- public List<String> getOutputColumnNames() {
- return null;
- }
-
@Override
public int serializedSize() {
int size = FIXED_SERIALIZED_SIZE;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteNodeType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteNodeType.java
new file mode 100644
index 00000000000..a0942b03015
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteNodeType.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write;
+
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Similar to <code>PlanNodeType</code>, this class is used to provide
serialization and
+ * deserialization methods for the Delete Ahead Log (DAL), which is used by
IoTConsensusV2 and pipe.
+ */
+public enum DeleteNodeType {
+ TREE_DELETE_NODE((short) 1),
+ RELATIONAL_DELETE_NODE((short) 2),
+ ;
+
+ public static final int BYTES = Short.BYTES;
+
+ private final short nodeType;
+
+ DeleteNodeType(short nodeType) {
+ this.nodeType = nodeType;
+ }
+
+ public short getNodeType() {
+ return nodeType;
+ }
+
+ public void serialize(ByteBuffer buffer) {
+ ReadWriteIOUtils.write(nodeType, buffer);
+ }
+
+ public void serialize(DataOutputStream stream) throws IOException {
+ ReadWriteIOUtils.write(nodeType, stream);
+ }
+
+ public static AbstractDeleteDataNode deserializeFromDAL(ByteBuffer buffer) {
+ short nodeType = buffer.getShort();
+ switch (nodeType) {
+ case 1:
+ return DeleteDataNode.deserializeFromDAL(buffer);
+ case 2:
+ return RelationalDeleteDataNode.deserializeFromDAL(buffer);
+ default:
+ throw new IllegalArgumentException("Invalid node type: " + nodeType);
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java
index 6bd1db45273..3bda6d51443 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNode.java
@@ -33,7 +33,6 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Delete;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
import
org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
-import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
@@ -53,7 +52,7 @@ import java.util.Objects;
import java.util.stream.Collectors;
@SuppressWarnings({"java:S1854", "unused"})
-public class RelationalDeleteDataNode extends SearchNode implements
WALEntryValue {
+public class RelationalDeleteDataNode extends AbstractDeleteDataNode {
private static final Logger LOGGER =
LoggerFactory.getLogger(RelationalDeleteDataNode.class);
/** byte: type */
@@ -63,9 +62,6 @@ public class RelationalDeleteDataNode extends SearchNode
implements WALEntryValu
private Collection<TRegionReplicaSet> replicaSets;
- private TRegionReplicaSet regionReplicaSet;
- private ProgressIndex progressIndex;
-
public RelationalDeleteDataNode(PlanNodeId id, Delete delete) {
super(id);
this.modEntries = delete.getTableDeletionEntries();
@@ -165,9 +161,11 @@ public class RelationalDeleteDataNode extends SearchNode
implements WALEntryValu
return relationalDeleteDataNode;
}
+ @Override
public ByteBuffer serializeToDAL() {
try (PublicBAOS byteArrayOutputStream = new PublicBAOS();
DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ DeleteNodeType.RELATIONAL_DELETE_NODE.serialize(outputStream);
serializeAttributes(outputStream);
progressIndex.serialize(outputStream);
id.serialize(outputStream);
@@ -179,26 +177,6 @@ public class RelationalDeleteDataNode extends SearchNode
implements WALEntryValu
}
}
- @Override
- public ProgressIndex getProgressIndex() {
- return progressIndex;
- }
-
- @Override
- public void setProgressIndex(ProgressIndex progressIndex) {
- this.progressIndex = progressIndex;
- }
-
- @Override
- public List<PlanNode> getChildren() {
- return new ArrayList<>();
- }
-
- @Override
- public void addChild(PlanNode child) {
- throw new UnsupportedOperationException("Not supported.");
- }
-
@Override
public PlanNodeType getType() {
return PlanNodeType.RELATIONAL_DELETE_DATA;
@@ -210,16 +188,6 @@ public class RelationalDeleteDataNode extends SearchNode
implements WALEntryValu
return new RelationalDeleteDataNode(getPlanNodeId(), modEntries);
}
- @Override
- public int allowedChildCount() {
- return NO_CHILD_ALLOWED;
- }
-
- @Override
- public List<String> getOutputColumnNames() {
- return null;
- }
-
@Override
public int serializedSize() {
int size = FIXED_SERIALIZED_SIZE +
ReadWriteForEncodingUtils.varIntSize(modEntries.size());
@@ -288,7 +256,7 @@ public class RelationalDeleteDataNode extends SearchNode
implements WALEntryValu
@Override
public int hashCode() {
- return Objects.hash(getPlanNodeId(), modEntries);
+ return Objects.hash(getPlanNodeId(), modEntries, progressIndex);
}
public String toString() {
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java
index cf23f763d20..f2add749724 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionRecoverTest.java
@@ -26,29 +26,32 @@ import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.NOP;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
+import org.apache.tsfile.read.common.TimeRange;
import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.util.Collections;
public class DeletionRecoverTest {
- private static final String FAKE_DATA_REGION_ID = "1";
+ private static final String[] FAKE_DATA_REGION_IDS = {"2", "3"};
private static final int THIS_DATANODE_ID =
IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
private static final String DELETION_BASE_DIR =
IoTDBDescriptor.getInstance().getConfig().getIotConsensusV2DeletionFileDir();
- private static final String BASE_PATH = DELETION_BASE_DIR + File.separator +
FAKE_DATA_REGION_ID;
private final int deletionCount = 10;
private DeletionResourceManager deletionResourceManager;
- @Before
- public void setUp() throws Exception {
- File baseDir = new File(BASE_PATH);
+ public void setUp(boolean isRelational, String FAKE_DATA_REGION_ID) throws
Exception {
+ File baseDir = new File(DELETION_BASE_DIR + File.separator +
FAKE_DATA_REGION_ID);
if (baseDir.exists()) {
FileUtils.deleteFileOrDirectory(baseDir);
}
@@ -58,8 +61,18 @@ public class DeletionRecoverTest {
int rebootTimes = 0;
MeasurementPath path = new MeasurementPath("root.vehicle.d2.s0");
for (int i = 0; i < deletionCount; i++) {
- DeleteDataNode deleteDataNode =
- new DeleteDataNode(new PlanNodeId("1"),
Collections.singletonList(path), 50, 150);
+ AbstractDeleteDataNode deleteDataNode;
+ if (isRelational) {
+ deleteDataNode =
+ new RelationalDeleteDataNode(
+ new PlanNodeId("testPlan"),
+ Collections.singletonList(
+ new TableDeletionEntry(
+ new DeletionPredicate("table1", new NOP()), new
TimeRange(0, 10))));
+ } else {
+ deleteDataNode =
+ new DeleteDataNode(new PlanNodeId("1"),
Collections.singletonList(path), 50, 150);
+ }
deleteDataNode.setProgressIndex(
new RecoverProgressIndex(THIS_DATANODE_ID, new
SimpleProgressIndex(rebootTimes, i)));
deletionResourceManager.registerDeletionResource(deleteDataNode);
@@ -70,14 +83,25 @@ public class DeletionRecoverTest {
@After
public void tearDown() throws Exception {
- File baseDir = new File(DELETION_BASE_DIR + File.separator +
FAKE_DATA_REGION_ID);
- if (baseDir.exists()) {
- FileUtils.deleteFileOrDirectory(baseDir);
+ for (String FAKE_DATA_REGION_ID : FAKE_DATA_REGION_IDS) {
+ File baseDir = new File(DELETION_BASE_DIR + File.separator +
FAKE_DATA_REGION_ID);
+ if (baseDir.exists()) {
+ FileUtils.deleteFileOrDirectory(baseDir);
+ }
}
}
@Test
- public void testDeletionRecover() throws Exception {
+ public void testDeletionRecoverTreeModel() throws Exception {
+ setUp(false, FAKE_DATA_REGION_IDS[0]);
+ Assert.assertEquals(0,
deletionResourceManager.getAllDeletionResources().size());
+ deletionResourceManager.recoverForTest();
+ Assert.assertEquals(deletionCount,
deletionResourceManager.getAllDeletionResources().size());
+ }
+
+ @Test
+ public void testDeletionRecoverTableModel() throws Exception {
+ setUp(true, FAKE_DATA_REGION_IDS[1]);
Assert.assertEquals(0,
deletionResourceManager.getAllDeletionResources().size());
deletionResourceManager.recoverForTest();
Assert.assertEquals(deletionCount,
deletionResourceManager.getAllDeletionResources().size());
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
index 1b62d4b63a1..39ab2f8e889 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/DeletionResourceTest.java
@@ -36,9 +36,15 @@ import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRe
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.DeletionPredicate;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.IDPredicate.NOP;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.TableDeletionEntry;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.tsfile.read.common.TimeRange;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -91,14 +97,30 @@ public class DeletionResourceTest {
@Test
public void testAddBatchDeletionResource()
+ throws IllegalPathException, IOException, InterruptedException {
+ addBatchDeletionResource(true);
+ addBatchDeletionResource(false);
+ }
+
+ public void addBatchDeletionResource(boolean isRelational)
throws IllegalPathException, InterruptedException, IOException {
deletionResourceManager =
DeletionResourceManager.getInstance(FAKE_DATA_REGION_IDS[1]);
int deletionCount = 10;
int rebootTimes = 0;
MeasurementPath path = new MeasurementPath("root.vehicle.d2.s0");
for (int i = 0; i < deletionCount; i++) {
- DeleteDataNode deleteDataNode =
- new DeleteDataNode(new PlanNodeId("1"),
Collections.singletonList(path), 50, 150);
+ AbstractDeleteDataNode deleteDataNode;
+ if (isRelational) {
+ deleteDataNode =
+ new RelationalDeleteDataNode(
+ new PlanNodeId("testPlan"),
+ Collections.singletonList(
+ new TableDeletionEntry(
+ new DeletionPredicate("table1", new NOP()), new
TimeRange(0, 10))));
+ } else {
+ deleteDataNode =
+ new DeleteDataNode(new PlanNodeId("1"),
Collections.singletonList(path), 50, 150);
+ }
deleteDataNode.setProgressIndex(
new RecoverProgressIndex(THIS_DATANODE_ID, new
SimpleProgressIndex(rebootTimes, i)));
deletionResourceManager.registerDeletionResource(deleteDataNode);
@@ -111,12 +133,28 @@ public class DeletionResourceTest {
@Test
public void testAddDeletionResourceTimeout()
+ throws IllegalPathException, IOException, InterruptedException {
+ addDeletionResourceTimeout(true);
+ addDeletionResourceTimeout(false);
+ }
+
+ public void addDeletionResourceTimeout(boolean isRelational)
throws IllegalPathException, InterruptedException, IOException {
deletionResourceManager =
DeletionResourceManager.getInstance(FAKE_DATA_REGION_IDS[2]);
int rebootTimes = 0;
MeasurementPath path = new MeasurementPath("root.vehicle.d2.s0");
- DeleteDataNode deleteDataNode =
- new DeleteDataNode(new PlanNodeId("1"),
Collections.singletonList(path), 50, 150);
+ AbstractDeleteDataNode deleteDataNode;
+ if (isRelational) {
+ deleteDataNode =
+ new RelationalDeleteDataNode(
+ new PlanNodeId("testPlan"),
+ Collections.singletonList(
+ new TableDeletionEntry(
+ new DeletionPredicate("table1", new NOP()), new
TimeRange(0, 10))));
+ } else {
+ deleteDataNode =
+ new DeleteDataNode(new PlanNodeId("1"),
Collections.singletonList(path), 50, 150);
+ }
deleteDataNode.setProgressIndex(
new RecoverProgressIndex(THIS_DATANODE_ID, new
SimpleProgressIndex(rebootTimes, 1)));
// Only register one deletionResource
@@ -132,15 +170,31 @@ public class DeletionResourceTest {
@Test
public void testDeletionRemove() throws IllegalPathException,
InterruptedException, IOException {
+ deletionRemove(true);
+ deletionRemove(false);
+ }
+
+ public void deletionRemove(boolean isRelational)
+ throws IllegalPathException, InterruptedException, IOException {
deletionResourceManager =
DeletionResourceManager.getInstance(FAKE_DATA_REGION_IDS[3]);
// new a deletion
int rebootTimes = 0;
- int deletionCount = 100;
+ int deletionCount = 20;
MeasurementPath path = new MeasurementPath("root.vehicle.d2.s0");
List<PipeDeleteDataNodeEvent> deletionEvents = new ArrayList<>();
for (int i = 0; i < deletionCount; i++) {
- DeleteDataNode deleteDataNode =
- new DeleteDataNode(new PlanNodeId("1"),
Collections.singletonList(path), 50, 150);
+ AbstractDeleteDataNode deleteDataNode;
+ if (isRelational) {
+ deleteDataNode =
+ new RelationalDeleteDataNode(
+ new PlanNodeId("testPlan"),
+ Collections.singletonList(
+ new TableDeletionEntry(
+ new DeletionPredicate("table1", new NOP()), new
TimeRange(0, 10))));
+ } else {
+ deleteDataNode =
+ new DeleteDataNode(new PlanNodeId("1"),
Collections.singletonList(path), 50, 150);
+ }
deleteDataNode.setProgressIndex(
new RecoverProgressIndex(THIS_DATANODE_ID, new
SimpleProgressIndex(rebootTimes, i)));
PipeDeleteDataNodeEvent deletionEvent = new
PipeDeleteDataNodeEvent(deleteDataNode, true);
@@ -196,7 +250,7 @@ public class DeletionResourceTest {
deletionResourceManager =
DeletionResourceManager.getInstance(FAKE_DATA_REGION_IDS[4]);
int rebootTimes = 0;
MeasurementPath path = new MeasurementPath("root.vehicle.d2.s0");
- DeleteDataNode deleteDataNode =
+ AbstractDeleteDataNode deleteDataNode =
new DeleteDataNode(new PlanNodeId("1"),
Collections.singletonList(path), 50, 150);
deleteDataNode.setProgressIndex(
new RecoverProgressIndex(THIS_DATANODE_ID, new
SimpleProgressIndex(rebootTimes, 1)));
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNodeTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNodeTest.java
index 6a2220e4adb..7ba8d2c5544 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNodeTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalDeleteDataNodeTest.java
@@ -96,7 +96,7 @@ public class RelationalDeleteDataNodeTest {
assertEquals(relationalDeleteDataNode, planNode);
buffer = relationalDeleteDataNode.serializeToDAL();
- RelationalDeleteDataNode deserialized =
RelationalDeleteDataNode.deserializeFromDAL(buffer);
+ AbstractDeleteDataNode deserialized =
DeleteNodeType.deserializeFromDAL(buffer);
// plan node id is not serialized to DAL, manually set it to pass comparison
deserialized.setPlanNodeId(relationalDeleteDataNode.getPlanNodeId());
assertEquals(relationalDeleteDataNode, deserialized);