jsancio commented on code in PR #15859:
URL: https://github.com/apache/kafka/pull/15859#discussion_r1591319255


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -1700,16 +1710,16 @@ private void handleResponse(RaftResponse.Inbound response, long currentTimeMs) {
     }
 
     /**
-     * Validate a request which is only valid between voters. If an error is
-     * present in the returned value, it should be returned in the response.
+     * Validate common state for requests to establish leadership.
+     *
+     * These include the Vote, BeginQuorumEpoch and EndQuorumEpoch RPCs. If an error is present in
+     * the returned value, it should be returned in the response.
      */
     private Optional<Errors> validateVoterOnlyRequest(int remoteNodeId, int requestEpoch) {
         if (requestEpoch < quorum.epoch()) {
             return Optional.of(Errors.FENCED_LEADER_EPOCH);
         } else if (remoteNodeId < 0) {
             return Optional.of(Errors.INVALID_REQUEST);
-        } else if (quorum.isObserver() || !quorum.isVoter(remoteNodeId)) {
-            return Optional.of(Errors.INCONSISTENT_VOTER_SET);

Review Comment:
   In KIP-853, `INCONSISTENT_VOTER_SET` is deprecated and replicas will no longer return this error.
   
   In this case, replicas that think they are observers need to be allowed to vote if the leader thinks they are voters. This can happen if a voter is added to the set of voters right before an election cycle and the VotersRecord has not yet been replicated to the new voter.
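   A minimal sketch of the validation once the voter-membership branch is gone, assuming only the two checks visible in the diff remain. `ValidationError` and `VoterOnlyRequestSketch` are hypothetical stand-ins for `Errors` and the private method, and `currentEpoch` stands in for `quorum.epoch()`. Neither the local observer status nor the remote id's membership in the local voter set is consulted:

```java
import java.util.Optional;

// Hypothetical stand-in for the two org.apache.kafka.common.protocol.Errors values used here.
enum ValidationError {
    FENCED_LEADER_EPOCH,
    INVALID_REQUEST
}

// Hypothetical, simplified version of validateVoterOnlyRequest without the voter-set check.
final class VoterOnlyRequestSketch {
    private VoterOnlyRequestSketch() {}

    static Optional<ValidationError> validate(int remoteNodeId, int requestEpoch, int currentEpoch) {
        if (requestEpoch < currentEpoch) {
            // The sender is behind and must learn about the newer epoch first.
            return Optional.of(ValidationError.FENCED_LEADER_EPOCH);
        } else if (remoteNodeId < 0) {
            // A negative id can never identify a replica.
            return Optional.of(ValidationError.INVALID_REQUEST);
        }
        // No isObserver()/isVoter(remoteNodeId) check: the local voter set may be stale,
        // so membership alone is not a reason to reject the request.
        return Optional.empty();
    }
}
```

   With this shape, `validate(7, 5, 5)` returns an empty `Optional` even when replica 7 is missing from the local voter set.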



##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -112,45 +120,30 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IllegalStateE
         // when we send Vote or BeginEpoch requests.
 
         ElectionState election;
-        try {
-            election = store.readElectionState();
-            if (election == null) {
-                election = ElectionState.withUnknownLeader(0, voters);
-            }
-        } catch (final UncheckedIOException e) {
-            // For exceptions during state file loading (missing or not readable),
-            // we could assume the file is corrupted already and should be cleaned up.
-            log.warn("Clearing local quorum state store after error loading state {}",
-                store, e);
-            store.clear();
-            election = ElectionState.withUnknownLeader(0, voters);
-        }
+        election = store
+            .readElectionState()
+            .orElseGet(() -> ElectionState.withUnknownLeader(0, latestVoterSet.get().voterIds()));
 
         final EpochState initialState;
-        if (!election.voters().isEmpty() && !voters.equals(election.voters())) {
-            throw new IllegalStateException("Configured voter set: " + voters
-                + " is different from the voter set read from the state file: " + election.voters()
-                + ". Check if the quorum configuration is up to date, "
-                + "or wipe out the local state file if necessary");
-        } else if (election.hasVoted() && !isVoter()) {
-            String localIdDescription = localId.isPresent() ?
-                localId.getAsInt() + " is not a voter" :
-                "is undefined";
-            throw new IllegalStateException("Initialized quorum state " + election
-                + " with a voted candidate, which indicates this node was previously "
-                + " a voter, but the local id " + localIdDescription);

Review Comment:
   In KIP-853, replicas that think they are observers need to be allowed to vote if the leader thinks they are voters. This can happen if a voter is added to the set of voters right before an election cycle and the VotersRecord has not yet been replicated to the new voter.
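   The lag can be modeled as below, assuming the replicas' logs share a common prefix and that each replica derives its voter set from the latest VotersRecord in its own log, committed or not (snapshots and the static configuration are ignored for brevity). `VoterSetHistorySketch` and its methods are hypothetical. A candidate whose log ends at offset 100 already sees replica 4 as a voter, while replica 4, whose log ends at offset 90, still looks like an observer to itself:

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical model of the VotersRecords in a shared log prefix, keyed by append offset.
final class VoterSetHistorySketch {
    private final NavigableMap<Long, Set<Integer>> votersByOffset = new TreeMap<>();

    void appendVotersRecord(long offset, Set<Integer> voterIds) {
        votersByOffset.put(offset, Set.copyOf(voterIds));
    }

    // Voter set seen by a replica whose log contains offsets [0, logEndOffset).
    Set<Integer> votersAt(long logEndOffset) {
        Map.Entry<Long, Set<Integer>> latest = votersByOffset.lowerEntry(logEndOffset);
        return latest == null ? Set.of() : latest.getValue();
    }

    // True when the replica is missing from the voter set its own log implies.
    boolean looksLikeObserver(int replicaId, long logEndOffset) {
        return !votersAt(logEndOffset).contains(replicaId);
    }
}
```

   Under that assumption, replica 4 may persist a vote while it still looks like an observer to itself, so the removed `election.hasVoted() && !isVoter()` check would fail its next restart.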



##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -336,40 +346,54 @@ public void transitionToUnattached(int epoch) {
      */
     public void transitionToVoted(
         int epoch,
-        int candidateId
+        ReplicaKey candidateKey
     ) {
-        if (localId.isPresent() && candidateId == localId.getAsInt()) {
-            throw new IllegalStateException("Cannot transition to Voted with votedId=" + candidateId +
-                " and epoch=" + epoch + " since it matches the local broker.id");
-        } else if (isObserver()) {
-            throw new IllegalStateException("Cannot transition to Voted with votedId=" + candidateId +
-                " and epoch=" + epoch + " since the local broker.id=" + localId + " is not a voter");
-        } else if (!isVoter(candidateId)) {
-            throw new IllegalStateException("Cannot transition to Voted with voterId=" + candidateId +
-                " and epoch=" + epoch + " since it is not one of the voters " + voters);
-        }

Review Comment:
   In KIP-853, replicas that think they are observers need to be allowed to vote if the leader thinks they are voters. Similarly, replicas need to be allowed to vote for a candidate even if they think the candidate is not a voter.
   
   In short, only the local set of voters is considered when transitioning to the candidate state.
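   A simplified sketch of the rule, assuming the guards reduce to one membership check on the candidate transition. `TransitionGuardsSketch` and its methods are hypothetical, and keeping the self-vote check is also an assumption, since the new body of `transitionToVoted` is not shown in this hunk:

```java
import java.util.Set;

// Hypothetical guard checks; not the real QuorumState API.
final class TransitionGuardsSketch {
    private final int localId;
    private final Set<Integer> localVoterIds;

    TransitionGuardsSketch(int localId, Set<Integer> localVoterIds) {
        this.localId = localId;
        this.localVoterIds = Set.copyOf(localVoterIds);
    }

    // The only transition gated on the local voter set.
    void checkCanBecomeCandidate(int epoch) {
        if (!localVoterIds.contains(localId)) {
            throw new IllegalStateException("Cannot transition to Candidate with epoch=" + epoch
                + " since local id " + localId + " is not in the local voter set " + localVoterIds);
        }
    }

    // Voting ignores voter sets: neither this replica nor the candidate has to be a local voter.
    // Rejecting a vote for ourselves is an assumption of this sketch.
    void checkCanVoteFor(int epoch, int candidateId) {
        if (candidateId == localId) {
            throw new IllegalStateException("Cannot transition to Voted with votedId=" + candidateId
                + " and epoch=" + epoch + " since it matches the local id");
        }
    }
}
```

   Here replica 4, which is absent from its own voter set `{1, 2, 3}`, may still vote for candidate 9, but it may not start an election itself.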



##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -379,16 +403,11 @@ public void transitionToFollower(
         int epoch,
         int leaderId
     ) {
+        int currentEpoch = state.epoch();
         if (localId.isPresent() && leaderId == localId.getAsInt()) {
             throw new IllegalStateException("Cannot transition to Follower with leaderId=" + leaderId +
                 " and epoch=" + epoch + " since it matches the local broker.id=" + localId);
-        } else if (!isVoter(leaderId)) {
-            throw new IllegalStateException("Cannot transition to Follower with leaderId=" + leaderId +
-                " and epoch=" + epoch + " since it is not one of the voters " + voters);
-        }

Review Comment:
   In KIP-853, the leader may not be in the local set of voters. This can happen if there is an election right after a replica has been added to the set of voters and that state has not been replicated to all of the replicas.
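   A sketch of the remaining guard, mirroring the unchanged `localId` check in the diff. `FollowerTransitionSketch` is hypothetical, and any epoch validation the new code adds around `currentEpoch` is omitted:

```java
import java.util.OptionalInt;
import java.util.Set;

// Hypothetical, simplified follower guard; not the real QuorumState API.
final class FollowerTransitionSketch {
    private final OptionalInt localId;
    private final Set<Integer> localVoterIds;

    FollowerTransitionSketch(OptionalInt localId, Set<Integer> localVoterIds) {
        this.localId = localId;
        this.localVoterIds = Set.copyOf(localVoterIds);
    }

    void checkCanFollow(int epoch, int leaderId) {
        if (localId.isPresent() && leaderId == localId.getAsInt()) {
            throw new IllegalStateException("Cannot transition to Follower with leaderId=" + leaderId
                + " and epoch=" + epoch + " since it matches the local broker.id=" + localId);
        }
        // Intentionally no localVoterIds.contains(leaderId) check: the leader may have been
        // elected under a voter set this replica has not replicated yet.
    }

    boolean isLocalVoter(int replicaId) {
        return localVoterIds.contains(replicaId);
    }
}
```

   Replica 2, whose local voter set is `{1, 2, 3}`, can still follow leader 4.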



##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -112,45 +120,30 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IllegalStateE
         // when we send Vote or BeginEpoch requests.
 
         ElectionState election;
-        try {
-            election = store.readElectionState();
-            if (election == null) {
-                election = ElectionState.withUnknownLeader(0, voters);
-            }
-        } catch (final UncheckedIOException e) {
-            // For exceptions during state file loading (missing or not readable),
-            // we could assume the file is corrupted already and should be cleaned up.
-            log.warn("Clearing local quorum state store after error loading state {}",
-                store, e);
-            store.clear();
-            election = ElectionState.withUnknownLeader(0, voters);
-        }
+        election = store
+            .readElectionState()
+            .orElseGet(() -> ElectionState.withUnknownLeader(0, latestVoterSet.get().voterIds()));
 
         final EpochState initialState;
-        if (!election.voters().isEmpty() && !voters.equals(election.voters())) {
-            throw new IllegalStateException("Configured voter set: " + voters
-                + " is different from the voter set read from the state file: " + election.voters()
-                + ". Check if the quorum configuration is up to date, "
-                + "or wipe out the local state file if necessary");

Review Comment:
   In KIP-853, this check is no longer useful since the set of voters can change and is stored in the partition's log segments and snapshots.
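   One possible resolution order, sketched under the assumption that the latest VotersRecord in the log wins over the one in the latest snapshot, and that the static configuration (for example `controller.quorum.voters`) is only a fallback when neither contains a VotersRecord. `VoterSetResolutionSketch` is hypothetical. Because the result depends on the log and snapshot contents, there is no single configured value that the quorum-state file could be checked against:

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical resolution order; names are illustrative, not Kafka's API.
final class VoterSetResolutionSketch {
    private VoterSetResolutionSketch() {}

    static Set<Integer> resolve(
        Optional<Set<Integer>> snapshotVoters,   // VotersRecord in the latest snapshot, if any
        List<Set<Integer>> logVotersRecords,     // VotersRecords after the snapshot, in log order
        Set<Integer> staticConfigVoters          // fallback from static configuration
    ) {
        if (!logVotersRecords.isEmpty()) {
            // The most recently appended VotersRecord takes precedence.
            return logVotersRecords.get(logVotersRecords.size() - 1);
        }
        return snapshotVoters.orElse(staticConfigVoters);
    }
}
```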



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
