jsancio commented on code in PR #15859:
URL: https://github.com/apache/kafka/pull/15859#discussion_r1591319255
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -1700,16 +1710,16 @@ private void handleResponse(RaftResponse.Inbound response, long currentTimeMs) {
     }
     /**
-     * Validate a request which is only valid between voters. If an error is
-     * present in the returned value, it should be returned in the response.
+     * Validate common state for requests to establish leadership.
+     *
+     * These include the Vote, BeginQuorumEpoch and EndQuorumEpoch RPCs. If an error is present in
+     * the returned value, it should be returned in the response.
      */
     private Optional<Errors> validateVoterOnlyRequest(int remoteNodeId, int requestEpoch) {
         if (requestEpoch < quorum.epoch()) {
             return Optional.of(Errors.FENCED_LEADER_EPOCH);
         } else if (remoteNodeId < 0) {
             return Optional.of(Errors.INVALID_REQUEST);
-        } else if (quorum.isObserver() || !quorum.isVoter(remoteNodeId)) {
-            return Optional.of(Errors.INCONSISTENT_VOTER_SET);

Review Comment:
   In KIP-853, `INCONSISTENT_VOTER_SET` is deprecated and replicas will no longer return this error. In this case, replicas that think they are observers need to be allowed to vote if the leader thinks they are voters. This can happen if a voter is added to the set of voters right before an election cycle and the VotersRecord has been replicated to the new voter.

##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -112,45 +120,30 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IllegalStateE
         // when we send Vote or BeginEpoch requests.
         ElectionState election;
-        try {
-            election = store.readElectionState();
-            if (election == null) {
-                election = ElectionState.withUnknownLeader(0, voters);
-            }
-        } catch (final UncheckedIOException e) {
-            // For exceptions during state file loading (missing or not readable),
-            // we could assume the file is corrupted already and should be cleaned up.
-            log.warn("Clearing local quorum state store after error loading state {}",
-                store, e);
-            store.clear();
-            election = ElectionState.withUnknownLeader(0, voters);
-        }
+        election = store
+            .readElectionState()
+            .orElseGet(() -> ElectionState.withUnknownLeader(0, latestVoterSet.get().voterIds()));
         final EpochState initialState;
-        if (!election.voters().isEmpty() && !voters.equals(election.voters())) {
-            throw new IllegalStateException("Configured voter set: " + voters
-                + " is different from the voter set read from the state file: " + election.voters()
-                + ". Check if the quorum configuration is up to date, "
-                + "or wipe out the local state file if necessary");
-        } else if (election.hasVoted() && !isVoter()) {
-            String localIdDescription = localId.isPresent() ?
-                localId.getAsInt() + " is not a voter" :
-                "is undefined";
-            throw new IllegalStateException("Initialized quorum state " + election
-                + " with a voted candidate, which indicates this node was previously "
-                + " a voter, but the local id " + localIdDescription);

Review Comment:
   In KIP-853, replicas that think they are observers need to be allowed to vote if the leader thinks they are voters. This can happen if a voter is added to the set of voters right before an election cycle and the VotersRecord has been replicated to the new voter.

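
To make the effect of dropping the `INCONSISTENT_VOTER_SET` branch concrete, here is a minimal, self-contained sketch of the relaxed validation. It is not the code in this PR: `VoterRequestValidationSketch`, `SketchError` and the `localEpoch` parameter are hypothetical stand-ins (for `KafkaRaftClient`, `Errors` and `quorum.epoch()`), and the final `Optional.empty()` is an assumption about what remains once the membership check is removed.

```java
import java.util.Optional;

/**
 * Illustrative sketch only; not the Kafka implementation.
 * SketchError is a hypothetical stand-in for the Errors enum used in the diff.
 */
public class VoterRequestValidationSketch {
    public enum SketchError { FENCED_LEADER_EPOCH, INVALID_REQUEST }

    /**
     * Validates state shared by the Vote, BeginQuorumEpoch and EndQuorumEpoch requests.
     * localEpoch stands in for quorum.epoch().
     */
    public static Optional<SketchError> validate(int remoteNodeId, int requestEpoch, int localEpoch) {
        if (requestEpoch < localEpoch) {
            // The sender is behind the local epoch, so it is fenced.
            return Optional.of(SketchError.FENCED_LEADER_EPOCH);
        } else if (remoteNodeId < 0) {
            // A negative replica id is never valid.
            return Optional.of(SketchError.INVALID_REQUEST);
        }
        // Assumption: no voter-set membership check remains, so a replica that
        // locally believes it is an observer still accepts the request.
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(validate(3, 4, 5));  // prints "Optional[FENCED_LEADER_EPOCH]"
        System.out.println(validate(-1, 5, 5)); // prints "Optional[INVALID_REQUEST]"
        System.out.println(validate(3, 5, 5));  // prints "Optional.empty"
    }
}
```

Note that the sketch takes no voter set at all: under these assumptions, whether the local replica or the remote replica is a voter no longer influences the result.
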
##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -336,40 +346,54 @@ public void transitionToUnattached(int epoch) {
      */
     public void transitionToVoted(
         int epoch,
-        int candidateId
+        ReplicaKey candidateKey
     ) {
-        if (localId.isPresent() && candidateId == localId.getAsInt()) {
-            throw new IllegalStateException("Cannot transition to Voted with votedId=" + candidateId +
-                " and epoch=" + epoch + " since it matches the local broker.id");
-        } else if (isObserver()) {
-            throw new IllegalStateException("Cannot transition to Voted with votedId=" + candidateId +
-                " and epoch=" + epoch + " since the local broker.id=" + localId + " is not a voter");
-        } else if (!isVoter(candidateId)) {
-            throw new IllegalStateException("Cannot transition to Voted with voterId=" + candidateId +
-                " and epoch=" + epoch + " since it is not one of the voters " + voters);
-        }

Review Comment:
   In KIP-853, replicas that think they are observers need to be allowed to vote if the leader thinks they are voters. Similarly, replicas need to be allowed to vote for a candidate even if they think the candidate is not a voter. In short, only the local set of voters is considered when transitioning to the candidate state.

##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -379,16 +403,11 @@ public void transitionToFollower(
         int epoch,
         int leaderId
     ) {
+        int currentEpoch = state.epoch();
         if (localId.isPresent() && leaderId == localId.getAsInt()) {
             throw new IllegalStateException("Cannot transition to Follower with leaderId=" + leaderId +
                 " and epoch=" + epoch + " since it matches the local broker.id=" + localId);
-        } else if (!isVoter(leaderId)) {
-            throw new IllegalStateException("Cannot transition to Follower with leaderId=" + leaderId +
-                " and epoch=" + epoch + " since it is not one of the voters " + voters);
-        }

Review Comment:
   In KIP-853, the leader may not be in the local set of voters. This can happen if there is an election right after a replica has been added to the set of voters and that state has not been replicated to all of the replicas.

##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -112,45 +120,30 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IllegalStateE
         // when we send Vote or BeginEpoch requests.
         ElectionState election;
-        try {
-            election = store.readElectionState();
-            if (election == null) {
-                election = ElectionState.withUnknownLeader(0, voters);
-            }
-        } catch (final UncheckedIOException e) {
-            // For exceptions during state file loading (missing or not readable),
-            // we could assume the file is corrupted already and should be cleaned up.
-            log.warn("Clearing local quorum state store after error loading state {}",
-                store, e);
-            store.clear();
-            election = ElectionState.withUnknownLeader(0, voters);
-        }
+        election = store
+            .readElectionState()
+            .orElseGet(() -> ElectionState.withUnknownLeader(0, latestVoterSet.get().voterIds()));
         final EpochState initialState;
-        if (!election.voters().isEmpty() && !voters.equals(election.voters())) {
-            throw new IllegalStateException("Configured voter set: " + voters
-                + " is different from the voter set read from the state file: " + election.voters()
-                + ". Check if the quorum configuration is up to date, "
-                + "or wipe out the local state file if necessary");

Review Comment:
   In KIP-853, this check is no longer useful because the set of voters can change and is stored in the partition's log segments and snapshots.