This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 62a04dc1676 Properly handle the case where a single IndexedRequest
contains multiple “delete PlanNode” operations on the receiving end of
IoTConsensus. #13418
62a04dc1676 is described below
commit 62a04dc16763f03b1b26691db91d939500006724
Author: Li Yu Heng <[email protected]>
AuthorDate: Fri Sep 6 10:18:41 2024 +0800
Properly handle the case where a single IndexedRequest contains multiple
“delete PlanNode” operations on the receiving end of IoTConsensus. #13418
---
.../dataregion/DataRegionStateMachine.java | 55 +++++++++++++++++++---
.../IoTConsensusDataRegionStateMachine.java | 4 +-
2 files changed, 51 insertions(+), 8 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
index 042d9e11e1b..3236199aa5c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
@@ -33,6 +34,7 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
@@ -56,6 +58,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
public class DataRegionStateMachine extends BaseStateMachine {
@@ -151,8 +154,9 @@ public class DataRegionStateMachine extends BaseStateMachine {
}
}
- protected PlanNode grabInsertNode(IndexedConsensusRequest indexedRequest) {
+ protected PlanNode grabPlanNode(IndexedConsensusRequest indexedRequest) {
List<InsertNode> insertNodes = new ArrayList<>(indexedRequest.getRequests().size());
+ List<DeleteDataNode> deleteDataNodes = new ArrayList<>();
for (IConsensusRequest req : indexedRequest.getRequests()) {
// PlanNode in IndexedConsensusRequest should always be InsertNode
PlanNode planNode = getPlanNode(req);
@@ -160,8 +164,9 @@ public class DataRegionStateMachine extends BaseStateMachine {
((SearchNode) planNode).setSearchIndex(indexedRequest.getSearchIndex());
}
if (planNode instanceof InsertNode) {
- InsertNode innerNode = (InsertNode) planNode;
- insertNodes.add(innerNode);
+ insertNodes.add((InsertNode) planNode);
+ } else if (planNode instanceof DeleteDataNode) {
+ deleteDataNodes.add((DeleteDataNode) planNode);
} else if (indexedRequest.getRequests().size() == 1) {
// If the planNode is not InsertNode, it is expected that the IndexedConsensusRequest only
// contains one request
@@ -172,7 +177,45 @@ public class DataRegionStateMachine extends BaseStateMachine {
+ "the size of requests are larger than 1");
}
}
- return mergeInsertNodes(insertNodes);
+ if (!insertNodes.isEmpty()) {
+ if (!deleteDataNodes.isEmpty()) {
+ throw new IllegalArgumentException(
+ "One indexedRequest cannot contain InsertNode and DeleteDataNode at the same time");
+ }
+ return mergeInsertNodes(insertNodes);
+ }
+ return mergeDeleteDataNode(deleteDataNodes);
+ }
+
+ private DeleteDataNode mergeDeleteDataNode(List<DeleteDataNode> deleteDataNodes) {
+ int size = deleteDataNodes.size();
+ if (size == 0) {
+ throw new IllegalArgumentException("deleteDataNodes is empty");
+ }
+ DeleteDataNode firstOne = deleteDataNodes.get(0);
+ if (size == 1) {
+ return firstOne;
+ }
+ if (!deleteDataNodes.stream()
+ .allMatch(
+ deleteDataNode ->
+ firstOne.getDeleteStartTime() == deleteDataNode.getDeleteStartTime()
+ && firstOne.getDeleteEndTime() == deleteDataNode.getDeleteEndTime())) {
+ throw new IllegalArgumentException(
+ "DeleteDataNodes which start time or end time are not same cannot be merged");
+ }
+ List<MeasurementPath> pathList =
+ deleteDataNodes.stream()
+ .flatMap(deleteDataNode -> deleteDataNode.getPathList().stream())
+ // Some time the deleteDataNode list contains a path for multiple times, so use
+ // distinct() to clear them
+ .distinct()
+ .collect(Collectors.toList());
+ return new DeleteDataNode(
+ firstOne.getPlanNodeId(),
+ pathList,
+ firstOne.getDeleteStartTime(),
+ firstOne.getDeleteEndTime());
}
/**
@@ -181,12 +224,12 @@ public class DataRegionStateMachine extends BaseStateMachine {
* Notice: the continuity of insert nodes sharing same search index should be protected by the
* upper layer.
*
- * @exception RuntimeException when insertNodes is empty
+ * @exception IllegalArgumentException when insertNodes is empty
*/
protected InsertNode mergeInsertNodes(List<InsertNode> insertNodes) {
int size = insertNodes.size();
if (size == 0) {
- throw new RuntimeException();
+ throw new IllegalArgumentException("insertNodes should never be empty");
}
if (size == 1) {
return insertNodes.get(0);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java
index 775f1bf6f49..240c1b1caa0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java
@@ -79,7 +79,7 @@ public class IoTConsensusDataRegionStateMachine extends DataRegionStateMachine {
IConsensusRequest result;
if (request instanceof IndexedConsensusRequest) {
IndexedConsensusRequest indexedRequest = (IndexedConsensusRequest) request;
- result = grabInsertNode(indexedRequest);
+ result = grabPlanNode(indexedRequest);
} else if (request instanceof BatchIndexedConsensusRequest) {
BatchIndexedConsensusRequest batchRequest = (BatchIndexedConsensusRequest) request;
DeserializedBatchIndexedConsensusRequest deserializedRequest =
@@ -88,7 +88,7 @@ public class IoTConsensusDataRegionStateMachine extends DataRegionStateMachine {
batchRequest.getEndSyncIndex(),
batchRequest.getRequests().size());
for (IndexedConsensusRequest indexedRequest : batchRequest.getRequests()) {
- final PlanNode planNode = grabInsertNode(indexedRequest);
+ final PlanNode planNode = grabPlanNode(indexedRequest);
if (planNode instanceof ComparableConsensusRequest) {
final IoTProgressIndex ioTProgressIndex =
new IoTProgressIndex(batchRequest.getSourcePeerId(), indexedRequest.getSyncIndex());