jt2594838 commented on code in PR #15569:
URL: https://github.com/apache/iotdb/pull/15569#discussion_r2103755381


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java:
##########
@@ -1393,8 +1393,9 @@ public void closeSelf(ConsensusPipeName consensusPipeName) {
    * although events can arrive receiver in a random sequence.
    */
   private class RequestExecutor {
-    private static final String THIS_NODE = "this node";
-    private static final String PIPE_TASK = "pipe task";
+    private static final String THIS_NODE = "sender dn restarts before this event was sent here";
+    private static final String PIPE_TASK = "pipe task restarts before this event was sent here";
+    private static final String REPLICATE_INDEX = "replicate index is out dated";

Review Comment:
   Please give these constants clearer names that show they are messages,
   such as MSG_NODE_RESTART_INDEX_STALE, MSG_PIPE_RESTART_INDEX_STALE, and
   MSG_DUPLICATED_REPLICATE_INDEX.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java:
##########
@@ -1415,13 +1416,42 @@ public RequestExecutor(
       this.reqExecutionOrderBuffer =
           new TreeSet<>(
               Comparator.comparingInt(RequestMeta::getDataNodeRebootTimes)
+                  .thenComparingInt(RequestMeta::getPipeTaskRestartTimes)
                   .thenComparingLong(RequestMeta::getReplicateIndex));
       this.lock = new ReentrantLock();
       this.condition = lock.newCondition();
       this.metric = metric;
       this.tsFileWriterPool = tsFileWriterPool;
     }
 
+    private TPipeConsensusTransferResp preCheck(TCommitId tCommitId) {
+      // if a req is deprecated, we will discard it
+      // This case may happen in this scenario: leader has transferred {1,2} and is intending to
+      // transfer {3, 4, 5, 6}. And in one moment, follower has received {4, 5, 6}, {3} is still
+      // transferring due to some network latency.
+      // At this time, leader restarts, and it will resend {3, 4, 5, 6} with incremental
+      // rebootTimes. If the {3} sent before the leader restart arrives after the follower
+      // receives
+      // the request with incremental rebootTimes, the {3} sent before the leader restart needs to
+      // be discarded.
+      if (tCommitId.getDataNodeRebootTimes() < connectorRebootTimes) {
+        return deprecatedResp(THIS_NODE, tCommitId);
+      }
+      // Similarly, check pipeTask restartTimes
+      if (tCommitId.getDataNodeRebootTimes() == connectorRebootTimes
+          && tCommitId.getPipeTaskRestartTimes() < pipeTaskRestartTimes) {
+        return deprecatedResp(PIPE_TASK, tCommitId);
+      }
+      // Similarly, check replicationIndex
+      if (tCommitId.getDataNodeRebootTimes() == connectorRebootTimes
+          && tCommitId.getPipeTaskRestartTimes() == pipeTaskRestartTimes
+          && tCommitId.getReplicateIndex() < onSyncedReplicateIndex + 1) {
+        return deprecatedResp(REPLICATE_INDEX, tCommitId);
+      }

Review Comment:
   If the receiver gets a request that carries a stale rebootTimes but the
   expected replicate index, why can't it be accepted?
   For example:
   1. receiver.onSyncedReplicateIndex = 2
   2. sender sends {3, 4, 5, 6} with rebootTimes = 0
   3. sender reboots
   4. sender sends {3, 4, 5, 6} with rebootTimes = 1
   5. receiver receives {3} with rebootTimes = 1
   6. receiver receives {4} with rebootTimes = 0
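
The steps above can be replayed against a stripped-down copy of the three checks in `preCheck`. This is only a sketch: `PreCheckSketch`, `CommitId`, and `receive` are hypothetical stand-ins, not classes from the PR, and the way `receive` updates the receiver state after an accepted request is an assumption about behaviour that the diff does not show.

```java
public class PreCheckSketch {
  /** Hypothetical stand-in for TCommitId with only the fields preCheck reads. */
  static final class CommitId {
    final int dataNodeRebootTimes;
    final int pipeTaskRestartTimes;
    final long replicateIndex;

    CommitId(int dataNodeRebootTimes, int pipeTaskRestartTimes, long replicateIndex) {
      this.dataNodeRebootTimes = dataNodeRebootTimes;
      this.pipeTaskRestartTimes = pipeTaskRestartTimes;
      this.replicateIndex = replicateIndex;
    }
  }

  // Receiver-side state, named after the fields used in the diff.
  private int connectorRebootTimes = 0;
  private int pipeTaskRestartTimes = 0;
  private long onSyncedReplicateIndex;

  PreCheckSketch(long onSyncedReplicateIndex) {
    this.onSyncedReplicateIndex = onSyncedReplicateIndex;
  }

  /** Mirrors the three checks in the diff; returns null when the request passes. */
  String preCheck(CommitId id) {
    if (id.dataNodeRebootTimes < connectorRebootTimes) {
      return "stale rebootTimes";
    }
    if (id.dataNodeRebootTimes == connectorRebootTimes
        && id.pipeTaskRestartTimes < pipeTaskRestartTimes) {
      return "stale pipeTaskRestartTimes";
    }
    if (id.dataNodeRebootTimes == connectorRebootTimes
        && id.pipeTaskRestartTimes == pipeTaskRestartTimes
        && id.replicateIndex < onSyncedReplicateIndex + 1) {
      return "stale replicateIndex";
    }
    return null;
  }

  /**
   * ASSUMPTION (not shown in the diff): an accepted request makes the receiver adopt the
   * request's counters, and the synced index advances when the next expected index arrives.
   */
  String receive(CommitId id) {
    String rejection = preCheck(id);
    if (rejection != null) {
      return rejection;
    }
    connectorRebootTimes = id.dataNodeRebootTimes;
    pipeTaskRestartTimes = id.pipeTaskRestartTimes;
    if (id.replicateIndex == onSyncedReplicateIndex + 1) {
      onSyncedReplicateIndex = id.replicateIndex;
    }
    return null;
  }

  public static void main(String[] args) {
    PreCheckSketch receiver = new PreCheckSketch(2); // step 1
    // Step 5: {3} arrives with rebootTimes = 1 and passes all checks.
    System.out.println(receiver.receive(new CommitId(1, 0, 3))); // prints "null"
    // Step 6: {4} arrives with rebootTimes = 0 and is rejected by the first check.
    System.out.println(receiver.receive(new CommitId(0, 0, 4))); // prints "stale rebootTimes"
  }
}
```

Under these assumptions, {4} from before the reboot is discarded even though 4 is exactly the next index the receiver expects, which is the case the question is about.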
     


