jsancio commented on code in PR #17807:
URL: https://github.com/apache/kafka/pull/17807#discussion_r1848488508
##########
clients/src/main/resources/common/message/VoteRequest.json:
##########
@@ -34,18 +35,20 @@
"versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
- { "name": "CandidateEpoch", "type": "int32", "versions": "0+",
- "about": "The bumped epoch of the candidate sending the
request"},
- { "name": "CandidateId", "type": "int32", "versions": "0+",
"entityType": "brokerId",
+ { "name": "ReplicaEpoch", "type": "int32", "versions": "0+",
+ "about": "The epoch of the replica sending the request"},
Review Comment:
How about "The epoch of the voter sending the request"? This matches the
wording for `ReplicaId` and `ReplicaDirectoryId`.
##########
clients/src/main/resources/common/message/VoteRequest.json:
##########
@@ -34,18 +35,20 @@
"versions": "0+", "fields": [
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
"about": "The partition index." },
- { "name": "CandidateEpoch", "type": "int32", "versions": "0+",
- "about": "The bumped epoch of the candidate sending the
request"},
- { "name": "CandidateId", "type": "int32", "versions": "0+",
"entityType": "brokerId",
+ { "name": "ReplicaEpoch", "type": "int32", "versions": "0+",
+ "about": "The epoch of the replica sending the request"},
+ { "name": "ReplicaId", "type": "int32", "versions": "0+",
"entityType": "brokerId",
"about": "The replica id of the voter sending the request"},
- { "name": "CandidateDirectoryId", "type": "uuid", "versions":
"1+", "ignorable": true,
+ { "name": "ReplicaDirectoryId", "type": "uuid", "versions": "1+",
"ignorable": true,
"about": "The directory id of the voter sending the request" },
{ "name": "VoterDirectoryId", "type": "uuid", "versions": "1+",
"ignorable": true,
- "about": "The ID of the voter sending the request"},
+ "about": "The directory id of the voter receiving the request"},
{ "name": "LastOffsetEpoch", "type": "int32", "versions": "0+",
"about": "The epoch of the last record written to the metadata
log"},
{ "name": "LastOffset", "type": "int64", "versions": "0+",
- "about": "The offset of the last record written to the metadata
log"}
+ "about": "The offset of the last record written to the metadata
log"},
+ { "name": "PreVote", "type": "bool", "versions": "2+",
+ "about": "Whether the request is a PreVote request (no epoch
increase) or not."}
Review Comment:
I don't think "increasing the epoch" is the important invariant of this type
of request. The important invariant is that votes for pre-vote requests are not
persisted, and the replica granting its vote can pre-vote for multiple
different replicas in the same epoch.
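A minimal sketch of that invariant, for illustration only. The class `VoteLedgerSketch`, its methods, and the `persistCount` counter are hypothetical names that do not appear in this PR, and the eligibility check is reduced to an epoch comparison plus an `isLogUpToDate` flag:

```java
import java.util.OptionalInt;

// Hypothetical illustration only; none of these names exist in the PR.
final class VoteLedgerSketch {
    private int votedEpoch = -1;
    private OptionalInt votedId = OptionalInt.empty();
    private int persistCount = 0;

    /** Standard vote: at most one replica per epoch, recorded durably. */
    public boolean grantStandardVote(int epoch, int replicaId, boolean isLogUpToDate) {
        if (!isLogUpToDate || epoch < votedEpoch) {
            return false;
        }
        if (epoch == votedEpoch && votedId.isPresent()) {
            // Already voted in this epoch: only the same replica is granted again.
            return votedId.getAsInt() == replicaId;
        }
        votedEpoch = epoch;
        votedId = OptionalInt.of(replicaId);
        persistCount++; // stands in for a write to durable quorum state
        return true;
    }

    /** Pre-vote: simplified eligibility check; the replica id is deliberately not recorded. */
    public boolean grantPreVote(int epoch, int replicaId, boolean isLogUpToDate) {
        return isLogUpToDate && epoch >= votedEpoch;
    }

    public int persistCount() {
        return persistCount;
    }
}
```

In this sketch, two different replicas can both receive a pre-vote for the same epoch and nothing is written, while a standard vote in one epoch goes to a single replica and is recorded.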
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -779,39 +781,43 @@ private VoteResponseData handleVoteRequest(
VoteRequestData.PartitionData partitionRequest =
request.topics().get(0).partitions().get(0);
- int candidateId = partitionRequest.candidateId();
- int candidateEpoch = partitionRequest.candidateEpoch();
+ int replicaId = partitionRequest.replicaId();
+ int replicaEpoch = partitionRequest.replicaEpoch();
+ boolean preVote = partitionRequest.preVote();
int lastEpoch = partitionRequest.lastOffsetEpoch();
long lastEpochEndOffset = partitionRequest.lastOffset();
- if (lastEpochEndOffset < 0 || lastEpoch < 0 || lastEpoch >=
candidateEpoch) {
+ boolean isIllegalEpoch = preVote ? lastEpoch > replicaEpoch :
lastEpoch >= replicaEpoch;
Review Comment:
I see. Let's write a comment explaining this check.
This is my understanding. When the replica sends a normal vote request,
the replica's epoch must be greater than the largest epoch in its log. This
is true because the candidate replica always requests a vote for an epoch
for which there is no previously known leader or record in the log.
When the pre-vote flag is true, there may be a known leader for the last
epoch, so there may be a record in the log for that epoch. In that case the
last epoch in the log may equal the replica's epoch.
A violation of either check should be rare and indicates a programming error
in the remote replica. Let's log an INFO message with all of the relevant
information when this condition is true.
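A rough sketch of the check with the explanatory comment and the log text. The class `VoteEpochCheckSketch` and the method `invalidRequestMessage` are hypothetical, and combining the epoch check with the two `< 0` checks from the removed line is an assumption about the final code. The real handler would pass the message to its logger at INFO level instead of returning it:

```java
import java.util.Optional;

// Hypothetical helper for illustration; not part of KafkaRaftClient.
final class VoteEpochCheckSketch {
    /**
     * Standard vote: the replica asks for a vote in an epoch with no known leader,
     * so its log cannot contain a record for that epoch (lastEpoch < replicaEpoch).
     * Pre-vote: the epoch is not bumped and may already have had a leader, so the
     * log may contain records for it (lastEpoch <= replicaEpoch).
     */
    static boolean isIllegalEpoch(boolean preVote, int lastEpoch, int replicaEpoch) {
        return preVote ? lastEpoch > replicaEpoch : lastEpoch >= replicaEpoch;
    }

    /** Returns the text to log when the request is invalid, or empty when it is valid. */
    static Optional<String> invalidRequestMessage(
            int replicaId, int replicaEpoch, boolean preVote, int lastEpoch, long lastEpochEndOffset) {
        boolean invalid = lastEpochEndOffset < 0
            || lastEpoch < 0
            || isIllegalEpoch(preVote, lastEpoch, replicaEpoch);
        if (!invalid) {
            return Optional.empty();
        }
        return Optional.of(String.format(
            "Received invalid vote request: replicaId=%d, replicaEpoch=%d, preVote=%b, "
                + "lastEpoch=%d, lastEpochEndOffset=%d",
            replicaId, replicaEpoch, preVote, lastEpoch, lastEpochEndOffset));
    }
}
```

With these definitions, a last epoch equal to the replica's epoch is rejected for a standard vote but accepted for a pre-vote.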
##########
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java:
##########
@@ -1181,7 +1181,7 @@ private AbstractResponse getResponse(ApiKeys apikey,
short version) {
case ALTER_CLIENT_QUOTAS: return createAlterClientQuotasResponse();
case DESCRIBE_USER_SCRAM_CREDENTIALS: return
createDescribeUserScramCredentialsResponse();
case ALTER_USER_SCRAM_CREDENTIALS: return
createAlterUserScramCredentialsResponse();
- case VOTE: return createVoteResponse();
+ case VOTE: return createVoteResponse(version);
case BEGIN_QUORUM_EPOCH: return createBeginQuorumEpochResponse();
case END_QUORUM_EPOCH: return createEndQuorumEpochResponse();
case DESCRIBE_QUORUM: return createDescribeQuorumResponse();
Review Comment:
I see. I haven't looked at the rest of these tests in detail, but should we
create an issue to test the different versions of the responses for the
KRaft-specific RPCs?
##########
raft/src/main/java/org/apache/kafka/raft/EpochState.java:
##########
@@ -26,16 +26,16 @@ default Optional<LogOffsetMetadata> highWatermark() {
}
/**
- * Decide whether to grant a vote to a candidate.
+ * Decide whether to grant a vote to a replica.
*
* It is the responsibility of the caller to invoke
* {@link QuorumState#transitionToUnattachedVotedState(int, ReplicaKey)}
if vote is granted.
*
- * @param candidateKey the id and directory of the candidate
- * @param isLogUpToDate whether the candidate’s log is at least as
up-to-date as receiver’s log
+ * @param replicaKey the id and directory of the replica requesting the
vote
+ * @param isLogUpToDate whether the replica's log is at least as
up-to-date as receiver’s log
* @return true if it can grant the vote, false otherwise
*/
- boolean canGrantVote(ReplicaKey candidateKey, boolean isLogUpToDate);
+ boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate);
Review Comment:
The implementation of this method is not correct for all cases. The KIP says
the following:
> When servers receive VoteRequests with the PreVote field set to true, they
> will respond with VoteGranted set to
> - true if they are not a Follower and the epoch and offsets in the
>   Pre-Vote request satisfy the same requirements as a standard vote
> - false if otherwise
I know that your PR description says the following:
> Not included in this PR are changes to Follower state's canGrantVote. This
> may need to change in later PreVote PR to address a ping-pong scenario brought
> up by Jack Vanlightly. (ex: 3 node quorum, leader node A disconnects from
> quorum, node B goes into prospective state first before node C, node B sends
> prevote request to node C still in follower state and receives back that node A
> is leader, node B transitions to follower while node C transitions to
> prospective after election timeout, repeating this cycle) One way to tackle
> this is by having follower state nodes grant prevote requests based off last
> fetch time, which would involve changing canGrantVote, but this makes more
> sense to tackle after the Prospective state exists.
But I think we need to implement and test this logic in this PR. Otherwise,
the handling of this RPC is not correct for all software versions that support
this RPC.
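A minimal sketch of the rule quoted from the KIP above, for illustration only. The class `PreVoteGrantSketch` and its `State` enum are hypothetical and do not mirror the `EpochState` implementations in the PR, and "the same requirements as a standard vote" is collapsed into a single `isLogUpToDate` flag:

```java
// Hypothetical illustration of the quoted KIP rule; not the PR's implementation.
final class PreVoteGrantSketch {
    // Assumed set of receiver states, named for this sketch only.
    enum State { UNATTACHED, FOLLOWER, CANDIDATE, LEADER }

    /**
     * Grants a pre-vote only when the receiver is not a follower and the
     * requester's log is at least as up-to-date as the receiver's log.
     */
    static boolean canGrantPreVote(State state, boolean isLogUpToDate) {
        return state != State.FOLLOWER && isLogUpToDate;
    }
}
```

Under this reading, a follower always rejects a pre-vote, which is the behavior the ping-pong scenario in the PR description is concerned with.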
##########
raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java:
##########
@@ -187,13 +187,19 @@ private static Stream<Arguments> voteRequestTestCases() {
return Stream.of(
Arguments.of((short) 0,
"{\"clusterId\":\"I4ZmrWqfT2e-upky_4fdPA\",\"topics\":[{\"topicName\":\"topic\","
+
-
"\"partitions\":[{\"partitionIndex\":1,\"candidateEpoch\":1,\"candidateId\":1,"
+
+
"\"partitions\":[{\"partitionIndex\":1,\"replicaEpoch\":1,\"replicaId\":1," +
"\"lastOffsetEpoch\":1000,\"lastOffset\":1000}]}]}"),
Arguments.of((short) 1,
"{\"clusterId\":\"I4ZmrWqfT2e-upky_4fdPA\",\"voterId\":2,\"topics\":[{" +
-
"\"topicName\":\"topic\",\"partitions\":[{\"partitionIndex\":1,\"candidateEpoch\":1,"
+
-
"\"candidateId\":1,\"candidateDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\"," +
-
"\"voterDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\",\"lastOffsetEpoch\":1000,\"lastOffset\":1000}]}]}")
+
"\"topicName\":\"topic\",\"partitions\":[{\"partitionIndex\":1,\"replicaEpoch\":1,"
+
+
"\"replicaId\":1,\"replicaDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\"," +
+
"\"voterDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\",\"lastOffsetEpoch\":1000,\"lastOffset\":1000}]}]}"),
+ Arguments.of((short) 2,
+
"{\"clusterId\":\"I4ZmrWqfT2e-upky_4fdPA\",\"voterId\":2,\"topics\":[{" +
+
"\"topicName\":\"topic\",\"partitions\":[{\"partitionIndex\":1,\"replicaEpoch\":1,"
+
+
"\"replicaId\":1,\"replicaDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\"," +
+
"\"voterDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\",\"lastOffsetEpoch\":1000,\"lastOffset\":1000,"
+
+ "\"preVote\":false}]}]}")
Review Comment:
Let's test the case when prevote is true.
##########
raft/src/test/java/org/apache/kafka/raft/RaftUtilTest.java:
##########
@@ -187,13 +187,19 @@ private static Stream<Arguments> voteRequestTestCases() {
return Stream.of(
Arguments.of((short) 0,
"{\"clusterId\":\"I4ZmrWqfT2e-upky_4fdPA\",\"topics\":[{\"topicName\":\"topic\","
+
-
"\"partitions\":[{\"partitionIndex\":1,\"candidateEpoch\":1,\"candidateId\":1,"
+
+
"\"partitions\":[{\"partitionIndex\":1,\"replicaEpoch\":1,\"replicaId\":1," +
"\"lastOffsetEpoch\":1000,\"lastOffset\":1000}]}]}"),
Arguments.of((short) 1,
"{\"clusterId\":\"I4ZmrWqfT2e-upky_4fdPA\",\"voterId\":2,\"topics\":[{" +
-
"\"topicName\":\"topic\",\"partitions\":[{\"partitionIndex\":1,\"candidateEpoch\":1,"
+
-
"\"candidateId\":1,\"candidateDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\"," +
-
"\"voterDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\",\"lastOffsetEpoch\":1000,\"lastOffset\":1000}]}]}")
+
"\"topicName\":\"topic\",\"partitions\":[{\"partitionIndex\":1,\"replicaEpoch\":1,"
+
+
"\"replicaId\":1,\"replicaDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\"," +
+
"\"voterDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\",\"lastOffsetEpoch\":1000,\"lastOffset\":1000}]}]}"),
+ Arguments.of((short) 2,
+
"{\"clusterId\":\"I4ZmrWqfT2e-upky_4fdPA\",\"voterId\":2,\"topics\":[{" +
+
"\"topicName\":\"topic\",\"partitions\":[{\"partitionIndex\":1,\"replicaEpoch\":1,"
+
+
"\"replicaId\":1,\"replicaDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\"," +
+
"\"voterDirectoryId\":\"AAAAAAAAAAAAAAAAAAAAAQ\",\"lastOffsetEpoch\":1000,\"lastOffset\":1000,"
+
Review Comment:
Hmm. This is outside the scope of this PR, but these examples only test the
case when the directory id is not set or is zero.
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -867,6 +878,10 @@ private boolean handleVoteResponse(
VoteResponseData.PartitionData partitionResponse =
response.topics().get(0).partitions().get(0);
+ if (partitionResponse.preVote()) {
+ throw new UnsupportedOperationException("PreVote=true responses
are not supported yet");
+ }
Review Comment:
I would remove this. If the replica is not sending preVote set to true then
it should just assume that the response will not have the preVote field set to
true.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.