jsancio commented on code in PR #17807:
URL: https://github.com/apache/kafka/pull/17807#discussion_r1848488508


##########
clients/src/main/resources/common/message/VoteRequest.json:
##########
@@ -34,18 +35,20 @@
           "versions": "0+", "fields": [
             { "name": "PartitionIndex", "type": "int32", "versions": "0+",
               "about": "The partition index." },
-            { "name": "CandidateEpoch", "type": "int32", "versions": "0+",
-              "about": "The bumped epoch of the candidate sending the request"},
-            { "name": "CandidateId", "type": "int32", "versions": "0+", "entityType": "brokerId",
+            { "name": "ReplicaEpoch", "type": "int32", "versions": "0+",
+              "about": "The epoch of the replica sending the request"},

Review Comment:
   How about "The epoch of the voter sending the request"? This matches the 
wording for `ReplicaId` and `ReplicaDirectoryId`.



##########
clients/src/main/resources/common/message/VoteRequest.json:
##########
@@ -34,18 +35,20 @@
           "versions": "0+", "fields": [
             { "name": "PartitionIndex", "type": "int32", "versions": "0+",
               "about": "The partition index." },
-            { "name": "CandidateEpoch", "type": "int32", "versions": "0+",
-              "about": "The bumped epoch of the candidate sending the request"},
-            { "name": "CandidateId", "type": "int32", "versions": "0+", "entityType": "brokerId",
+            { "name": "ReplicaEpoch", "type": "int32", "versions": "0+",
+              "about": "The epoch of the replica sending the request"},
+            { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
               "about": "The replica id of the voter sending the request"},
-            { "name": "CandidateDirectoryId", "type": "uuid", "versions": "1+", "ignorable": true,
+            { "name": "ReplicaDirectoryId", "type": "uuid", "versions": "1+", "ignorable": true,
               "about": "The directory id of the voter sending the request" },
             { "name": "VoterDirectoryId", "type": "uuid", "versions": "1+", "ignorable": true,
-              "about": "The ID of the voter sending the request"},
+              "about": "The directory id of the voter receiving the request"},
             { "name": "LastOffsetEpoch", "type": "int32", "versions": "0+",
               "about": "The epoch of the last record written to the metadata log"},
             { "name": "LastOffset", "type": "int64", "versions": "0+",
-              "about": "The offset of the last record written to the metadata log"}
+              "about": "The offset of the last record written to the metadata log"},
+            { "name": "PreVote", "type": "bool", "versions": "2+",
+              "about": "Whether the request is a PreVote request (no epoch increase) or not."}

Review Comment:
   I don't think "increasing the epoch" is the important invariant of this type 
of request. The important invariant is that votes for pre-vote requests are not 
persisted and the replica granting its vote can pre-vote for multiple 
different replicas in the same epoch.
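
A minimal sketch of that invariant, under stated assumptions: `PreVoteInvariantSketch` and `handleVote` are hypothetical names, the in-memory `votedId` field only stands in for whatever a real replica persists, and log comparison and replica role are left out on purpose. It is not the KafkaRaftClient logic.

```java
import java.util.OptionalInt;

// Hypothetical voter model for illustration only; not the KafkaRaftClient implementation.
final class PreVoteInvariantSketch {
    // Stands in for the vote that a real replica would persist for the current epoch.
    private OptionalInt votedId = OptionalInt.empty();

    // Returns true if the vote is granted. Log comparison and replica role are
    // deliberately ignored so that only the persistence difference is visible.
    boolean handleVote(int replicaId, boolean preVote) {
        if (preVote) {
            // Pre-votes are not recorded, so several replicas can each be
            // granted a pre-vote in the same epoch.
            return true;
        }
        if (votedId.isPresent()) {
            // A standard vote was already recorded in this epoch; only the same
            // replica can be granted it again.
            return votedId.getAsInt() == replicaId;
        }
        votedId = OptionalInt.of(replicaId);
        return true;
    }

    OptionalInt votedId() {
        return votedId;
    }
}
```

In this model, pre-votes for replicas 1 and 2 are both granted and leave `votedId` empty, while a standard vote for replica 1 blocks a later standard vote for replica 2 in the same epoch.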



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -779,39 +781,43 @@ private VoteResponseData handleVoteRequest(
         VoteRequestData.PartitionData partitionRequest =
             request.topics().get(0).partitions().get(0);
 
-        int candidateId = partitionRequest.candidateId();
-        int candidateEpoch = partitionRequest.candidateEpoch();
+        int replicaId = partitionRequest.replicaId();
+        int replicaEpoch = partitionRequest.replicaEpoch();
+        boolean preVote = partitionRequest.preVote();
 
         int lastEpoch = partitionRequest.lastOffsetEpoch();
         long lastEpochEndOffset = partitionRequest.lastOffset();
-        if (lastEpochEndOffset < 0 || lastEpoch < 0 || lastEpoch >= candidateEpoch) {
+        boolean isIllegalEpoch = preVote ? lastEpoch > replicaEpoch : lastEpoch >= replicaEpoch;

Review Comment:
   I see. Let's write a comment explaining this check.
   
   This is my understanding. When the replica is sending a normal vote request, 
the replica's epoch must be greater than the largest epoch in its log. This 
is true because the candidate replica will always request a vote for an epoch 
for which there is no previously known leader or record in the log.
   
   When the pre-vote flag is true, there may be a known leader for the last 
epoch, so there may be a record in the log for that epoch.
   
   A violation of either rule should be rare and indicates a programming error 
in the remote replica. Let's log an INFO message with all of the relevant 
information when this condition is true.
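
One way the requested comment and log line could look, as a hedged sketch: the class `VoteEpochCheck`, the method `isValidRequest`, and the use of `java.util.logging` are assumptions made to keep the example self-contained (the real client has its own logger and method structure). Only the ternary expression is taken from the diff above.

```java
import java.util.logging.Logger;

// Hypothetical helper; the names and shape are illustrative, not part of KafkaRaftClient.
final class VoteEpochCheck {
    private static final Logger LOG = Logger.getLogger(VoteEpochCheck.class.getName());

    static boolean isIllegalEpoch(boolean preVote, int replicaEpoch, int lastEpoch) {
        // Standard vote: the sender asks for a vote in an epoch with no previously
        // known leader and no record in its log, so lastEpoch must be strictly
        // smaller than replicaEpoch.
        // Pre-vote: there may be a known leader for the sender's epoch, so its log
        // may already hold a record from that epoch and lastEpoch may equal it.
        return preVote ? lastEpoch > replicaEpoch : lastEpoch >= replicaEpoch;
    }

    // Returns true when the request passes the sanity checks; logs at INFO otherwise.
    static boolean isValidRequest(
        int replicaId, boolean preVote, int replicaEpoch, int lastEpoch, long lastEpochEndOffset
    ) {
        boolean invalid = lastEpochEndOffset < 0
            || lastEpoch < 0
            || isIllegalEpoch(preVote, replicaEpoch, lastEpoch);
        if (invalid) {
            LOG.info(String.format(
                "Invalid vote request from replica %d: preVote=%b, replicaEpoch=%d, "
                    + "lastEpoch=%d, lastEpochEndOffset=%d",
                replicaId, preVote, replicaEpoch, lastEpoch, lastEpochEndOffset));
        }
        return !invalid;
    }
}
```

The only difference between the two request types is the boundary case `lastEpoch == replicaEpoch`, which is rejected for a standard vote and accepted for a pre-vote.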



##########
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java:
##########
@@ -1181,7 +1181,7 @@ private AbstractResponse getResponse(ApiKeys apikey, short version) {
             case ALTER_CLIENT_QUOTAS: return createAlterClientQuotasResponse();
             case DESCRIBE_USER_SCRAM_CREDENTIALS: return createDescribeUserScramCredentialsResponse();
             case ALTER_USER_SCRAM_CREDENTIALS: return createAlterUserScramCredentialsResponse();
-            case VOTE: return createVoteResponse();
+            case VOTE: return createVoteResponse(version);
             case BEGIN_QUORUM_EPOCH: return createBeginQuorumEpochResponse();
             case END_QUORUM_EPOCH: return createEndQuorumEpochResponse();
             case DESCRIBE_QUORUM: return createDescribeQuorumResponse();

Review Comment:
   I see. I haven't looked at the rest of these tests in detail, but should we 
create an issue to test the different versions of the responses for the 
KRaft-specific RPCs?



##########
raft/src/main/java/org/apache/kafka/raft/EpochState.java:
##########
@@ -26,16 +26,16 @@ default Optional<LogOffsetMetadata> highWatermark() {
     }
 
     /**
-     * Decide whether to grant a vote to a candidate.
+     * Decide whether to grant a vote to a replica.
      *
      * It is the responsibility of the caller to invoke
      * {@link QuorumState#transitionToUnattachedVotedState(int, ReplicaKey)} if vote is granted.
      *
-     * @param candidateKey the id and directory of the candidate
-     * @param isLogUpToDate whether the candidate’s log is at least as up-to-date as receiver’s log
+     * @param replicaKey the id and directory of the replica requesting the vote
+     * @param isLogUpToDate whether the replica's log is at least as up-to-date as receiver’s log
      * @return true if it can grant the vote, false otherwise
      */
-    boolean canGrantVote(ReplicaKey candidateKey, boolean isLogUpToDate);
+    boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate);

Review Comment:
   The implementation of this method is not correct for all cases. The KIP says 
the following:
   > When servers receive VoteRequests with the PreVote field set to true, they 
will respond with VoteGranted set to
   >  . true if they are not a Follower and the epoch and offsets in the 
Pre-Vote request satisfy the same requirements as a standard vote
   >  . false if otherwise
   
   I know that your PR description says the following:
   > Not included in this PR are changes to Follower state's canGrantVote. This 
may need to change in later PreVote PR to address a ping-pong scenario brought 
up by Jack Vanlightly. (ex: 3 node quorum, leader node A disconnects from 
quorum, node B goes into prospective state first before node C, node B sends 
prevote request to node C still in follower state and receives back that node A 
is leader, node B transitions to follower while node C transitions to 
prospective after election timeout, repeating this cycle) One way to tackle 
this is by having follower state nodes grant prevote requests based off last 
fetch time, which would involve changing canGrantVote, but this makes more 
sense to tackle after the Prospective state exists.
   
   But I think we need to implement and test this logic in this PR. Otherwise 
the handling of this RPC is not correct in all software versions that support 
it.
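
The KIP rule quoted above can be sketched as a single predicate. This is a simplification under stated assumptions: `FollowerPreVoteSketch` and `canGrantPreVote` are hypothetical names, and the boolean `isLogUpToDate` stands in for "the epoch and offsets satisfy the same requirements as a standard vote". It does not cover the fetch-time based alternative mentioned in the PR description.

```java
// Hypothetical illustration of the quoted KIP rule; not the EpochState implementations.
final class FollowerPreVoteSketch {
    // isFollower: whether the receiving replica is currently in the Follower state.
    // isLogUpToDate: assumed to summarize the standard-vote epoch and offset checks.
    static boolean canGrantPreVote(boolean isFollower, boolean isLogUpToDate) {
        // A Follower rejects the pre-vote; any other state grants it only when the
        // sender's log passes the same checks as a standard vote.
        return !isFollower && isLogUpToDate;
    }
}
```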



##########
raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java:
##########
@@ -187,13 +187,19 @@ private static Stream<Arguments> voteRequestTestCases() {
         return Stream.of(
                 Arguments.of((short) 0,
                         "{\"clusterId\":\"I4ZmrWqfT2e-upky_4fdPA\",\"topics\":[{\"topicName\":\"topic\"," +
-                            "\"partitions\":[{\"partitionIndex\":1,\"candidateEpoch\":1,\"candidateId\":1," +
+                            "\"partitions\":[{\"partitionIndex\":1,\"replicaEpoch\":1,\"replicaId\":1," +
                             "\"lastOffsetEpoch\":1000,\"lastOffset\":1000}]}]}"),
                 Arguments.of((short) 1,
                         "{\"clusterId\":\"I4ZmrWqfT2e-upky_4fdPA\",\"voterId\":2,\"topics\":[{" +
-                            "\"topicName\":\"topic\",\"partitions\":[{\"partitionIndex\":1,\"candidateEpoch\":1," +
-                            "\"candidateId\":1,\"candidateDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\"," +
-                            "\"voterDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\",\"lastOffsetEpoch\":1000,\"lastOffset\":1000}]}]}")
+                            "\"topicName\":\"topic\",\"partitions\":[{\"partitionIndex\":1,\"replicaEpoch\":1," +
+                            "\"replicaId\":1,\"replicaDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\"," +
+                            "\"voterDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\",\"lastOffsetEpoch\":1000,\"lastOffset\":1000}]}]}"),
+                Arguments.of((short) 2,
+                        "{\"clusterId\":\"I4ZmrWqfT2e-upky_4fdPA\",\"voterId\":2,\"topics\":[{" +
+                            "\"topicName\":\"topic\",\"partitions\":[{\"partitionIndex\":1,\"replicaEpoch\":1," +
+                            "\"replicaId\":1,\"replicaDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\"," +
+                            "\"voterDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\",\"lastOffsetEpoch\":1000,\"lastOffset\":1000," +
+                            "\"preVote\":false}]}]}")

Review Comment:
   Let's test the case when prevote is true.
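
A sketch of the expected JSON for such a case, assuming the version 2 layout shown in the hunk above stays the same and only the `preVote` value changes. `PreVoteJsonSketch` and `voteRequestJsonV2` are hypothetical names used to keep the example standalone; in the real test this string would presumably be another `Arguments.of((short) 2, ...)` entry.

```java
// Hypothetical helper that builds the expected version 2 JSON for either preVote value.
final class PreVoteJsonSketch {
    static String voteRequestJsonV2(boolean preVote) {
        // Same fields and values as the existing version 2 case; only preVote varies.
        return "{\"clusterId\":\"I4ZmrWqfT2e-upky_4fdPA\",\"voterId\":2,\"topics\":[{" +
            "\"topicName\":\"topic\",\"partitions\":[{\"partitionIndex\":1,\"replicaEpoch\":1," +
            "\"replicaId\":1,\"replicaDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\"," +
            "\"voterDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\",\"lastOffsetEpoch\":1000,\"lastOffset\":1000," +
            "\"preVote\":" + preVote + "}]}]}";
    }
}
```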



##########
raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java:
##########
@@ -187,13 +187,19 @@ private static Stream<Arguments> voteRequestTestCases() {
         return Stream.of(
                 Arguments.of((short) 0,
                         "{\"clusterId\":\"I4ZmrWqfT2e-upky_4fdPA\",\"topics\":[{\"topicName\":\"topic\"," +
-                            "\"partitions\":[{\"partitionIndex\":1,\"candidateEpoch\":1,\"candidateId\":1," +
+                            "\"partitions\":[{\"partitionIndex\":1,\"replicaEpoch\":1,\"replicaId\":1," +
                             "\"lastOffsetEpoch\":1000,\"lastOffset\":1000}]}]}"),
                 Arguments.of((short) 1,
                         "{\"clusterId\":\"I4ZmrWqfT2e-upky_4fdPA\",\"voterId\":2,\"topics\":[{" +
-                            "\"topicName\":\"topic\",\"partitions\":[{\"partitionIndex\":1,\"candidateEpoch\":1," +
-                            "\"candidateId\":1,\"candidateDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\"," +
-                            "\"voterDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\",\"lastOffsetEpoch\":1000,\"lastOffset\":1000}]}]}")
+                            "\"topicName\":\"topic\",\"partitions\":[{\"partitionIndex\":1,\"replicaEpoch\":1," +
+                            "\"replicaId\":1,\"replicaDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\"," +
+                            "\"voterDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\",\"lastOffsetEpoch\":1000,\"lastOffset\":1000}]}]}"),
+                Arguments.of((short) 2,
+                        "{\"clusterId\":\"I4ZmrWqfT2e-upky_4fdPA\",\"voterId\":2,\"topics\":[{" +
+                            "\"topicName\":\"topic\",\"partitions\":[{\"partitionIndex\":1,\"replicaEpoch\":1," +
+                            "\"replicaId\":1,\"replicaDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\"," +
+                            "\"voterDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\",\"lastOffsetEpoch\":1000,\"lastOffset\":1000," +

Review Comment:
   Hmm. Outside the scope of this PR but these examples only test the case when 
the directory id is not set or zero.



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -867,6 +878,10 @@ private boolean handleVoteResponse(
         VoteResponseData.PartitionData partitionResponse =
             response.topics().get(0).partitions().get(0);
 
+        if (partitionResponse.preVote()) {
+            throw new UnsupportedOperationException("PreVote=true responses are not supported yet");
+        }

Review Comment:
   I would remove this. If the replica is not sending preVote set to true then 
it should just assume that the response will not have the preVote field set to 
true.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
