jsancio commented on code in PR #18240:
URL: https://github.com/apache/kafka/pull/18240#discussion_r1899778973
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -985,36 +982,54 @@ private boolean handleVoteResponse(
maybeTransitionForward(state, currentTimeMs);
} else {
state.recordRejectedVote(remoteNodeId);
- maybeCandidateStartBackingOff(currentTimeMs);
+ maybeHandleElectionLoss(currentTimeMs);
}
} else {
- logger.debug("Ignoring vote response {} since we are no longer
a VotingState " +
- "(Prospective or Candidate) in epoch {}",
- partitionResponse, quorum.epoch());
+ logger.debug(
+ "Ignoring vote response {} since we are no longer a
NomineeState " +
+ "(Prospective or Candidate) in epoch {}",
+ partitionResponse,
+ quorum.epoch()
+ );
}
return true;
} else {
return handleUnexpectedError(error, responseMetadata);
}
}
- private void maybeCandidateStartBackingOff(long currentTimeMs) {
- // If in candidate state and vote is rejected, go immediately to a
random, exponential backoff. The
- // backoff starts low to prevent needing to wait the entire election
timeout when the vote
- // result has already been determined. The randomness prevents the
next election from being
- // gridlocked with another nominee due to timing. The exponential
aspect limits epoch churn when
- // the replica has failed multiple elections in succession.
+ /**
+ * On election loss, if replica is prospective it will transition to
unattached or follower state.
+ * If replica is candidate, it will start backing off.
+ */
+ private void maybeHandleElectionLoss(long currentTimeMs) {
if (quorum.isCandidate()) {
CandidateState candidate = quorum.candidateStateOrThrow();
if (candidate.epochElection().isVoteRejected() &&
!candidate.isBackingOff()) {
- logger.info("Insufficient remaining votes to become leader
(rejected by {}). " +
- "We will backoff before retrying election again",
candidate.epochElection().rejectingVoters());
-
+ logger.info(
+ "Insufficient remaining votes to become leader (rejected
by {}). " +
+ "We will backoff before retrying election again",
+ candidate.epochElection().rejectingVoters()
+ );
+ // Go immediately to a random, exponential backoff. The
backoff starts low to prevent
+ // needing to wait the entire election timeout when the vote
result has already been
+ // determined. The randomness prevents the next election from
being gridlocked with
+ // another nominee due to timing. The exponential aspect
limits epoch churn when the
+ // replica has failed multiple elections in succession.
candidate.startBackingOff(
currentTimeMs,
binaryExponentialElectionBackoffMs(candidate.retries())
);
}
+ } else if (quorum.isProspective()) {
+ ProspectiveState prospective = quorum.prospectiveStateOrThrow();
+ if (prospective.epochElection().isVoteRejected()) {
+ logger.info(
+ "Insufficient remaining votes to become candidate
(rejected by {}). ",
+ prospective.epochElection().rejectingVoters()
+ );
+ prospectiveTransitionAfterElectionLoss(prospective,
currentTimeMs);
+ }
}
Review Comment:
Let's add an else case and throw an illegal state exception.
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -985,36 +982,54 @@ private boolean handleVoteResponse(
maybeTransitionForward(state, currentTimeMs);
} else {
state.recordRejectedVote(remoteNodeId);
- maybeCandidateStartBackingOff(currentTimeMs);
+ maybeHandleElectionLoss(currentTimeMs);
}
} else {
- logger.debug("Ignoring vote response {} since we are no longer
a VotingState " +
- "(Prospective or Candidate) in epoch {}",
- partitionResponse, quorum.epoch());
+ logger.debug(
+ "Ignoring vote response {} since we are no longer a
NomineeState " +
+ "(Prospective or Candidate) in epoch {}",
+ partitionResponse,
+ quorum.epoch()
+ );
}
return true;
} else {
return handleUnexpectedError(error, responseMetadata);
}
}
- private void maybeCandidateStartBackingOff(long currentTimeMs) {
- // If in candidate state and vote is rejected, go immediately to a
random, exponential backoff. The
- // backoff starts low to prevent needing to wait the entire election
timeout when the vote
- // result has already been determined. The randomness prevents the
next election from being
- // gridlocked with another nominee due to timing. The exponential
aspect limits epoch churn when
- // the replica has failed multiple elections in succession.
+ /**
+ * On election loss, if replica is prospective it will transition to
unattached or follower state.
+ * If replica is candidate, it will start backing off.
+ */
+ private void maybeHandleElectionLoss(long currentTimeMs) {
if (quorum.isCandidate()) {
CandidateState candidate = quorum.candidateStateOrThrow();
if (candidate.epochElection().isVoteRejected() &&
!candidate.isBackingOff()) {
- logger.info("Insufficient remaining votes to become leader
(rejected by {}). " +
- "We will backoff before retrying election again",
candidate.epochElection().rejectingVoters());
-
+ logger.info(
+ "Insufficient remaining votes to become leader (rejected
by {}). " +
+ "We will backoff before retrying election again",
+ candidate.epochElection().rejectingVoters()
+ );
+ // Go immediately to a random, exponential backoff. The
backoff starts low to prevent
+ // needing to wait the entire election timeout when the vote
result has already been
+ // determined. The randomness prevents the next election from
being gridlocked with
+ // another nominee due to timing. The exponential aspect
limits epoch churn when the
+ // replica has failed multiple elections in succession.
candidate.startBackingOff(
currentTimeMs,
binaryExponentialElectionBackoffMs(candidate.retries())
);
}
+ } else if (quorum.isProspective()) {
+ ProspectiveState prospective = quorum.prospectiveStateOrThrow();
+ if (prospective.epochElection().isVoteRejected()) {
+ logger.info(
+ "Insufficient remaining votes to become candidate
(rejected by {}). ",
+ prospective.epochElection().rejectingVoters()
Review Comment:
How about just printing the entire `epochElection`? It may be useful to know
the state of the entire voter set, not just the rejecting voters.
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3110,27 +3121,32 @@ private long pollProspective(long currentTimeMs) {
if (shutdown != null) {
long minRequestBackoffMs = maybeSendVoteRequests(state,
currentTimeMs);
return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs);
- } else if (state.epochElection().isVoteRejected() ||
state.hasElectionTimeoutExpired(currentTimeMs)) {
- if (state.election().hasLeader() &&
!state.leaderEndpoints().isEmpty()) {
- logger.info(
- "Election was not granted, transitioning to Follower of
leader {}",
- state.election().leaderId());
- transitionToFollower(
- quorum().epoch(),
- state.election().leaderId(),
- state.leaderEndpoints(),
- currentTimeMs);
- } else {
- logger.info("Election was not granted, transitioning to
Unattached to attempt rediscovering leader");
- transitionToUnattached(quorum().epoch(),
state.election().optionalLeaderId());
- }
+ } else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
+ logger.info("Election timed out before receiving sufficient vote
responses to become candidate");
+ prospectiveTransitionAfterElectionLoss(state, currentTimeMs);
return 0L;
} else {
long minVoteRequestBackoffMs = maybeSendVoteRequests(state,
currentTimeMs);
return Math.min(minVoteRequestBackoffMs,
state.remainingElectionTimeMs(currentTimeMs));
}
}
+ private void prospectiveTransitionAfterElectionLoss(ProspectiveState
prospective, long currentTimeMs) {
+ if (prospective.election().hasLeader() &&
!prospective.leaderEndpoints().isEmpty()) {
+ logger.info(
+ "Transitioning to Follower of leader {}",
+ prospective.election().leaderId());
Review Comment:
I think you can join these lines. If not, there should be a newline before
`);`
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -985,36 +982,54 @@ private boolean handleVoteResponse(
maybeTransitionForward(state, currentTimeMs);
} else {
state.recordRejectedVote(remoteNodeId);
- maybeCandidateStartBackingOff(currentTimeMs);
+ maybeHandleElectionLoss(currentTimeMs);
}
} else {
- logger.debug("Ignoring vote response {} since we are no longer
a VotingState " +
- "(Prospective or Candidate) in epoch {}",
- partitionResponse, quorum.epoch());
+ logger.debug(
+ "Ignoring vote response {} since we are no longer a
NomineeState " +
+ "(Prospective or Candidate) in epoch {}",
+ partitionResponse,
+ quorum.epoch()
+ );
}
return true;
} else {
return handleUnexpectedError(error, responseMetadata);
}
}
- private void maybeCandidateStartBackingOff(long currentTimeMs) {
- // If in candidate state and vote is rejected, go immediately to a
random, exponential backoff. The
- // backoff starts low to prevent needing to wait the entire election
timeout when the vote
- // result has already been determined. The randomness prevents the
next election from being
- // gridlocked with another nominee due to timing. The exponential
aspect limits epoch churn when
- // the replica has failed multiple elections in succession.
+ /**
+ * On election loss, if replica is prospective it will transition to
unattached or follower state.
+ * If replica is candidate, it will start backing off.
+ */
+ private void maybeHandleElectionLoss(long currentTimeMs) {
if (quorum.isCandidate()) {
CandidateState candidate = quorum.candidateStateOrThrow();
if (candidate.epochElection().isVoteRejected() &&
!candidate.isBackingOff()) {
- logger.info("Insufficient remaining votes to become leader
(rejected by {}). " +
- "We will backoff before retrying election again",
candidate.epochElection().rejectingVoters());
-
+ logger.info(
+ "Insufficient remaining votes to become leader (rejected
by {}). " +
+ "We will backoff before retrying election again",
+ candidate.epochElection().rejectingVoters()
Review Comment:
How about just printing the entire `epochElection`? It may be useful to know
the state of the entire voter set, not just the rejecting voters.
##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -529,26 +592,54 @@ public void transitionToFollower(int epoch, int leaderId,
Endpoints endpoints) {
);
}
- public void transitionToCandidate() {
+ /**
+ * Transition to the "prospective" state. This means the replica
experienced a fetch/election timeout or
+ * loss of election as candidate. Note, if the replica is transitioning
from prospective to add voted state
+ * and there is no epoch change, it takes the route of
prospectiveAddVotedState instead.
+ */
+ public void transitionToProspective() {
if (isObserver()) {
throw new IllegalStateException(
String.format(
- "Cannot transition to Candidate since the local id (%s)
and directory id (%s) " +
+ "Cannot transition to Prospective since the local id (%s)
and directory id (%s) " +
"is not one of the voters %s",
localId,
localDirectoryId,
partitionState.lastVoterSet()
)
);
- } else if (isLeader()) {
- throw new IllegalStateException("Cannot transition to Candidate
since the local broker.id=" + localId +
- " since this node is already a Leader with state " + state);
+ } else if (isLeader() || isProspective()) {
+ throw new IllegalStateException("Cannot transition to Prospective
since the local broker.id=" + localId +
+ " is state " + state);
}
int retries = isCandidate() ? candidateStateOrThrow().retries() + 1 :
1;
+
+ durableTransitionTo(
Review Comment:
Transitioning to prospective is not really a durable transition since no
persisted data should have changed, right? You can see this is the case since
the function `transitionToProspective` doesn't take any inputs and it doesn't
increase the epoch.
In other words, the information that is persisted is information that quorum
state already knows and has already been persisted.
##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -368,14 +369,21 @@ public void transitionToResigned(List<ReplicaKey>
preferredSuccessors) {
}
/**
- * Transition to the "unattached" state. This means we have found an epoch
greater than the current epoch,
- * but we do not yet know of the elected leader.
+ * Transition to the "unattached" state. This means the replica has found
an epoch greater than the current epoch,
+ * or the replica has transitioned from Prospective with the same epoch.
+ * Note, if the replica is transitioning from unattached to add voted
state and there is no epoch change,
+ * it takes the route of unattachedAddVotedState instead.
*/
- public void transitionToUnattached(int epoch) {
+ public void transitionToUnattached(int epoch, OptionalInt leaderId) {
int currentEpoch = state.epoch();
- if (epoch <= currentEpoch) {
- throw new IllegalStateException("Cannot transition to Unattached
with epoch= " + epoch +
- " from current state " + state);
+ if (epoch < currentEpoch || (epoch == currentEpoch &&
!isProspective())) {
+ throw new IllegalStateException(
+ String.format(
+ "Cannot transition to Unattached with epoch= %d from
current state %s",
Review Comment:
Minor but let's just remove the `=` sign.
```java
"Cannot transition to Unattached with epoch %d from
current state %s",
```
##########
raft/src/main/java/org/apache/kafka/raft/internals/EpochElection.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.raft.ReplicaKey;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Tracks the votes cast by voters in an election held by a Nominee.
+ */
+public class EpochElection {
+ private Map<Integer, VoterState> voterStates;
+
+ public EpochElection(Set<ReplicaKey> voters) {
+ this.voterStates = voters.stream()
+ .collect(
+ Collectors.toMap(
+ ReplicaKey::id,
+ VoterState::new
+ )
+ );
+ }
+
+ /**
+ * Record a vote from a voter.
+ * @param voterId The id of the voter
+ * @param isGranted true if the vote is granted, false if it is rejected
+ * @return true if the voter had not been previously recorded
+ */
+ public boolean recordVote(int voterId, boolean isGranted) {
+ boolean wasUnrecorded = false;
+ VoterState voterState = getVoterStateOrThrow(voterId);
+ if (voterState.state == VoterState.State.UNRECORDED) {
+ wasUnrecorded = true;
+ }
+ if (isGranted) {
+ voterState.setState(VoterState.State.GRANTED);
+ } else {
+ voterState.setState(VoterState.State.REJECTED);
+ }
+ return wasUnrecorded;
+ }
+
+ /**
+ * Returns if a voter has granted the vote.
+ * @param voterId The id of the voter
+ * @throws IllegalArgumentException if the voter is not in the set of
voters
+ */
+ public boolean isGrantedVoter(int voterId) {
+ return getVoterStateOrThrow(voterId).state == VoterState.State.GRANTED;
+ }
+
+ /**
+ * Returns if a voter has rejected the vote.
+ * @param voterId The id of the voter
+ * @throws IllegalArgumentException if the voter is not in the set of
voters
+ */
+ public boolean isRejectedVoter(int voterId) {
+ return getVoterStateOrThrow(voterId).state ==
VoterState.State.REJECTED;
+ }
+
+ /**
+ * The set of voter ids.
+ */
+ public Set<Integer> voterIds() {
+ return Collections.unmodifiableSet(voterStates.keySet());
+ }
+
+ /**
+ * Get the collection of voter states.
+ */
+ public Collection<VoterState> voterStates() {
+ return Collections.unmodifiableCollection(voterStates.values());
+ }
+
+ /**
+ * Check whether we have received enough votes to conclude the election
and become leader.
+ *
+ * @return true if at least a majority of nodes have granted the vote
+ */
+ public boolean isVoteGranted() {
+ return numGranted() >= majoritySize();
+ }
+
+ /**
+ * Check if we have received enough rejections that it is no longer
possible to reach a
+ * majority of grants.
+ *
+ * @return true if the vote is rejected, false if the vote is already or
can still be granted
+ */
+ public boolean isVoteRejected() {
+ return numGranted() + numUnrecorded() < majoritySize();
+ }
+
+ /**
+ * Get the set of voters which have not been counted as granted or
rejected yet.
+ *
+ * @return The set of unrecorded voters
+ */
+ public Set<ReplicaKey> unrecordedVoters() {
+ return
votersOfState(VoterState.State.UNRECORDED).collect(Collectors.toSet());
+ }
+
+ /**
+ * Get the set of voters that have granted our vote requests.
+ *
+ * @return The set of granting voters, which should always contain the
localId
+ */
+ public Set<Integer> grantingVoters() {
+ return
votersOfState(VoterState.State.GRANTED).map(ReplicaKey::id).collect(Collectors.toSet());
+ }
+
+ /**
+ * Get the set of voters that have rejected our candidacy.
+ *
+ * @return The set of rejecting voters
+ */
+ public Set<Integer> rejectingVoters() {
+ return
votersOfState(VoterState.State.REJECTED).map(ReplicaKey::id).collect(Collectors.toSet());
+ }
+
+ private VoterState getVoterStateOrThrow(int voterId) {
+ VoterState voterState = voterStates.get(voterId);
+ if (voterState == null) {
+ throw new IllegalArgumentException("Attempt to access voter state
of non-voter " + voterId);
+ }
+ return voterState;
+ }
+
+ private Stream<ReplicaKey> votersOfState(VoterState.State state) {
+ return voterStates
+ .values()
+ .stream()
+ .filter(voterState -> voterState.state().equals(state))
+ .map(VoterState::replicaKey);
+ }
+
+ private long numGranted() {
+ return votersOfState(VoterState.State.GRANTED).count();
+ }
+
+ private long numUnrecorded() {
+ return votersOfState(VoterState.State.UNRECORDED).count();
+ }
+
+ private int majoritySize() {
+ return voterStates.size() / 2 + 1;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "EpochElection(%s)",
+ voterStates.values().stream()
+ .map(VoterState::toString)
+ .collect(
+ Collectors.joining(", "))
+ );
Review Comment:
Why not just print the map?
```java
return String.format(
"EpochElection(voterStates=%s)",
voterStates
);
```
##########
raft/src/main/java/org/apache/kafka/raft/internals/EpochElection.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.raft.ReplicaKey;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Tracks the votes cast by voters in an election held by a Nominee.
+ */
+public class EpochElection {
+ private Map<Integer, VoterState> voterStates;
+
+ public EpochElection(Set<ReplicaKey> voters) {
+ this.voterStates = voters.stream()
+ .collect(
+ Collectors.toMap(
+ ReplicaKey::id,
+ VoterState::new
+ )
+ );
+ }
+
+ /**
+ * Record a vote from a voter.
+ * @param voterId The id of the voter
+ * @param isGranted true if the vote is granted, false if it is rejected
+ * @return true if the voter had not been previously recorded
+ */
+ public boolean recordVote(int voterId, boolean isGranted) {
+ boolean wasUnrecorded = false;
+ VoterState voterState = getVoterStateOrThrow(voterId);
+ if (voterState.state == VoterState.State.UNRECORDED) {
+ wasUnrecorded = true;
+ }
Review Comment:
```java
VoterState voterState = getVoterStateOrThrow(voterId);
boolean wasUnrecorded = voterState.state ==
VoterState.State.UNRECORDED;
```
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -985,36 +982,54 @@ private boolean handleVoteResponse(
maybeTransitionForward(state, currentTimeMs);
} else {
state.recordRejectedVote(remoteNodeId);
- maybeCandidateStartBackingOff(currentTimeMs);
+ maybeHandleElectionLoss(currentTimeMs);
}
} else {
- logger.debug("Ignoring vote response {} since we are no longer
a VotingState " +
- "(Prospective or Candidate) in epoch {}",
- partitionResponse, quorum.epoch());
+ logger.debug(
+ "Ignoring vote response {} since we are no longer a
NomineeState " +
+ "(Prospective or Candidate) in epoch {}",
+ partitionResponse,
+ quorum.epoch()
+ );
}
return true;
} else {
return handleUnexpectedError(error, responseMetadata);
}
}
- private void maybeCandidateStartBackingOff(long currentTimeMs) {
- // If in candidate state and vote is rejected, go immediately to a
random, exponential backoff. The
- // backoff starts low to prevent needing to wait the entire election
timeout when the vote
- // result has already been determined. The randomness prevents the
next election from being
- // gridlocked with another nominee due to timing. The exponential
aspect limits epoch churn when
- // the replica has failed multiple elections in succession.
+ /**
+ * On election loss, if replica is prospective it will transition to
unattached or follower state.
+ * If replica is candidate, it will start backing off.
+ */
+ private void maybeHandleElectionLoss(long currentTimeMs) {
Review Comment:
How about passing the `NomineeState` object, checking the subtype of that
object and casting to the appropriate subtype.
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3110,27 +3121,32 @@ private long pollProspective(long currentTimeMs) {
if (shutdown != null) {
long minRequestBackoffMs = maybeSendVoteRequests(state,
currentTimeMs);
return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs);
- } else if (state.epochElection().isVoteRejected() ||
state.hasElectionTimeoutExpired(currentTimeMs)) {
- if (state.election().hasLeader() &&
!state.leaderEndpoints().isEmpty()) {
- logger.info(
- "Election was not granted, transitioning to Follower of
leader {}",
- state.election().leaderId());
- transitionToFollower(
- quorum().epoch(),
- state.election().leaderId(),
- state.leaderEndpoints(),
- currentTimeMs);
- } else {
- logger.info("Election was not granted, transitioning to
Unattached to attempt rediscovering leader");
- transitionToUnattached(quorum().epoch(),
state.election().optionalLeaderId());
- }
+ } else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
+ logger.info("Election timed out before receiving sufficient vote
responses to become candidate");
+ prospectiveTransitionAfterElectionLoss(state, currentTimeMs);
return 0L;
} else {
long minVoteRequestBackoffMs = maybeSendVoteRequests(state,
currentTimeMs);
return Math.min(minVoteRequestBackoffMs,
state.remainingElectionTimeMs(currentTimeMs));
}
}
+ private void prospectiveTransitionAfterElectionLoss(ProspectiveState
prospective, long currentTimeMs) {
+ if (prospective.election().hasLeader() &&
!prospective.leaderEndpoints().isEmpty()) {
+ logger.info(
+ "Transitioning to Follower of leader {}",
+ prospective.election().leaderId());
+ transitionToFollower(
+ quorum().epoch(),
+ prospective.election().leaderId(),
+ prospective.leaderEndpoints(),
+ currentTimeMs);
Review Comment:
Add a newline before `);`.
##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -369,16 +369,11 @@ public void transitionToResigned(List<ReplicaKey>
preferredSuccessors) {
}
/**
- * Transition to the "unattached" state. This means we have found an epoch
greater than the current epoch
- * and do not yet know of the elected leader, or we have transitioned from
Prospective with the same epoch.
- * Note, if we are transitioning from unattached and there is no epoch
change, we take the path of
- * unattachedAddVotedState instead.
+ * Transition to the "unattached" state. This means the replica has found
an epoch greater than the current epoch,
+ * or the replica has transitioned from Prospective with the same epoch.
+ * Note, if the replica is transitioning from unattached to add voted
state and there is no epoch change,
+ * it takes the route of unattachedAddVotedState instead.
Review Comment:
As we discussed offline, the resigned state also transitions to unattached
with a greater epoch. Let's document that.
Having said that, let's also update the comment at the top of this file that
documents the transitions from resigned:
> Resigned transitions to:
Let's also update the KIP.
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2997,14 +3014,7 @@ private long pollResigned(long currentTimeMs) {
// until either the shutdown expires or an election bumps the epoch
stateTimeoutMs = shutdown.remainingTimeMs();
} else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
- if (quorum.isVoter()) {
- transitionToProspective(currentTimeMs);
- } else {
- // It is possible that the old leader is not a voter in the
new voter set.
- // In that case increase the epoch and transition to
unattached. The epoch needs
- // to be increased to avoid FETCH responses with the leader
being this replica.
- transitionToUnattached(quorum.epoch() + 1);
- }
Review Comment:
Let's add a comment that summarizes our discussion and conclusion. It is good
to document and explain this decision.
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -2811,9 +2828,9 @@ private BeginQuorumEpochRequestData
buildBeginQuorumEpochRequest(ReplicaKey remo
);
}
- private VoteRequestData buildVoteRequest(ReplicaKey remoteVoter) {
+ private VoteRequestData buildVoteRequest(ReplicaKey remoteVoter, boolean
preVote) {
OffsetAndEpoch endOffset = endOffset();
- boolean isPreVote = quorum.isProspective();
+// boolean isPreVote = quorum.isProspective();
Review Comment:
Remove this line. Let's not have commented code.
##########
raft/src/main/java/org/apache/kafka/raft/FollowerState.java:
##########
@@ -75,7 +79,7 @@ public FollowerState(
@Override
public ElectionState election() {
- return ElectionState.withElectedLeader(epoch, leaderId, voters);
+ return new ElectionState(epoch, OptionalInt.of(leaderId), votedKey,
voters);
Review Comment:
Let's keep the previous pattern of using static methods to construct
`ElectionState`. You can add `Optional<ReplicaKey>` parameter to
`withElectedLeader`.
##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -861,6 +871,17 @@ public boolean isNomineeState() {
return state instanceof NomineeState;
}
+ /**
+ * Determines if replica in unattached or prospective state can grant a
vote request.
+ * @param leaderId local replica's optional leader id.
Review Comment:
Missing new line between these two lines.
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -679,8 +706,22 @@ private void transitionToCandidate(long currentTimeMs) {
onBecomeCandidate(currentTimeMs);
}
- private void transitionToUnattached(int epoch) {
- quorum.transitionToUnattached(epoch);
+ private void onBecomeProspective(long currentTimeMs) {
+ ProspectiveState state = quorum.prospectiveStateOrThrow();
+ if (!maybeTransitionToCandidate(state, currentTimeMs)) {
+ resetConnections();
+ kafkaRaftMetrics.updateElectionStartMs(currentTimeMs);
+ }
+ }
+
+ private void transitionToProspective(long currentTimeMs) {
+ quorum.transitionToProspective();
+ maybeFireLeaderChange();
Review Comment:
Why do you need to call `maybeFireLeaderChange`? Based on the inputs and
since prospective doesn't increase the epoch, I would assume that the leader
and epoch don't change when transitioning to prospective.
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3026,20 +3099,51 @@ private long pollCandidate(long currentTimeMs) {
return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs);
} else if (state.isBackingOff()) {
if (state.isBackoffComplete(currentTimeMs)) {
- logger.info("Re-elect as candidate after election backoff has
completed");
- transitionToCandidate(currentTimeMs);
+ logger.info("Transition to prospective after election backoff
has completed");
+ transitionToProspective(currentTimeMs);
return 0L;
}
return state.remainingBackoffMs(currentTimeMs);
} else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
- long backoffDurationMs =
binaryExponentialElectionBackoffMs(state.retries());
- logger.info("Election has timed out, backing off for {}ms before
becoming a candidate again",
- backoffDurationMs);
- state.startBackingOff(currentTimeMs, backoffDurationMs);
- return backoffDurationMs;
+ logger.info("Election was not granted, transitioning to
prospective");
+ transitionToProspective(currentTimeMs);
+ return 0L;
} else {
+ long minVoteRequestBackoffMs = maybeSendVoteRequests(state,
currentTimeMs);
+ return Math.min(minVoteRequestBackoffMs,
state.remainingElectionTimeMs(currentTimeMs));
+ }
+ }
+
+ private long pollProspective(long currentTimeMs) {
+ ProspectiveState state = quorum.prospectiveStateOrThrow();
+ GracefulShutdown shutdown = this.shutdown.get();
+
+ if (shutdown != null) {
long minRequestBackoffMs = maybeSendVoteRequests(state,
currentTimeMs);
- return Math.min(minRequestBackoffMs,
state.remainingElectionTimeMs(currentTimeMs));
+ return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs);
+ } else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
+ logger.info("Election timed out before receiving sufficient vote
responses to become candidate");
+ prospectiveTransitionAfterElectionLoss(state, currentTimeMs);
+ return 0L;
+ } else {
+ long minVoteRequestBackoffMs = maybeSendVoteRequests(state,
currentTimeMs);
+ return Math.min(minVoteRequestBackoffMs,
state.remainingElectionTimeMs(currentTimeMs));
+ }
+ }
+
+ private void prospectiveTransitionAfterElectionLoss(ProspectiveState
prospective, long currentTimeMs) {
+ if (prospective.election().hasLeader() &&
!prospective.leaderEndpoints().isEmpty()) {
+ logger.info(
+ "Transitioning to Follower of leader {}",
Review Comment:
`QuorumState` already logs all transitions. It logs the "from" and "to"
states. Not sure this adds any information.
##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3026,20 +3099,51 @@ private long pollCandidate(long currentTimeMs) {
return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs);
} else if (state.isBackingOff()) {
if (state.isBackoffComplete(currentTimeMs)) {
- logger.info("Re-elect as candidate after election backoff has
completed");
- transitionToCandidate(currentTimeMs);
+ logger.info("Transition to prospective after election backoff
has completed");
+ transitionToProspective(currentTimeMs);
return 0L;
}
return state.remainingBackoffMs(currentTimeMs);
} else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
- long backoffDurationMs =
binaryExponentialElectionBackoffMs(state.retries());
- logger.info("Election has timed out, backing off for {}ms before
becoming a candidate again",
- backoffDurationMs);
- state.startBackingOff(currentTimeMs, backoffDurationMs);
- return backoffDurationMs;
+ logger.info("Election was not granted, transitioning to
prospective");
+ transitionToProspective(currentTimeMs);
+ return 0L;
} else {
+ long minVoteRequestBackoffMs = maybeSendVoteRequests(state,
currentTimeMs);
+ return Math.min(minVoteRequestBackoffMs,
state.remainingElectionTimeMs(currentTimeMs));
+ }
+ }
+
+ private long pollProspective(long currentTimeMs) {
+ ProspectiveState state = quorum.prospectiveStateOrThrow();
+ GracefulShutdown shutdown = this.shutdown.get();
+
+ if (shutdown != null) {
long minRequestBackoffMs = maybeSendVoteRequests(state,
currentTimeMs);
- return Math.min(minRequestBackoffMs,
state.remainingElectionTimeMs(currentTimeMs));
+ return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs);
+ } else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
+ logger.info("Election timed out before receiving sufficient vote
responses to become candidate");
+ prospectiveTransitionAfterElectionLoss(state, currentTimeMs);
+ return 0L;
+ } else {
+ long minVoteRequestBackoffMs = maybeSendVoteRequests(state,
currentTimeMs);
+ return Math.min(minVoteRequestBackoffMs,
state.remainingElectionTimeMs(currentTimeMs));
+ }
+ }
+
+ private void prospectiveTransitionAfterElectionLoss(ProspectiveState
prospective, long currentTimeMs) {
+ if (prospective.election().hasLeader() &&
!prospective.leaderEndpoints().isEmpty()) {
+ logger.info(
+ "Transitioning to Follower of leader {}",
+ prospective.election().leaderId());
+ transitionToFollower(
+ quorum().epoch(),
+ prospective.election().leaderId(),
+ prospective.leaderEndpoints(),
+ currentTimeMs);
+ } else {
+ logger.info("Transitioning to Unattached to attempt rediscovering
leader");
Review Comment:
Same here. QuorumState already logs all state transitions.
##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -249,7 +246,7 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch)
throws IllegalStateE
time,
election.epoch(),
OptionalInt.empty(),
- Optional.empty(),
+ election.optionalVotedKey(),
Review Comment:
Offline, you mentioned that you added this because you didn't want to lose
information when transitioning states. I agree with this goal, but the voted key
is lost when the replica transitions to the `LeaderState`. Do you agree? If so,
can you file a Jira to fix this after this PR?
##########
raft/src/main/java/org/apache/kafka/raft/CandidateState.java:
##########
@@ -85,28 +80,14 @@ protected CandidateState(
this.backoffTimer = time.timer(0);
this.log = logContext.logger(CandidateState.class);
- for (ReplicaKey voter : voters.voterKeys()) {
- voteStates.put(voter.id(), new VoterState(voter));
- }
- voteStates.get(localId).setState(State.GRANTED);
+ this.epochElection = new EpochElection(voters.voterKeys());
+ epochElection.recordVote(localId, true);
}
public int localId() {
return localId;
}
Review Comment:
Can you remove this if it is not needed anymore?
##########
raft/src/main/java/org/apache/kafka/raft/NomineeState.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.raft.internals.EpochElection;
+
+interface NomineeState extends EpochState {
+ EpochElection epochElection();
Review Comment:
You don't need this method, right? This method is declared by `EpochState`.
##########
raft/src/main/java/org/apache/kafka/raft/ProspectiveState.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.raft.internals.EpochElection;
+
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.OptionalInt;
+
+import static
org.apache.kafka.raft.QuorumState.unattachedOrProspectiveCanGrantVote;
+
+public class ProspectiveState implements NomineeState {
+ private final int localId;
+ private final int epoch;
+ private final OptionalInt leaderId;
+ private final Optional<Endpoints> leaderEndpoints;
+ private final Optional<ReplicaKey> votedKey;
+ private final VoterSet voters;
+ private final EpochElection epochElection;
+ private final Optional<LogOffsetMetadata> highWatermark;
+ private final int retries;
+ private final long electionTimeoutMs;
+ private final Timer electionTimer;
+ private final Logger log;
+
+ /**
+ * The lifetime of a prospective state is the following.
+ *
+ * 1. Once started, it will send prevote requests and keep record of the
received vote responses
+ * 2. If it receives a message denoting a leader with a higher epoch, it
will transition to follower state.
+ * 3. If majority votes granted, it will transition to candidate state.
+ * 4. If majority votes rejected or election times out, it will transition
to unattached or follower state
+ * depending on if it knows the leader id and endpoints or not
+ */
+ public ProspectiveState(
+ Time time,
+ int localId,
+ int epoch,
+ OptionalInt leaderId,
+ Optional<Endpoints> leaderEndpoints,
+ Optional<ReplicaKey> votedKey,
+ VoterSet voters,
+ Optional<LogOffsetMetadata> highWatermark,
+ int retries,
+ int electionTimeoutMs,
+ LogContext logContext
+ ) {
+ this.localId = localId;
+ this.epoch = epoch;
+ this.leaderId = leaderId;
+ this.leaderEndpoints = leaderEndpoints;
+ this.votedKey = votedKey;
+ this.voters = voters;
+ this.highWatermark = highWatermark;
+ this.retries = retries;
+ this.electionTimeoutMs = electionTimeoutMs;
+ this.electionTimer = time.timer(electionTimeoutMs);
+ this.log = logContext.logger(ProspectiveState.class);
+
+ this.epochElection = new EpochElection(voters.voterKeys());
+ epochElection.recordVote(localId, true);
+ }
+
+ public int localId() {
+ return localId;
+ }
Review Comment:
Can you remove this if it is not needed?
##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -521,6 +583,7 @@ public void transitionToFollower(int epoch, int leaderId,
Endpoints endpoints) {
epoch,
leaderId,
endpoints,
+ retainVotedKey ? votedKey() : Optional.empty(),
Review Comment:
Isn't this the same as the following?
```java
epoch == currentEpoch ? votedKey() : Optional.empty(),
```
If so, you can remove the variable `retainVotedKey`.
Similar to the unattached implementation, let's document why this is done.
##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -683,12 +794,42 @@ public ResignedState resignedStateOrThrow() {
throw new IllegalStateException("Expected to be Resigned, but current
state is " + state);
}
+ public Optional<ProspectiveState> maybeProspectiveState() {
+ EpochState fixedState = state;
+ if (fixedState instanceof ProspectiveState) {
+ return Optional.of((ProspectiveState) fixedState);
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ public ProspectiveState prospectiveStateOrThrow() {
+ if (isProspective())
+ return (ProspectiveState) state;
+ throw new IllegalStateException("Expected to be Prospective, but
current state is " + state);
+ }
Review Comment:
```java
public ProspectiveState prospectiveStateOrThrow() {
return maybeProspectiveState().orElseThrow(
() -> new IllegalStateException("Expected to be Prospective, but
current state is " + state)
);
}
```
##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -562,6 +653,26 @@ public void transitionToCandidate() {
));
}
+ private void checkValidTransitionToCandidate() {
+ if (isObserver()) {
+ throw new IllegalStateException(
+ String.format(
+ "Cannot transition to Candidate since the local id (%s)
and directory id (%s) " +
+ "is not one of the voters %s",
+ localId,
+ localDirectoryId,
+ partitionState.lastVoterSet()
+ )
+ );
+ }
+ // Leader state can never transition to Candidate state
+ // Only Prospective is allowed to transition to Candidate, unless the
local replica is the only voter
+ if (isLeader() || (!isProspective() && !isOnlyVoter())) {
Review Comment:
You should be able to remove the check for whether it is the only voter by
making that case transition to prospective instead. When the replica
transitions to prospective, it already short-circuits that transition: it
checks whether it can immediately transition to candidate.
##########
raft/src/main/java/org/apache/kafka/raft/internals/EpochElection.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.raft.ReplicaKey;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Tracks the votes cast by voters in an election held by a Nominee.
+ */
+public class EpochElection {
+ private Map<Integer, VoterState> voterStates;
+
+ public EpochElection(Set<ReplicaKey> voters) {
+ this.voterStates = voters.stream()
+ .collect(
+ Collectors.toMap(
+ ReplicaKey::id,
+ VoterState::new
+ )
+ );
+ }
+
+ /**
+ * Record a vote from a voter.
+ * @param voterId The id of the voter
+ * @param isGranted true if the vote is granted, false if it is rejected
+ * @return true if the voter had not been previously recorded
+ */
+ public boolean recordVote(int voterId, boolean isGranted) {
+ boolean wasUnrecorded = false;
+ VoterState voterState = getVoterStateOrThrow(voterId);
+ if (voterState.state == VoterState.State.UNRECORDED) {
+ wasUnrecorded = true;
+ }
+ if (isGranted) {
+ voterState.setState(VoterState.State.GRANTED);
+ } else {
+ voterState.setState(VoterState.State.REJECTED);
+ }
+ return wasUnrecorded;
+ }
+
+ /**
+ * Returns if a voter has granted the vote.
+ * @param voterId The id of the voter
+ * @throws IllegalArgumentException if the voter is not in the set of
voters
+ */
+ public boolean isGrantedVoter(int voterId) {
+ return getVoterStateOrThrow(voterId).state == VoterState.State.GRANTED;
+ }
+
+ /**
+ * Returns if a voter has rejected the vote.
+ * @param voterId The id of the voter
+ * @throws IllegalArgumentException if the voter is not in the set of
voters
+ */
+ public boolean isRejectedVoter(int voterId) {
+ return getVoterStateOrThrow(voterId).state ==
VoterState.State.REJECTED;
+ }
+
+ /**
+ * The set of voter ids.
+ */
+ public Set<Integer> voterIds() {
+ return Collections.unmodifiableSet(voterStates.keySet());
+ }
+
+ /**
+ * Get the collection of voter states.
+ */
+ public Collection<VoterState> voterStates() {
+ return Collections.unmodifiableCollection(voterStates.values());
+ }
+
+ /**
+ * Check whether we have received enough votes to conclude the election
and become leader.
+ *
+ * @return true if at least a majority of nodes have granted the vote
+ */
+ public boolean isVoteGranted() {
+ return numGranted() >= majoritySize();
+ }
+
+ /**
+ * Check if we have received enough rejections that it is no longer
possible to reach a
+ * majority of grants.
+ *
+ * @return true if the vote is rejected, false if the vote is already or
can still be granted
+ */
+ public boolean isVoteRejected() {
+ return numGranted() + numUnrecorded() < majoritySize();
+ }
+
+ /**
+ * Get the set of voters which have not been counted as granted or
rejected yet.
+ *
+ * @return The set of unrecorded voters
+ */
+ public Set<ReplicaKey> unrecordedVoters() {
+ return
votersOfState(VoterState.State.UNRECORDED).collect(Collectors.toSet());
+ }
+
+ /**
+ * Get the set of voters that have granted our vote requests.
+ *
+ * @return The set of granting voters, which should always contain the
localId
+ */
+ public Set<Integer> grantingVoters() {
+ return
votersOfState(VoterState.State.GRANTED).map(ReplicaKey::id).collect(Collectors.toSet());
+ }
+
+ /**
+ * Get the set of voters that have rejected our candidacy.
+ *
+ * @return The set of rejecting voters
+ */
+ public Set<Integer> rejectingVoters() {
+ return
votersOfState(VoterState.State.REJECTED).map(ReplicaKey::id).collect(Collectors.toSet());
+ }
+
+ private VoterState getVoterStateOrThrow(int voterId) {
+ VoterState voterState = voterStates.get(voterId);
+ if (voterState == null) {
+ throw new IllegalArgumentException("Attempt to access voter state
of non-voter " + voterId);
+ }
+ return voterState;
+ }
+
+ private Stream<ReplicaKey> votersOfState(VoterState.State state) {
+ return voterStates
+ .values()
+ .stream()
+ .filter(voterState -> voterState.state().equals(state))
+ .map(VoterState::replicaKey);
+ }
+
+ private long numGranted() {
+ return votersOfState(VoterState.State.GRANTED).count();
+ }
+
+ private long numUnrecorded() {
+ return votersOfState(VoterState.State.UNRECORDED).count();
+ }
+
+ private int majoritySize() {
+ return voterStates.size() / 2 + 1;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "EpochElection(%s)",
+ voterStates.values().stream()
+ .map(VoterState::toString)
+ .collect(
+ Collectors.joining(", "))
+ );
+ }
+
+ private static final class VoterState {
+ private final ReplicaKey replicaKey;
+ private State state = State.UNRECORDED;
+
+ VoterState(ReplicaKey replicaKey) {
+ this.replicaKey = replicaKey;
+ }
+
+ public State state() {
+ return state;
+ }
+
+ public void setState(State state) {
+ this.state = state;
+ }
+
+ public ReplicaKey replicaKey() {
+ return replicaKey;
+ }
+
+ enum State {
+ UNRECORDED,
+ GRANTED,
+ REJECTED
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "VoterState(%s, state=%s)",
Review Comment:
```java
"VoterState(replicaKey=%s, state=%s)",
```
##########
raft/src/main/java/org/apache/kafka/raft/QuorumState.java:
##########
@@ -402,58 +417,103 @@ public void transitionToUnattached(int epoch) {
}
/**
- * Grant a vote to a candidate. We will transition/remain in Unattached
- * state until either the election timeout expires or a leader is elected.
In particular,
- * we do not begin fetching until the election has concluded and
- * {@link #transitionToFollower(int, int, Endpoints)} is invoked.
+ * Grant a vote to a candidate as Unattached. The replica will transition
to Unattached with votedKey
+ * state in the same epoch and remain there until either the election
timeout expires or it discovers the leader.
+ * Note, if the replica discovers a higher epoch or is transitioning from
Prospective, it takes
+ * the route of transitionToUnattached instead.
*/
- public void transitionToUnattachedVotedState(
+ public void unattachedAddVotedState(
int epoch,
ReplicaKey candidateKey
) {
int currentEpoch = state.epoch();
if (localId.isPresent() && candidateKey.id() == localId.getAsInt()) {
throw new IllegalStateException(
String.format(
- "Cannot transition to Voted for %s and epoch %d since it
matches the local " +
+ "Cannot add voted key (%s) to current state (%s) in epoch
%d since it matches the local " +
"broker.id",
candidateKey,
+ state,
epoch
)
);
} else if (localId.isEmpty()) {
- throw new IllegalStateException("Cannot transition to voted
without a replica id");
- } else if (epoch < currentEpoch) {
+ throw new IllegalStateException("Cannot add voted state without a
replica id");
+ } else if (epoch != currentEpoch || !isUnattachedNotVoted()) {
throw new IllegalStateException(
String.format(
- "Cannot transition to Voted for %s and epoch %d since the
current epoch " +
- "(%d) is larger",
+ "Cannot add voted key (%s) to current state (%s) in epoch
%d",
candidateKey,
- epoch,
- currentEpoch
+ state,
+ epoch
)
);
- } else if (epoch == currentEpoch && !isUnattachedNotVoted()) {
+ }
+
+ // Note that we reset the election timeout after voting for a
candidate because we
+ // know that the candidate has at least as good of a chance of getting
elected as us
+ durableTransitionTo(
+ new UnattachedState(
+ time,
+ epoch,
+ state.election().optionalLeaderId(),
+ Optional.of(candidateKey),
+ partitionState.lastVoterSet().voterIds(),
+ state.highWatermark(),
+ randomElectionTimeoutMs(),
+ logContext
+ )
+ );
+ log.debug("Voted for candidate {} in epoch {}", candidateKey, epoch);
Review Comment:
State transition changes are already logged at `INFO` level.
##########
raft/src/main/java/org/apache/kafka/raft/internals/EpochElection.java:
##########
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.raft.ReplicaKey;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Tracks the votes cast by voters in an election held by a Nominee.
+ */
+public class EpochElection {
+ private Map<Integer, VoterState> voterStates;
+
+ public EpochElection(Set<ReplicaKey> voters) {
+ this.voterStates = voters.stream()
+ .collect(
+ Collectors.toMap(
+ ReplicaKey::id,
+ VoterState::new
+ )
+ );
+ }
+
+ /**
+ * Record a vote from a voter.
+ * @param voterId The id of the voter
+ * @param isGranted true if the vote is granted, false if it is rejected
+ * @return true if the voter had not been previously recorded
+ */
+ public boolean recordVote(int voterId, boolean isGranted) {
+ boolean wasUnrecorded = false;
+ VoterState voterState = getVoterStateOrThrow(voterId);
+ if (voterState.state == VoterState.State.UNRECORDED) {
+ wasUnrecorded = true;
+ }
+ if (isGranted) {
+ voterState.setState(VoterState.State.GRANTED);
+ } else {
+ voterState.setState(VoterState.State.REJECTED);
+ }
+ return wasUnrecorded;
+ }
+
+ /**
+ * Returns if a voter has granted the vote.
+ * @param voterId The id of the voter
+ * @throws IllegalArgumentException if the voter is not in the set of
voters
+ */
+ public boolean isGrantedVoter(int voterId) {
+ return getVoterStateOrThrow(voterId).state == VoterState.State.GRANTED;
+ }
+
+ /**
+ * Returns if a voter has rejected the vote.
+ * @param voterId The id of the voter
+ * @throws IllegalArgumentException if the voter is not in the set of
voters
+ */
+ public boolean isRejectedVoter(int voterId) {
+ return getVoterStateOrThrow(voterId).state ==
VoterState.State.REJECTED;
+ }
+
+ /**
+ * The set of voter ids.
+ */
+ public Set<Integer> voterIds() {
+ return Collections.unmodifiableSet(voterStates.keySet());
+ }
+
+ /**
+ * Get the collection of voter states.
+ */
+ public Collection<VoterState> voterStates() {
+ return Collections.unmodifiableCollection(voterStates.values());
+ }
Review Comment:
It looks like this doesn't need to be public. In fact, it looks like this
method can be removed since it is not used.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]