This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 62a04dc1676 Properly handle the case where a single IndexedRequest 
contains multiple “delete PlanNode” operations on the receiving end of 
IoTConsensus. #13418
62a04dc1676 is described below

commit 62a04dc16763f03b1b26691db91d939500006724
Author: Li Yu Heng <[email protected]>
AuthorDate: Fri Sep 6 10:18:41 2024 +0800

    Properly handle the case where a single IndexedRequest contains multiple 
“delete PlanNode” operations on the receiving end of IoTConsensus. #13418
---
 .../dataregion/DataRegionStateMachine.java         | 55 +++++++++++++++++++---
 .../IoTConsensusDataRegionStateMachine.java        |  4 +-
 2 files changed, 51 insertions(+), 8 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
index 042d9e11e1b..3236199aa5c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
@@ -33,6 +34,7 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertMultiTabletsNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
@@ -56,6 +58,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class DataRegionStateMachine extends BaseStateMachine {
 
@@ -151,8 +154,9 @@ public class DataRegionStateMachine extends 
BaseStateMachine {
     }
   }
 
-  protected PlanNode grabInsertNode(IndexedConsensusRequest indexedRequest) {
+  protected PlanNode grabPlanNode(IndexedConsensusRequest indexedRequest) {
     List<InsertNode> insertNodes = new 
ArrayList<>(indexedRequest.getRequests().size());
+    List<DeleteDataNode> deleteDataNodes = new ArrayList<>();
     for (IConsensusRequest req : indexedRequest.getRequests()) {
       // PlanNode in IndexedConsensusRequest should always be InsertNode
       PlanNode planNode = getPlanNode(req);
@@ -160,8 +164,9 @@ public class DataRegionStateMachine extends 
BaseStateMachine {
         ((SearchNode) 
planNode).setSearchIndex(indexedRequest.getSearchIndex());
       }
       if (planNode instanceof InsertNode) {
-        InsertNode innerNode = (InsertNode) planNode;
-        insertNodes.add(innerNode);
+        insertNodes.add((InsertNode) planNode);
+      } else if (planNode instanceof DeleteDataNode) {
+        deleteDataNodes.add((DeleteDataNode) planNode);
       } else if (indexedRequest.getRequests().size() == 1) {
         // If the planNode is not InsertNode, it is expected that the 
IndexedConsensusRequest only
         // contains one request
@@ -172,7 +177,45 @@ public class DataRegionStateMachine extends 
BaseStateMachine {
                 + "the size of requests are larger than 1");
       }
     }
-    return mergeInsertNodes(insertNodes);
+    if (!insertNodes.isEmpty()) {
+      if (!deleteDataNodes.isEmpty()) {
+        throw new IllegalArgumentException(
+            "One indexedRequest cannot contain InsertNode and DeleteDataNode 
at the same time");
+      }
+      return mergeInsertNodes(insertNodes);
+    }
+    return mergeDeleteDataNode(deleteDataNodes);
+  }
+
+  private DeleteDataNode mergeDeleteDataNode(List<DeleteDataNode> 
deleteDataNodes) {
+    int size = deleteDataNodes.size();
+    if (size == 0) {
+      throw new IllegalArgumentException("deleteDataNodes is empty");
+    }
+    DeleteDataNode firstOne = deleteDataNodes.get(0);
+    if (size == 1) {
+      return firstOne;
+    }
+    if (!deleteDataNodes.stream()
+        .allMatch(
+            deleteDataNode ->
+                firstOne.getDeleteStartTime() == 
deleteDataNode.getDeleteStartTime()
+                    && firstOne.getDeleteEndTime() == 
deleteDataNode.getDeleteEndTime())) {
+      throw new IllegalArgumentException(
+          "DeleteDataNodes which start time or end time are not same cannot be 
merged");
+    }
+    List<MeasurementPath> pathList =
+        deleteDataNodes.stream()
+            .flatMap(deleteDataNode -> deleteDataNode.getPathList().stream())
+            // Sometimes the deleteDataNode list contains the same path multiple 
times, so use
+            // distinct() to remove the duplicates
+            .distinct()
+            .collect(Collectors.toList());
+    return new DeleteDataNode(
+        firstOne.getPlanNodeId(),
+        pathList,
+        firstOne.getDeleteStartTime(),
+        firstOne.getDeleteEndTime());
   }
 
   /**
@@ -181,12 +224,12 @@ public class DataRegionStateMachine extends 
BaseStateMachine {
    * Notice: the continuity of insert nodes sharing same search index should 
be protected by the
    * upper layer.
    *
-   * @exception RuntimeException when insertNodes is empty
+   * @exception IllegalArgumentException when insertNodes is empty
    */
   protected InsertNode mergeInsertNodes(List<InsertNode> insertNodes) {
     int size = insertNodes.size();
     if (size == 0) {
-      throw new RuntimeException();
+      throw new IllegalArgumentException("insertNodes should never be empty");
     }
     if (size == 1) {
       return insertNodes.get(0);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java
index 775f1bf6f49..240c1b1caa0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java
@@ -79,7 +79,7 @@ public class IoTConsensusDataRegionStateMachine extends 
DataRegionStateMachine {
     IConsensusRequest result;
     if (request instanceof IndexedConsensusRequest) {
       IndexedConsensusRequest indexedRequest = (IndexedConsensusRequest) 
request;
-      result = grabInsertNode(indexedRequest);
+      result = grabPlanNode(indexedRequest);
     } else if (request instanceof BatchIndexedConsensusRequest) {
       BatchIndexedConsensusRequest batchRequest = 
(BatchIndexedConsensusRequest) request;
       DeserializedBatchIndexedConsensusRequest deserializedRequest =
@@ -88,7 +88,7 @@ public class IoTConsensusDataRegionStateMachine extends 
DataRegionStateMachine {
               batchRequest.getEndSyncIndex(),
               batchRequest.getRequests().size());
       for (IndexedConsensusRequest indexedRequest : 
batchRequest.getRequests()) {
-        final PlanNode planNode = grabInsertNode(indexedRequest);
+        final PlanNode planNode = grabPlanNode(indexedRequest);
         if (planNode instanceof ComparableConsensusRequest) {
           final IoTProgressIndex ioTProgressIndex =
               new IoTProgressIndex(batchRequest.getSourcePeerId(), 
indexedRequest.getSyncIndex());

Reply via email to