jt2594838 commented on code in PR #15569:
URL: https://github.com/apache/iotdb/pull/15569#discussion_r2103755381
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java:
##########
@@ -1393,8 +1393,9 @@ public void closeSelf(ConsensusPipeName consensusPipeName) {
* although events can arrive receiver in a random sequence.
*/
private class RequestExecutor {
- private static final String THIS_NODE = "this node";
- private static final String PIPE_TASK = "pipe task";
+ private static final String THIS_NODE = "sender dn restarts before this event was sent here";
+ private static final String PIPE_TASK = "pipe task restarts before this event was sent here";
+ private static final String REPLICATE_INDEX = "replicate index is out dated";
Review Comment:
Please give these constants clearer names, such as
MSG_NODE_RESTART_INDEX_STALE, MSG_PIPE_RESTART_INDEX_STALE, and
MSG_DUPLICATED_REPLICATE_INDEX.
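As a rough sketch of the renaming, keeping the message strings from the diff unchanged and spelling the last name as REPLICATE. The holder class `DeprecationMessages` is hypothetical and only makes the snippet compile on its own; in the PR these constants live inside `RequestExecutor`.

```java
// Hypothetical holder class, used only so this sketch is self-contained.
final class DeprecationMessages {
    // Sender DataNode restarted before this event reached the receiver.
    static final String MSG_NODE_RESTART_INDEX_STALE =
        "sender dn restarts before this event was sent here";

    // Pipe task restarted before this event reached the receiver.
    static final String MSG_PIPE_RESTART_INDEX_STALE =
        "pipe task restarts before this event was sent here";

    // Replicate index is not greater than the index already synced.
    static final String MSG_DUPLICATED_REPLICATE_INDEX =
        "replicate index is out dated";

    private DeprecationMessages() {}
}
```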
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java:
##########
@@ -1415,13 +1416,42 @@ public RequestExecutor(
this.reqExecutionOrderBuffer =
new TreeSet<>(
Comparator.comparingInt(RequestMeta::getDataNodeRebootTimes)
+ .thenComparingInt(RequestMeta::getPipeTaskRestartTimes)
.thenComparingLong(RequestMeta::getReplicateIndex));
this.lock = new ReentrantLock();
this.condition = lock.newCondition();
this.metric = metric;
this.tsFileWriterPool = tsFileWriterPool;
}
+ private TPipeConsensusTransferResp preCheck(TCommitId tCommitId) {
+ // if a req is deprecated, we will discard it
+ // This case may happen in this scenario: leader has transferred {1,2} and is intending to
+ // transfer {3, 4, 5, 6}. And in one moment, follower has received {4, 5, 6}, {3} is still
+ // transferring due to some network latency.
+ // At this time, leader restarts, and it will resend {3, 4, 5, 6} with incremental
+ // rebootTimes. If the {3} sent before the leader restart arrives after the follower
+ // receives
+ // the request with incremental rebootTimes, the {3} sent before the leader restart needs to
+ // be discarded.
+ if (tCommitId.getDataNodeRebootTimes() < connectorRebootTimes) {
+ return deprecatedResp(THIS_NODE, tCommitId);
+ }
+ // Similarly, check pipeTask restartTimes
+ if (tCommitId.getDataNodeRebootTimes() == connectorRebootTimes
+ && tCommitId.getPipeTaskRestartTimes() < pipeTaskRestartTimes) {
+ return deprecatedResp(PIPE_TASK, tCommitId);
+ }
+ // Similarly, check replicationIndex
+ if (tCommitId.getDataNodeRebootTimes() == connectorRebootTimes
+ && tCommitId.getPipeTaskRestartTimes() == pipeTaskRestartTimes
+ && tCommitId.getReplicateIndex() < onSyncedReplicateIndex + 1) {
+ return deprecatedResp(REPLICATE_INDEX, tCommitId);
+ }
Review Comment:
If the receiver gets a request whose rebootTimes is stale but whose
replicate index is the one it expects next, why can't it be accepted?
For example:
1. receiver.onSyncedReplicateIndex = 2
2. sender sends {3, 4, 5, 6} with rebootTimes = 0
3. sender reboots
4. sender sends {3, 4, 5, 6} with rebootTimes = 1
5. receiver receives {3} with rebootTimes = 1
6. receiver receives {4} with rebootTimes = 0
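
The steps above can be replayed with a minimal sketch of the three checks in `preCheck`. `PreCheckSketch` and its nested `CommitId` are hypothetical stand-ins, not IoTDB classes, and the state update in `receive` is an assumption made only for illustration: an accepted request raises the receiver's counters to the request's values and advances the synced index when it is the next one in order.

```java
// Sketch only: PreCheckSketch and CommitId are hypothetical, not IoTDB classes.
final class PreCheckSketch {

    // Hypothetical stand-in for the fields of TCommitId used by preCheck.
    static final class CommitId {
        final int rebootTimes;
        final int pipeTaskRestartTimes;
        final long replicateIndex;

        CommitId(int rebootTimes, int pipeTaskRestartTimes, long replicateIndex) {
            this.rebootTimes = rebootTimes;
            this.pipeTaskRestartTimes = pipeTaskRestartTimes;
            this.replicateIndex = replicateIndex;
        }
    }

    private int connectorRebootTimes;
    private int pipeTaskRestartTimes;
    private long onSyncedReplicateIndex;

    PreCheckSketch(int connectorRebootTimes, int pipeTaskRestartTimes, long onSyncedReplicateIndex) {
        this.connectorRebootTimes = connectorRebootTimes;
        this.pipeTaskRestartTimes = pipeTaskRestartTimes;
        this.onSyncedReplicateIndex = onSyncedReplicateIndex;
    }

    // Mirrors the three conditions in the diff; returns null when nothing is stale.
    String preCheck(CommitId id) {
        if (id.rebootTimes < connectorRebootTimes) {
            return "stale dataNode rebootTimes";
        }
        if (id.rebootTimes == connectorRebootTimes
            && id.pipeTaskRestartTimes < pipeTaskRestartTimes) {
            return "stale pipeTask restartTimes";
        }
        if (id.rebootTimes == connectorRebootTimes
            && id.pipeTaskRestartTimes == pipeTaskRestartTimes
            && id.replicateIndex < onSyncedReplicateIndex + 1) {
            return "stale replicate index";
        }
        return null;
    }

    // ASSUMPTION: how the receiver state changes after an accepted request.
    String receive(CommitId id) {
        String rejection = preCheck(id);
        if (rejection != null) {
            return rejection;
        }
        connectorRebootTimes = id.rebootTimes;
        pipeTaskRestartTimes = id.pipeTaskRestartTimes;
        if (id.replicateIndex == onSyncedReplicateIndex + 1) {
            onSyncedReplicateIndex = id.replicateIndex;
        }
        return null;
    }

    public static void main(String[] args) {
        // Step 1: receiver.onSyncedReplicateIndex = 2, no restarts seen yet.
        PreCheckSketch receiver = new PreCheckSketch(0, 0, 2);
        // Step 5: {3} with rebootTimes = 1 arrives first.
        System.out.println(receiver.receive(new CommitId(1, 0, 3)));
        // Step 6: {4} with rebootTimes = 0 arrives afterwards.
        System.out.println(receiver.receive(new CommitId(0, 0, 4)));
    }
}
```

Under these assumptions, {3} from step 5 is accepted and {4} from step 6 is rejected by the first check, even though 4 is exactly `onSyncedReplicateIndex + 1` at that point.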