jsancio commented on code in PR #17807:
URL: https://github.com/apache/kafka/pull/17807#discussion_r1878851307
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -779,39 +781,51 @@ private VoteResponseData handleVoteRequest(
VoteRequestData.PartitionData partitionRequest =
request.topics().get(0).partitions().get(0);
- int candidateId = partitionRequest.candidateId();
- int candidateEpoch = partitionRequest.candidateEpoch();
+ int replicaId = partitionRequest.replicaId();
+ int replicaEpoch = partitionRequest.replicaEpoch();
+ boolean preVote = partitionRequest.preVote();
int lastEpoch = partitionRequest.lastOffsetEpoch();
long lastEpochEndOffset = partitionRequest.lastOffset();
- if (lastEpochEndOffset < 0 || lastEpoch < 0 || lastEpoch >= candidateEpoch) {
+ boolean isIllegalEpoch = preVote ? lastEpoch > replicaEpoch : lastEpoch >= replicaEpoch;
+ if (isIllegalEpoch) {
+ logger.info(
+ "Received a vote request from replica {} with illegal epoch {} and last epoch {}",
+ replicaId,
+ replicaEpoch,
+ lastEpoch
+ );
+ }
+ if (lastEpochEndOffset < 0 || lastEpoch < 0 || isIllegalEpoch) {
return buildVoteResponse(
requestMetadata.listenerName(),
requestMetadata.apiVersion(),
Errors.INVALID_REQUEST,
- false
+ false,
+ preVote
);
}
- Optional<Errors> errorOpt = validateVoterOnlyRequest(candidateId, candidateEpoch);
+ Optional<Errors> errorOpt = validateVoterOnlyRequest(replicaId, replicaEpoch);
if (errorOpt.isPresent()) {
return buildVoteResponse(
requestMetadata.listenerName(),
requestMetadata.apiVersion(),
errorOpt.get(),
- false
+ false,
+ preVote
Review Comment:
I am starting to think that we should remove `preVote` from the response schema.
##########
raft/src/main/java/org/apache/kafka/raft/EpochState.java:
##########
@@ -26,16 +26,18 @@ default Optional<LogOffsetMetadata> highWatermark() {
}
/**
- * Decide whether to grant a vote to a candidate.
+ * Decide whether to grant a vote to a replica.
*
* It is the responsibility of the caller to invoke
* {@link QuorumState#transitionToUnattachedVotedState(int, ReplicaKey)} if vote is granted.
*
- * @param candidateKey the id and directory of the candidate
- * @param isLogUpToDate whether the candidate’s log is at least as up-to-date as receiver’s log
+ * @param replicaKey the id and directory of the replica requesting the vote
+ * @param isLogUpToDate whether the replica's log is at least as up-to-date as receiver’s log
* @return true if it can grant the vote, false otherwise
*/
- boolean canGrantVote(ReplicaKey candidateKey, boolean isLogUpToDate);
+ boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate);
+
+ boolean canGrantPreVote(ReplicaKey replicaKey, boolean isLogUpToDate);
Review Comment:
Let's add a Javadoc. I am interested to see how this method differs from `canGrantVote`.
Having said that, did you consider a single method with this signature: `canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, boolean isPreVote)`? If so, why did you reject this interface change?
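To make the comparison concrete, here is a minimal sketch of the single-method alternative with the requested Javadoc. `ReplicaKey`, `VoteGranting` and `FollowerLikeState` are simplified, hypothetical stand-ins, not the real Kafka types; the follower behaviour mirrors the `FollowerState` changes in this PR (standard votes rejected, PreVote granted only before fetching from the leader and only when the requester's log is up to date).

```java
// Hypothetical stand-in for org.apache.kafka.raft.ReplicaKey; only the id is modelled.
final class ReplicaKey {
    final int id;

    ReplicaKey(int id) {
        this.id = id;
    }
}

// Hypothetical interface showing the single-method alternative.
interface VoteGranting {
    /**
     * Decide whether to grant a vote to a replica.
     *
     * For a standard vote (isPreVote == false) a granted vote is binding and the caller is
     * responsible for transitioning to the unattached-voted state. For a PreVote
     * (isPreVote == true) the answer is non-binding and the caller must not change state.
     *
     * @param replicaKey the id and directory of the replica requesting the vote
     * @param isLogUpToDate whether the replica's log is at least as up-to-date as receiver's log
     * @param isPreVote whether the request is a PreVote
     * @return true if it can grant the vote, false otherwise
     */
    boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, boolean isPreVote);
}

// Hypothetical follower-like state with the same decisions as FollowerState in this PR.
final class FollowerLikeState implements VoteGranting {
    private final boolean hasFetchedFromLeader;

    FollowerLikeState(boolean hasFetchedFromLeader) {
        this.hasFetchedFromLeader = hasFetchedFromLeader;
    }

    @Override
    public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, boolean isPreVote) {
        if (!isPreVote) {
            // A follower already has a leader, so it never grants a binding vote.
            return false;
        }
        // PreVote: grant only if we have not heard from the leader and the log is up to date.
        return !hasFetchedFromLeader && isLogUpToDate;
    }
}
```

The trade-off is that `QuorumState#canGrantVote` becomes a plain delegation, while every state has to branch on `isPreVote` internally.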
##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -44,7 +44,7 @@
* Follower: After discovering a leader with an equal or larger epoch
*
* Unattached transitions to:
- * Unattached: After learning of a new election with a higher epoch or after voting
+ * Unattached: After learning of a new election with a higher epoch or after giving a binding vote
Review Comment:
This applies to both the "Unattached transitions" and "Voted transitions" sections. Should we merge these two sections and update the wording, since a previous PR merged these two states?
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -779,39 +781,43 @@ private VoteResponseData handleVoteRequest(
VoteRequestData.PartitionData partitionRequest =
request.topics().get(0).partitions().get(0);
- int candidateId = partitionRequest.candidateId();
- int candidateEpoch = partitionRequest.candidateEpoch();
+ int replicaId = partitionRequest.replicaId();
+ int replicaEpoch = partitionRequest.replicaEpoch();
+ boolean preVote = partitionRequest.preVote();
int lastEpoch = partitionRequest.lastOffsetEpoch();
long lastEpochEndOffset = partitionRequest.lastOffset();
- if (lastEpochEndOffset < 0 || lastEpoch < 0 || lastEpoch >= candidateEpoch) {
+ boolean isIllegalEpoch = preVote ? lastEpoch > replicaEpoch : lastEpoch >= replicaEpoch;
Review Comment:
Can you write a comment explaining this check/boolean?
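Something along these lines, perhaps. The reasoning in the comment assumes, as the check itself implies, that a PreVote carries the sender's current epoch while a standard Vote carries the already-incremented candidate epoch. `VoteRequestValidation` is a hypothetical helper used only to keep the sketch self-contained.

```java
final class VoteRequestValidation {
    /**
     * Returns true if the last log epoch in the request is inconsistent with the request epoch.
     *
     * Standard Vote: the candidate increments its epoch before sending the request, so every
     * entry in its log was written in an earlier epoch. lastEpoch must be strictly less than
     * replicaEpoch.
     *
     * PreVote (assumed to carry the sender's current, not yet incremented, epoch): the log may
     * already contain entries from that epoch, so lastEpoch == replicaEpoch is legal and only
     * lastEpoch > replicaEpoch is illegal.
     */
    static boolean isIllegalEpoch(boolean preVote, int lastEpoch, int replicaEpoch) {
        return preVote ? lastEpoch > replicaEpoch : lastEpoch >= replicaEpoch;
    }
}
```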
##########
raft/src/main/java/org/apache/kafka/raft/FollowerState.java:
##########
@@ -46,6 +46,7 @@ public class FollowerState implements EpochState {
private final Timer updateVoterPeriodTimer;
private final Logger log;
+ private boolean hasFetchedFromLeader;
Review Comment:
Maybe move this right below `fetchTimer`. It would also be useful to document this field a bit.
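A sketch of the suggested placement and documentation. `FollowerStateSketch` and `onSuccessfulFetch` are hypothetical names, and `fetchTimer` appears only as a comment; where the real class actually sets the flag is not visible in this diff.

```java
// Trimmed-down, hypothetical sketch of the field layout; the real Timer and other fields are elided.
final class FollowerStateSketch {
    // private final Timer fetchTimer;  (elided: not needed for this sketch)

    /**
     * Whether this replica has successfully fetched from the leader of the current epoch.
     * While false, the leader has not been confirmed reachable, so a PreVote may still be
     * granted to a replica whose log is up to date. Once true, PreVote requests are rejected.
     */
    private boolean hasFetchedFromLeader;

    // Hypothetical hook, assumed to be called when a Fetch response from the leader succeeds.
    void onSuccessfulFetch() {
        hasFetchedFromLeader = true;
    }

    // Same decision as canGrantPreVote in this PR.
    boolean canGrantPreVote(boolean isLogUpToDate) {
        return !hasFetchedFromLeader && isLogUpToDate;
    }
}
```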
##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -641,8 +641,11 @@ private int randomElectionTimeoutMs() {
return electionTimeoutMs + random.nextInt(electionTimeoutMs);
}
- public boolean canGrantVote(ReplicaKey candidateKey, boolean isLogUpToDate) {
- return state.canGrantVote(candidateKey, isLogUpToDate);
+ public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, boolean isPreVote) {
+ if (isPreVote) {
+ return state.canGrantPreVote(replicaKey, isLogUpToDate);
+ }
+ return state.canGrantVote(replicaKey, isLogUpToDate);
Review Comment:
Minor, and feel free to ignore, but is this more readable?
```java
public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate, boolean isPreVote) {
    return isPreVote ?
        state.canGrantPreVote(replicaKey, isLogUpToDate) :
        state.canGrantVote(replicaKey, isLogUpToDate);
}
```
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -822,30 +836,36 @@ private VoteResponseData handleVoteRequest(
requestMetadata.listenerName(),
requestMetadata.apiVersion(),
Errors.INVALID_VOTER_KEY,
- false
+ false,
+ preVote
);
}
OffsetAndEpoch lastEpochEndOffsetAndEpoch = new OffsetAndEpoch(lastEpochEndOffset, lastEpoch);
- ReplicaKey candidateKey = ReplicaKey.of(
- candidateId,
- partitionRequest.candidateDirectoryId()
+ ReplicaKey replicaKey = ReplicaKey.of(
+ replicaId,
+ partitionRequest.replicaDirectoryId()
);
boolean voteGranted = quorum.canGrantVote(
- candidateKey,
- lastEpochEndOffsetAndEpoch.compareTo(endOffset()) >= 0
+ replicaKey,
+ lastEpochEndOffsetAndEpoch.compareTo(endOffset()) >= 0,
+ preVote
);
- if (voteGranted && quorum.isUnattachedNotVoted()) {
- transitionToUnattachedVoted(candidateKey, candidateEpoch);
+ if (!preVote && voteGranted && quorum.isUnattachedNotVoted()) {
+ transitionToUnattachedVoted(replicaKey, replicaEpoch);
}
- logger.info("Vote request {} with epoch {} is {}", request, candidateEpoch, voteGranted ? "granted" : "rejected");
+ logger.info("Vote request {} with epoch {} is {}",
+ request,
+ replicaEpoch,
+ voteGranted ? "granted" : "rejected");
Review Comment:
How about this formatting:
```java
logger.info(
"Vote request {} with epoch {} is {}",
request,
replicaEpoch,
voteGranted ? "granted" : "rejected"
);
```
##########
raft/src/main/java/org/apache/kafka/raft/ResignedState.java:
##########
@@ -140,10 +140,21 @@ public List<ReplicaKey> preferredSuccessors() {
}
@Override
- public boolean canGrantVote(ReplicaKey candidateKey, boolean isLogUpToDate) {
+ public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate) {
log.debug(
- "Rejecting vote request from candidate ({}) since we have resigned as candidate/leader in epoch {}",
- candidateKey,
+ "Rejecting Vote request from candidate ({}) since we have resigned as leader in epoch {}",
+ replicaKey,
+ epoch
+ );
+
+ return false;
+ }
+
+ @Override
+ public boolean canGrantPreVote(ReplicaKey replicaKey, boolean isLogUpToDate) {
Review Comment:
Did we agree that `canGrantPreVote` returns true in the resigned state if the log is up to date?
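If the answer is yes, the override could be as small as the hypothetical sketch below. It assumes the agreement is to grant a non-binding PreVote whenever the requester's log is up to date; `ResignedStateSketch` and the `int replicaId` parameter stand in for the real `ResignedState` and `ReplicaKey`.

```java
// Hypothetical stand-in for ResignedState; only the vote decisions are modelled.
final class ResignedStateSketch {
    private final int epoch;

    ResignedStateSketch(int epoch) {
        this.epoch = epoch;
    }

    int epoch() {
        return epoch;
    }

    // Standard votes are always rejected after resigning, as in the diff above.
    boolean canGrantVote(int replicaId, boolean isLogUpToDate) {
        return false;
    }

    // Assumption under discussion: grant a non-binding PreVote iff the requester's log is up to date.
    boolean canGrantPreVote(int replicaId, boolean isLogUpToDate) {
        return isLogUpToDate;
    }
}
```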
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -779,39 +781,51 @@ private VoteResponseData handleVoteRequest(
VoteRequestData.PartitionData partitionRequest =
request.topics().get(0).partitions().get(0);
- int candidateId = partitionRequest.candidateId();
- int candidateEpoch = partitionRequest.candidateEpoch();
+ int replicaId = partitionRequest.replicaId();
+ int replicaEpoch = partitionRequest.replicaEpoch();
+ boolean preVote = partitionRequest.preVote();
int lastEpoch = partitionRequest.lastOffsetEpoch();
long lastEpochEndOffset = partitionRequest.lastOffset();
- if (lastEpochEndOffset < 0 || lastEpoch < 0 || lastEpoch >= candidateEpoch) {
+ boolean isIllegalEpoch = preVote ? lastEpoch > replicaEpoch : lastEpoch >= replicaEpoch;
+ if (isIllegalEpoch) {
+ logger.info(
+ "Received a vote request from replica {} with illegal epoch {} and last epoch {}",
+ replicaId,
+ replicaEpoch,
+ lastEpoch
Review Comment:
We should also log the value of `preVote`, since it determines whether the epoch is illegal (`isIllegalEpoch`).
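For example, the log call could take `preVote` as one more placeholder. The runnable sketch below builds the same message with `String.format` so it does not depend on SLF4J; `IllegalEpochLog` is a hypothetical helper and the exact wording is only a suggestion.

```java
final class IllegalEpochLog {
    // Equivalent SLF4J call in KafkaRaftClient (suggested wording):
    // logger.info(
    //     "Received a vote request from replica {} with illegal epoch {}, last epoch {} and preVote {}",
    //     replicaId,
    //     replicaEpoch,
    //     lastEpoch,
    //     preVote
    // );
    static String message(int replicaId, int replicaEpoch, int lastEpoch, boolean preVote) {
        return String.format(
            "Received a vote request from replica %d with illegal epoch %d, last epoch %d and preVote %b",
            replicaId,
            replicaEpoch,
            lastEpoch,
            preVote
        );
    }
}
```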
##########
raft/src/main/java/org/apache/kafka/raft/FollowerState.java:
##########
@@ -202,16 +205,34 @@ public void
setFetchingSnapshot(Optional<RawSnapshotWriter> newSnapshot) {
}
@Override
- public boolean canGrantVote(ReplicaKey candidateKey, boolean isLogUpToDate) {
+ public boolean canGrantVote(ReplicaKey replicaKey, boolean isLogUpToDate) {
log.debug(
"Rejecting vote request from candidate ({}) since we already have a leader {} in epoch {}",
- candidateKey,
+ replicaKey,
leaderId,
epoch
);
return false;
}
+ @Override
+ public boolean canGrantPreVote(ReplicaKey replicaKey, boolean isLogUpToDate) {
+ if (hasFetchedFromLeader) {
+ log.debug(
+ "Rejecting PreVote request from replica ({}) since we already have a leader {} in epoch {}",
+ replicaKey,
+ leaderId,
+ epoch
+ );
+ return false;
+ } else if (!isLogUpToDate) {
+ log.debug(
+ "Rejecting PreVote request from replica ({}) since replica epoch/offset is not up to date with us",
+ replicaKey);
+ }
+ return isLogUpToDate;
+ }
Review Comment:
This method has a lot of lines because we are trying to generate a very specific log message. I don't think we need that, since debug messages are for developers. How about:
```java
@Override
public boolean canGrantPreVote(ReplicaKey replicaKey, boolean isLogUpToDate) {
    boolean granting = !hasFetchedFromLeader && isLogUpToDate;
    if (!granting) {
        log.debug(
            "Rejecting PreVote request from replica ({}) since leader is {}, epoch is {}, isLogUpToDate is {} and hasFetchedFromLeader is {}",
            replicaKey,
            leaderId,
            epoch,
            isLogUpToDate,
            hasFetchedFromLeader
        );
    }
    return granting;
}
```
##########
raft/src/main/java/org/apache/kafka/raft/RaftUtil.java:
##########
@@ -190,19 +192,24 @@ public static VoteResponseData singletonVoteResponse(
int leaderEpoch,
int leaderId,
boolean voteGranted,
+ boolean preVote,
Endpoints endpoints
) {
+ VoteResponseData.PartitionData partitionData = new VoteResponseData.PartitionData()
+ .setErrorCode(partitionLevelError.code())
+ .setLeaderId(leaderId)
+ .setLeaderEpoch(leaderEpoch)
+ .setVoteGranted(voteGranted);
+ if (apiVersion >= 2) {
+ partitionData.setPreVote(preVote);
+ }
Review Comment:
I am pretty convinced that we should remove the preVote field from the
response. What do you think?
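For reference, a hypothetical sketch of the builder if the field is removed: the `preVote` parameter and the `apiVersion >= 2` guard both disappear. `PartitionDataSketch` stands in for `VoteResponseData.PartitionData`, and the rationale in the comment (the requester already knows whether it sent a PreVote) is an assumption, not something stated in this PR.

```java
// Hypothetical stand-in for VoteResponseData.PartitionData with chained setters.
final class PartitionDataSketch {
    short errorCode;
    int leaderId;
    int leaderEpoch;
    boolean voteGranted;

    PartitionDataSketch setErrorCode(short errorCode) { this.errorCode = errorCode; return this; }
    PartitionDataSketch setLeaderId(int leaderId) { this.leaderId = leaderId; return this; }
    PartitionDataSketch setLeaderEpoch(int leaderEpoch) { this.leaderEpoch = leaderEpoch; return this; }
    PartitionDataSketch setVoteGranted(boolean voteGranted) { this.voteGranted = voteGranted; return this; }
}

final class VoteResponses {
    // Assumption: the requester tracks whether it sent a PreVote, so echoing the flag back is
    // redundant. Without it, the builder needs neither a preVote parameter nor a version check.
    static PartitionDataSketch singletonPartition(short errorCode, int leaderEpoch, int leaderId, boolean voteGranted) {
        return new PartitionDataSketch()
            .setErrorCode(errorCode)
            .setLeaderId(leaderId)
            .setLeaderEpoch(leaderEpoch)
            .setVoteGranted(voteGranted);
    }
}
```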
--
This is an automated message from the Apache Git Service.