hachikuji commented on a change in pull request #10393:
URL: https://github.com/apache/kafka/pull/10393#discussion_r605812919

##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -592,44 +592,17 @@ private VoteResponseData handleVoteRequest(
             transitionToUnattached(candidateEpoch);
         }
-        final boolean voteGranted;
-        if (quorum.isLeader()) {
-            logger.debug("Rejecting vote request {} with epoch {} since we are already leader on that epoch",
-                    request, candidateEpoch);
-            voteGranted = false;
-        } else if (quorum.isCandidate()) {
-            logger.debug("Rejecting vote request {} with epoch {} since we are already candidate on that epoch",
-                    request, candidateEpoch);
-            voteGranted = false;
-        } else if (quorum.isResigned()) {
-            logger.debug("Rejecting vote request {} with epoch {} since we have resigned as candidate/leader in this epoch",
-                    request, candidateEpoch);
-            voteGranted = false;
-        } else if (quorum.isFollower()) {
-            FollowerState state = quorum.followerStateOrThrow();
-            logger.debug("Rejecting vote request {} with epoch {} since we already have a leader {} on that epoch",
-                    request, candidateEpoch, state.leaderId());
-            voteGranted = false;
-        } else if (quorum.isVoted()) {
-            VotedState state = quorum.votedStateOrThrow();
-            voteGranted = state.votedId() == candidateId;
-
-            if (!voteGranted) {
-                logger.debug("Rejecting vote request {} with epoch {} since we already have voted for " +
-                        "another candidate {} on that epoch", request, candidateEpoch, state.votedId());
-            }
-        } else if (quorum.isUnattached()) {
+        Supplier<Boolean> logComparator = () -> {

Review comment:
It seems a little simpler to compute this value in all cases and pass it through. We could even consider doing the check up-front and rejecting the vote outright if the epoch/offset is behind us. What do you think?
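
For illustration only, here is a minimal sketch of what "compute this value in all cases and pass it through" might look like. Everything in it is hypothetical: `VoteCheckSketch`, `LogEnd`, `EpochStateSketch` and the three state classes are simplified stand-ins, not the real Kafka types, and the comparison (epoch first, then offset) is the usual Raft up-to-date rule, assumed here rather than copied from this patch.

```java
// Hypothetical sketch: none of these types are the real Kafka classes.
class VoteCheckSketch {

    /** Stand-in for the epoch and end offset of the last entry in a replica's log. */
    static final class LogEnd {
        final int epoch;
        final long offset;

        LogEnd(int epoch, long offset) {
            this.epoch = epoch;
            this.offset = offset;
        }
    }

    /** Simplified per-state check that takes the precomputed result instead of a Supplier<Boolean>. */
    interface EpochStateSketch {
        boolean grantVote(int candidateId, boolean isLogUpToDate);
    }

    /** No vote cast yet in this epoch: the log comparison alone decides. */
    static final class Unattached implements EpochStateSketch {
        @Override
        public boolean grantVote(int candidateId, boolean isLogUpToDate) {
            return isLogUpToDate;
        }
    }

    /** Already voted in this epoch: only the same candidate can be granted again. */
    static final class Voted implements EpochStateSketch {
        private final int votedId;

        Voted(int votedId) {
            this.votedId = votedId;
        }

        @Override
        public boolean grantVote(int candidateId, boolean isLogUpToDate) {
            return votedId == candidateId;
        }
    }

    /** Leader in this epoch: always rejects, whatever the log comparison says. */
    static final class Leader implements EpochStateSketch {
        @Override
        public boolean grantVote(int candidateId, boolean isLogUpToDate) {
            return false;
        }
    }

    /** True if the candidate's log end is at least as far along as ours: epoch first, then offset. */
    static boolean isLogUpToDate(LogEnd candidate, LogEnd local) {
        if (candidate.epoch != local.epoch) {
            return candidate.epoch > local.epoch;
        }
        return candidate.offset >= local.offset;
    }

    /** Computes the comparison once, for every state, and passes the plain boolean through. */
    static boolean handleVote(EpochStateSketch state, int candidateId, LogEnd candidate, LogEnd local) {
        boolean upToDate = isLogUpToDate(candidate, local);
        return state.grantVote(candidateId, upToDate);
    }

    public static void main(String[] args) {
        LogEnd local = new LogEnd(3, 100L);
        System.out.println(handleVote(new Unattached(), 1, new LogEnd(3, 120L), local));
        System.out.println(handleVote(new Unattached(), 1, new LogEnd(2, 500L), local));
        System.out.println(handleVote(new Voted(2), 1, new LogEnd(3, 120L), local));
    }
}
```

In this sketch the up-front variant mentioned in the comment would amount to returning false from `handleVote` when `upToDate` is false, before the state is consulted at all.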

##########
File path: raft/src/main/java/org/apache/kafka/raft/EpochState.java
##########
@@ -18,13 +18,22 @@
 import java.io.Closeable;
 import java.util.Optional;
+import java.util.function.Supplier;
 public interface EpochState extends Closeable {
     default Optional<LogOffsetMetadata> highWatermark() {
         return Optional.empty();
     }
+    /**
+     * Decide whether to grant a vote to a candidate, it is the responsibility of the caller to invoke
+     * {@link QuorumState##transitionToVoted(int, int)} if vote is granted.
+     *
+     * @return true If grant vote.
+     */
+    boolean grantVote(int candidateId, Supplier<Boolean> logComparator);

Review comment:
Since this does not actually result in any state changes, maybe it would be better to name this `canGrantVote` or perhaps `isVoteAllowed`?

##########
File path: raft/src/main/java/org/apache/kafka/raft/ResignedState.java
##########
@@ -125,6 +131,12 @@ public long remainingElectionTimeMs(long currentTimeMs) {
         return preferredSuccessors;
     }
+    @Override
+    public boolean grantVote(int candidateId, Supplier<Boolean> logComparator) {
+        log.debug("Rejecting vote request since we have resigned as candidate/leader in this epoch");
+        return false;

Review comment:
I'm sure there must have been a reason for this in the original implementation, but it still surprised me to see it. I think it makes sense for the resigned leader to help another candidate get elected. I'll file a separate JIRA about this.

##########
File path: raft/src/main/java/org/apache/kafka/raft/LeaderState.java
##########
@@ -304,6 +305,12 @@ public String toString() {
         }
     }
+    @Override
+    public boolean grantVote(int candidateId, Supplier<Boolean> logComparator) {
+        log.debug("Rejecting vote request since we are already leader on that epoch");

Review comment:
Maybe we can improve some of these log messages by adding the `candidateId`?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org