This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push: new 5c97ab0a345 KAFKA-17067; Fix KRaft transition to CandidateState (#16820) 5c97ab0a345 is described below commit 5c97ab0a345189bef8d1778943ebf4d6d243f5bf Author: Alyssa Huang <ahu...@confluent.io> AuthorDate: Sat Aug 10 04:43:16 2024 -0700 KAFKA-17067; Fix KRaft transition to CandidateState (#16820) Only voters should be able to transition to Candidate state. This removes VotedState as one of the EpochStates and moves voted information into UnattachedState. Reviewers: José Armando García Sancio <jsan...@apache.org> --- .../java/org/apache/kafka/raft/EpochState.java | 2 +- .../org/apache/kafka/raft/KafkaRaftClient.java | 30 +--- .../java/org/apache/kafka/raft/QuorumState.java | 54 +++--- .../org/apache/kafka/raft/UnattachedState.java | 31 +++- .../java/org/apache/kafka/raft/VotedState.java | 133 -------------- .../kafka/raft/internals/KafkaRaftMetrics.java | 10 +- .../org/apache/kafka/raft/KafkaRaftClientTest.java | 83 ++++++++- .../org/apache/kafka/raft/QuorumStateTest.java | 198 ++++++++++++++------- .../org/apache/kafka/raft/UnattachedStateTest.java | 1 + ...eTest.java => UnattachedStateWithVoteTest.java} | 18 +- .../kafka/raft/internals/KafkaRaftMetricsTest.java | 2 +- 11 files changed, 287 insertions(+), 275 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/EpochState.java b/raft/src/main/java/org/apache/kafka/raft/EpochState.java index 64642becce7..a0d643999b8 100644 --- a/raft/src/main/java/org/apache/kafka/raft/EpochState.java +++ b/raft/src/main/java/org/apache/kafka/raft/EpochState.java @@ -31,7 +31,7 @@ public interface EpochState extends Closeable { * Decide whether to grant a vote to a candidate. * * It is the responsibility of the caller to invoke - * {@link QuorumState#transitionToVoted(int, ReplicaKey)} if vote is granted. + * {@link QuorumState#transitionToUnattachedVotedState(int, ReplicaKey)} if vote is granted. 
* * @param candidateKey the id and directory of the candidate * @param isLogUpToDate whether the candidate’s log is at least as up-to-date as receiver’s log diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 9c5035af297..ad650fadb53 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -670,10 +670,8 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { resetConnections(); } - private void transitionToVoted(ReplicaKey candidateKey, int epoch) { - quorum.transitionToVoted(epoch, candidateKey); - maybeFireLeaderChange(); - resetConnections(); + private void transitionToUnattachedVoted(ReplicaKey candidateKey, int epoch) { + quorum.transitionToUnattachedVotedState(epoch, candidateKey); } private void onBecomeFollower(long currentTimeMs) { @@ -816,8 +814,8 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { lastEpochEndOffsetAndEpoch.compareTo(endOffset()) >= 0 ); - if (voteGranted && quorum.isUnattached()) { - transitionToVoted(candidateKey, candidateEpoch); + if (voteGranted && quorum.isUnattachedNotVoted()) { + transitionToUnattachedVoted(candidateKey, candidateEpoch); } logger.info("Vote request {} with epoch {} is {}", request, candidateEpoch, voteGranted ? "granted" : "rejected"); @@ -3095,24 +3093,6 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { ); } - private long pollVoted(long currentTimeMs) { - VotedState state = quorum.votedStateOrThrow(); - GracefulShutdown shutdown = this.shutdown.get(); - - if (shutdown != null) { - // If shutting down, then remain in this state until either the - // shutdown completes or an epoch bump forces another state transition - return shutdown.remainingTimeMs(); - } else if (state.hasElectionTimeoutExpired(currentTimeMs)) { - // KAFKA-17067 is going to fix this. 
VotedState doesn't mean that the replica is a voter - // we need to treat VotedState similar to UnattachedState. - transitionToCandidate(currentTimeMs); - return 0L; - } else { - return state.remainingElectionTimeMs(currentTimeMs); - } - } - private long pollUnattached(long currentTimeMs) { UnattachedState state = quorum.unattachedStateOrThrow(); if (quorum.isVoter()) { @@ -3148,8 +3128,6 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { return pollCandidate(currentTimeMs); } else if (quorum.isFollower()) { return pollFollower(currentTimeMs); - } else if (quorum.isVoted()) { - return pollVoted(currentTimeMs); } else if (quorum.isUnattached()) { return pollUnattached(currentTimeMs); } else if (quorum.isResigned()) { diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java index ac22fb65d1b..f61bdab0db8 100644 --- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java +++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java @@ -40,9 +40,13 @@ import java.util.Random; * only valid state transitions. 
Below we define the possible state transitions and * how they are triggered: * - * Unattached|Resigned transitions to: + * Resigned transitions to: * Unattached: After learning of a new election with a higher epoch - * Voted: After granting a vote to a candidate + * Candidate: After expiration of the election timeout + * Follower: After discovering a leader with an equal or larger epoch + * + * Unattached transitions to: + * Unattached: After learning of a new election with a higher epoch or after voting * Candidate: After expiration of the election timeout * Follower: After discovering a leader with an equal or larger epoch * @@ -157,6 +161,7 @@ public class QuorumState { time, logEndOffsetAndEpoch.epoch(), OptionalInt.empty(), + Optional.empty(), partitionState.lastVoterSet().voterIds(), Optional.empty(), randomElectionTimeoutMs(), @@ -195,10 +200,11 @@ public class QuorumState { logContext ); } else if (election.hasVoted()) { - initialState = new VotedState( + initialState = new UnattachedState( time, election.epoch(), - election.votedKey(), + OptionalInt.empty(), + Optional.of(election.votedKey()), partitionState.lastVoterSet().voterIds(), Optional.empty(), randomElectionTimeoutMs(), @@ -226,6 +232,7 @@ public class QuorumState { time, election.epoch(), OptionalInt.of(election.leaderId()), + Optional.empty(), partitionState.lastVoterSet().voterIds(), Optional.empty(), randomElectionTimeoutMs(), @@ -248,6 +255,7 @@ public class QuorumState { time, election.epoch(), OptionalInt.empty(), + Optional.empty(), partitionState.lastVoterSet().voterIds(), Optional.empty(), randomElectionTimeoutMs(), @@ -381,8 +389,6 @@ public class QuorumState { electionTimeoutMs = Long.MAX_VALUE; } else if (isCandidate()) { electionTimeoutMs = candidateStateOrThrow().remainingElectionTimeMs(time.milliseconds()); - } else if (isVoted()) { - electionTimeoutMs = votedStateOrThrow().remainingElectionTimeMs(time.milliseconds()); } else if (isUnattached()) { electionTimeoutMs = 
unattachedStateOrThrow().remainingElectionTimeMs(time.milliseconds()); } else { @@ -393,6 +399,7 @@ public class QuorumState { time, epoch, OptionalInt.empty(), + Optional.empty(), partitionState.lastVoterSet().voterIds(), state.highWatermark(), electionTimeoutMs, @@ -401,12 +408,12 @@ public class QuorumState { } /** - * Grant a vote to a candidate and become a follower for this epoch. We will remain in this + * Grant a vote to a candidate. We will transition/remain in Unattached * state until either the election timeout expires or a leader is elected. In particular, * we do not begin fetching until the election has concluded and * {@link #transitionToFollower(int, int, Endpoints)} is invoked. */ - public void transitionToVoted( + public void transitionToUnattachedVotedState( int epoch, ReplicaKey candidateKey ) { @@ -432,7 +439,7 @@ public class QuorumState { currentEpoch ) ); - } else if (epoch == currentEpoch && !isUnattached()) { + } else if (epoch == currentEpoch && !isUnattachedNotVoted()) { throw new IllegalStateException( String.format( "Cannot transition to Voted for %s and epoch %d from the current state (%s)", @@ -446,16 +453,18 @@ public class QuorumState { // Note that we reset the election timeout after voting for a candidate because we // know that the candidate has at least as good of a chance of getting elected as us durableTransitionTo( - new VotedState( + new UnattachedState( time, epoch, - candidateKey, + OptionalInt.empty(), + Optional.of(candidateKey), partitionState.lastVoterSet().voterIds(), state.highWatermark(), randomElectionTimeoutMs(), logContext ) ); + log.debug("Voted for candidate {} in epoch {}", candidateKey, epoch); } /** @@ -605,15 +614,10 @@ public class QuorumState { throw new IllegalStateException("Expected to be Follower, but the current state is " + state); } - public VotedState votedStateOrThrow() { - return maybeVotedState() - .orElseThrow(() -> new IllegalStateException("Expected to be Voted, but current state is " + 
state)); - } - - public Optional<VotedState> maybeVotedState() { + public Optional<UnattachedState> maybeUnattachedState() { EpochState fixedState = state; - if (fixedState instanceof VotedState) { - return Optional.of((VotedState) fixedState); + if (fixedState instanceof UnattachedState) { + return Optional.of((UnattachedState) fixedState); } else { return Optional.empty(); } @@ -661,14 +665,18 @@ public class QuorumState { return state instanceof FollowerState; } - public boolean isVoted() { - return state instanceof VotedState; - } - public boolean isUnattached() { return state instanceof UnattachedState; } + public boolean isUnattachedNotVoted() { + return maybeUnattachedState().filter(unattached -> !unattached.votedKey().isPresent()).isPresent(); + } + + public boolean isUnattachedAndVoted() { + return maybeUnattachedState().flatMap(UnattachedState::votedKey).isPresent(); + } + public boolean isLeader() { return state instanceof LeaderState; } diff --git a/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java b/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java index 1dba6a70f28..999a32f8663 100644 --- a/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java +++ b/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java @@ -39,9 +39,11 @@ import java.util.Set; * either through random Fetch requests to the bootstrap servers or through BeginQuorumEpoch * request from the leader. 
*/ + public class UnattachedState implements EpochState { private final int epoch; private final OptionalInt leaderId; + private final Optional<ReplicaKey> votedKey; private final Set<Integer> voters; private final long electionTimeoutMs; private final Timer electionTimer; @@ -52,6 +54,7 @@ public class UnattachedState implements EpochState { Time time, int epoch, OptionalInt leaderId, + Optional<ReplicaKey> votedKey, Set<Integer> voters, Optional<LogOffsetMetadata> highWatermark, long electionTimeoutMs, @@ -59,6 +62,7 @@ public class UnattachedState implements EpochState { ) { this.epoch = epoch; this.leaderId = leaderId; + this.votedKey = votedKey; this.voters = voters; this.highWatermark = highWatermark; this.electionTimeoutMs = electionTimeoutMs; @@ -68,7 +72,9 @@ public class UnattachedState implements EpochState { @Override public ElectionState election() { - if (leaderId.isPresent()) { + if (votedKey.isPresent()) { + return ElectionState.withVotedCandidate(epoch, votedKey().get(), voters); + } else if (leaderId.isPresent()) { return ElectionState.withElectedLeader(epoch, leaderId.getAsInt(), voters); } else { return ElectionState.withUnknownLeader(epoch, voters); @@ -90,6 +96,10 @@ public class UnattachedState implements EpochState { return "Unattached"; } + public Optional<ReplicaKey> votedKey() { + return votedKey; + } + public long electionTimeoutMs() { return electionTimeoutMs; } @@ -111,8 +121,21 @@ public class UnattachedState implements EpochState { @Override public boolean canGrantVote(ReplicaKey candidateKey, boolean isLogUpToDate) { - if (leaderId.isPresent()) { - // If the leader id known it should behave similar to the follower state + if (votedKey.isPresent()) { + ReplicaKey votedReplicaKey = votedKey.get(); + if (votedReplicaKey.id() == candidateKey.id()) { + return !votedReplicaKey.directoryId().isPresent() || votedReplicaKey.directoryId().equals(candidateKey.directoryId()); + } + log.debug( + "Rejecting vote request from candidate ({}), 
already have voted for another " + + "candidate ({}) in epoch {}", + candidateKey, + votedKey, + epoch + ); + return false; + } else if (leaderId.isPresent()) { + // If the leader id is known it should behave similar to the follower state log.debug( "Rejecting vote request from candidate ({}) since we already have a leader {} in epoch {}", candidateKey, @@ -134,8 +157,10 @@ public class UnattachedState implements EpochState { public String toString() { return "Unattached(" + "epoch=" + epoch + + ", votedKey=" + votedKey.map(ReplicaKey::toString).orElse("null") + ", voters=" + voters + ", electionTimeoutMs=" + electionTimeoutMs + + ", highWatermark=" + highWatermark + ')'; } diff --git a/raft/src/main/java/org/apache/kafka/raft/VotedState.java b/raft/src/main/java/org/apache/kafka/raft/VotedState.java deleted file mode 100644 index c7710903399..00000000000 --- a/raft/src/main/java/org/apache/kafka/raft/VotedState.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kafka.raft; - -import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Timer; -import org.apache.kafka.raft.internals.ReplicaKey; - -import org.slf4j.Logger; - -import java.util.Optional; -import java.util.Set; - -/** - * The "voted" state is for voters who have cast their vote for a specific candidate. - * - * Once a vote has been cast, it is not possible for a voter to change its vote until a - * new election is started. If the election timeout expires before a new leader is elected, - * then the voter will become a candidate. - */ -public class VotedState implements EpochState { - private final int epoch; - private final ReplicaKey votedKey; - private final Set<Integer> voters; - private final int electionTimeoutMs; - private final Timer electionTimer; - private final Optional<LogOffsetMetadata> highWatermark; - private final Logger log; - - public VotedState( - Time time, - int epoch, - ReplicaKey votedKey, - Set<Integer> voters, - Optional<LogOffsetMetadata> highWatermark, - int electionTimeoutMs, - LogContext logContext - ) { - this.epoch = epoch; - this.votedKey = votedKey; - this.voters = voters; - this.highWatermark = highWatermark; - this.electionTimeoutMs = electionTimeoutMs; - this.electionTimer = time.timer(electionTimeoutMs); - this.log = logContext.logger(VotedState.class); - } - - @Override - public ElectionState election() { - return ElectionState.withVotedCandidate(epoch, votedKey, voters); - } - - public ReplicaKey votedKey() { - return votedKey; - } - - @Override - public int epoch() { - return epoch; - } - - @Override - public Endpoints leaderEndpoints() { - return Endpoints.empty(); - } - - @Override - public String name() { - return "Voted"; - } - - public long remainingElectionTimeMs(long currentTimeMs) { - electionTimer.update(currentTimeMs); - return electionTimer.remainingMs(); - } - - public boolean hasElectionTimeoutExpired(long 
currentTimeMs) { - electionTimer.update(currentTimeMs); - return electionTimer.isExpired(); - } - - @Override - public boolean canGrantVote(ReplicaKey candidateKey, boolean isLogUpToDate) { - if (votedKey.id() == candidateKey.id()) { - return !votedKey.directoryId().isPresent() || votedKey.directoryId().equals(candidateKey.directoryId()); - } - - log.debug( - "Rejecting vote request from candidate ({}), already have voted for another " + - "candidate ({}) in epoch {}", - candidateKey, - votedKey, - epoch - ); - - return false; - } - - @Override - public Optional<LogOffsetMetadata> highWatermark() { - return highWatermark; - } - - @Override - public String toString() { - return String.format( - "Voted(epoch=%d, votedKey=%s, voters=%s, electionTimeoutMs=%d, highWatermark=%s)", - epoch, - votedKey, - voters, - electionTimeoutMs, - highWatermark - ); - } - - @Override - public void close() {} -} diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java index 2b7617df359..49abcd0005f 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java @@ -72,7 +72,7 @@ public class KafkaRaftMetrics implements AutoCloseable { return "leader"; } else if (state.isCandidate()) { return "candidate"; - } else if (state.isVoted()) { + } else if (state.isUnattachedAndVoted()) { return "voted"; } else if (state.isFollower()) { // a broker is special kind of follower, as not being a voter, it's an observer @@ -95,8 +95,8 @@ public class KafkaRaftMetrics implements AutoCloseable { if (state.isLeader() || state.isCandidate()) { return state.localIdOrThrow(); } else { - return (double) state.maybeVotedState() - .map(votedState -> votedState.votedKey().id()) + return (double) state.maybeUnattachedState() + .flatMap(votedState -> votedState.votedKey().map(ReplicaKey::id)) .orElse(-1); } 
}); @@ -110,8 +110,8 @@ public class KafkaRaftMetrics implements AutoCloseable { if (state.isLeader() || state.isCandidate()) { return state.localDirectoryId().toString(); } else { - return state.maybeVotedState() - .flatMap(votedState -> votedState.votedKey().directoryId()) + return state.maybeUnattachedState() + .flatMap(votedState -> votedState.votedKey().flatMap(ReplicaKey::directoryId)) .orElse(Uuid.ZERO_UUID) .toString(); } diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java index e74e32fb79a..6ac11baf571 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java @@ -210,7 +210,7 @@ public class KafkaRaftClientTest { context.client.poll(); // We will first transition to unattached and then grant vote and then transition to voted - assertTrue(context.client.quorum().isVoted()); + assertTrue(context.client.quorum().isUnattachedAndVoted()); context.assertVotedCandidate(epoch + 1, remoteKey.id()); context.assertSentVoteResponse(Errors.NONE, epoch + 1, OptionalInt.empty(), true); } @@ -247,7 +247,7 @@ public class KafkaRaftClientTest { context.client.poll(); // We will first transition to unattached and then grant vote and then transition to voted - assertTrue(context.client.quorum().isVoted()); + assertTrue(context.client.quorum().isUnattachedAndVoted()); context.assertVotedCandidate(epoch + 1, remoteKey.id()); context.assertSentVoteResponse(Errors.NONE, epoch + 1, OptionalInt.empty(), true); } @@ -284,7 +284,7 @@ public class KafkaRaftClientTest { // We will first transition to unattached and then grant vote and then transition to voted assertTrue( - context.client.quorum().isVoted(), + context.client.quorum().isUnattachedAndVoted(), "Local Id: " + localId + " Remote Id: " + remoteId + " Quorum local Id: " + context.client.quorum().localIdOrSentinel() + @@ -1692,13 +1692,78 @@ 
public class KafkaRaftClientTest { context.assertSentFetchRequest(epoch, 1L, lastEpoch); context.time.sleep(context.fetchTimeoutMs); - context.pollUntilRequest(); - context.assertSentVoteRequest(epoch + 1, lastEpoch, 1L, 1); context.assertVotedCandidate(epoch + 1, localId); } + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testFollowerAsObserverDoesNotBecomeCandidateAfterFetchTimeout(boolean withKip853Rpc) throws Exception { + int localId = randomReplicaId(); + int otherNodeId = localId + 1; + int epoch = 5; + int lastEpoch = 3; + Set<Integer> voters = Utils.mkSet(otherNodeId); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .withElectedLeader(epoch, otherNodeId) + .appendToLog(lastEpoch, singletonList("foo")) + .withKip853Rpc(withKip853Rpc) + .build(); + context.assertElectedLeader(epoch, otherNodeId); + + context.pollUntilRequest(); + context.assertSentFetchRequest(epoch, 1L, lastEpoch); + + context.time.sleep(context.fetchTimeoutMs); + context.pollUntilRequest(); + assertTrue(context.client.quorum().isFollower()); + + // transitions to unattached + context.deliverRequest(context.voteRequest(epoch + 1, replicaKey(otherNodeId, withKip853Rpc), epoch, 1)); + context.pollUntilResponse(); + context.assertSentVoteResponse(Errors.NONE, epoch + 1, OptionalInt.empty(), true); + assertTrue(context.client.quorum().isUnattached()); + } + + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testUnattachedAsObserverDoesNotBecomeCandidateAfterElectionTimeout(boolean withKip853Rpc) throws Exception { + int localId = randomReplicaId(); + int otherNodeId = localId + 1; + int epoch = 5; + Set<Integer> voters = Utils.mkSet(otherNodeId); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) + .withUnknownLeader(epoch) + .withKip853Rpc(withKip853Rpc) + .build(); + + context.pollUntilRequest(); + context.assertSentFetchRequest(epoch, 0L, 0); + 
assertTrue(context.client.quorum().isUnattached()); + + context.time.sleep(context.electionTimeoutMs() * 2); + context.pollUntilRequest(); + assertTrue(context.client.quorum().isUnattached()); + context.assertSentFetchRequest(epoch, 0L, 0); + // confirm no vote request was sent + assertEquals(0, context.channel.drainSendQueue().size()); + + context.deliverRequest(context.voteRequest(epoch + 1, replicaKey(otherNodeId, withKip853Rpc), epoch, 0)); + context.pollUntilResponse(); + // observer can vote + context.assertSentVoteResponse(Errors.NONE, epoch + 1, OptionalInt.empty(), true); + + context.time.sleep(context.electionTimeoutMs() * 2); + context.pollUntilRequest(); + // observer cannot transition to candidate though + assertTrue(context.client.quorum().isUnattached()); + context.assertSentFetchRequest(epoch + 1, 0L, 0); + assertEquals(0, context.channel.drainSendQueue().size()); + } + @ParameterizedTest @ValueSource(booleans = { true, false }) public void testInitializeObserverNoPreviousState(boolean withKip853Rpc) throws Exception { @@ -2259,7 +2324,7 @@ public class KafkaRaftClientTest { @ParameterizedTest @ValueSource(booleans = { true, false }) - public void testLeaderAcceptVoteFromNonVoter(boolean withKip853Rpc) throws Exception { + public void testLeaderAcceptVoteFromObserver(boolean withKip853Rpc) throws Exception { int localId = randomReplicaId(); int otherNodeId = localId + 1; Set<Integer> voters = Utils.mkSet(localId, otherNodeId); @@ -2272,12 +2337,12 @@ public class KafkaRaftClientTest { context.becomeLeader(); int epoch = context.currentEpoch(); - ReplicaKey nonVoterKey = replicaKey(localId + 2, withKip853Rpc); - context.deliverRequest(context.voteRequest(epoch - 1, nonVoterKey, 0, 0)); + ReplicaKey observerKey = replicaKey(localId + 2, withKip853Rpc); + context.deliverRequest(context.voteRequest(epoch - 1, observerKey, 0, 0)); context.client.poll(); context.assertSentVoteResponse(Errors.FENCED_LEADER_EPOCH, epoch, OptionalInt.of(localId), false); - 
context.deliverRequest(context.voteRequest(epoch, nonVoterKey, 0, 0)); + context.deliverRequest(context.voteRequest(epoch, observerKey, 0, 0)); context.client.poll(); context.assertSentVoteResponse(Errors.NONE, epoch, OptionalInt.of(localId), false); } diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java index 77f7234d337..a22cf16123e 100644 --- a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java @@ -141,6 +141,16 @@ public class QuorumStateTest { ); } + private VoterSet withRemoteVoterSet(IntStream remoteIds, KRaftVersion kraftVersion) { + boolean withDirectoryid = kraftVersion.featureLevel() > 0; + + Stream<ReplicaKey> remoteKeys = remoteIds + .boxed() + .map(id -> replicaKey(id, withDirectoryid)); + + return VoterSetTest.voterSet(remoteKeys); + } + private ReplicaKey replicaKey(int id, boolean withDirectoryId) { Uuid directoryId = withDirectoryId ? 
Uuid.randomUuid() : ReplicaKey.NO_DIRECTORY_ID; return ReplicaKey.of(id, directoryId); @@ -176,7 +186,7 @@ public class QuorumStateTest { QuorumState state = buildQuorumState(OptionalInt.of(localId), voters, kraftVersion); state.initialize(new OffsetAndEpoch(0L, 0)); - assertTrue(state.isUnattached()); + assertTrue(state.isUnattachedNotVoted()); UnattachedState unattachedState = state.unattachedStateOrThrow(); assertEquals(epoch, unattachedState.epoch()); assertEquals( @@ -242,12 +252,12 @@ public class QuorumStateTest { QuorumState state = buildQuorumState(OptionalInt.of(localId), voters, kraftVersion); state.initialize(new OffsetAndEpoch(0L, logEndEpoch)); - assertTrue(state.isVoted()); + assertTrue(state.isUnattachedAndVoted()); assertEquals(epoch, state.epoch()); - VotedState votedState = state.votedStateOrThrow(); + UnattachedState votedState = state.unattachedStateOrThrow(); assertEquals(epoch, votedState.epoch()); - assertEquals(persistedVotedKey(nodeKey1, kraftVersion), votedState.votedKey()); + assertEquals(persistedVotedKey(nodeKey1, kraftVersion), votedState.votedKey().get()); assertEquals( electionTimeoutMs + jitterMs, @@ -472,19 +482,19 @@ public class QuorumStateTest { @ParameterizedTest @EnumSource(value = KRaftVersion.class) - public void testCandidateToVoted(KRaftVersion kraftVersion) { + public void testCandidateToUnattachedVoted(KRaftVersion kraftVersion) { ReplicaKey otherNodeKey = ReplicaKey.of(1, Uuid.randomUuid()); VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey, otherNodeKey)); QuorumState state = initializeEmptyState(voters, kraftVersion); state.initialize(new OffsetAndEpoch(0L, logEndEpoch)); state.transitionToCandidate(); - state.transitionToVoted(5, otherNodeKey); + state.transitionToUnattachedVotedState(5, otherNodeKey); assertEquals(5, state.epoch()); assertEquals(OptionalInt.empty(), state.leaderId()); - VotedState followerState = state.votedStateOrThrow(); - assertEquals(otherNodeKey, followerState.votedKey()); + 
UnattachedState votedState = state.unattachedStateOrThrow(); + assertEquals(otherNodeKey, votedState.votedKey().get()); assertEquals( Optional.of( @@ -507,7 +517,7 @@ public class QuorumStateTest { state.transitionToUnattached(5); state.transitionToCandidate(); assertThrows(IllegalStateException.class, () -> state.transitionToUnattached(4)); - assertThrows(IllegalStateException.class, () -> state.transitionToVoted(4, otherNodeKey)); + assertThrows(IllegalStateException.class, () -> state.transitionToUnattachedVotedState(4, otherNodeKey)); assertThrows( IllegalStateException.class, () -> state.transitionToFollower( @@ -642,7 +652,7 @@ public class QuorumStateTest { @ParameterizedTest @EnumSource(value = KRaftVersion.class) - public void testLeaderToVoted(KRaftVersion kraftVersion) { + public void testLeaderToUnattachedVoted(KRaftVersion kraftVersion) { ReplicaKey otherNodeKey = ReplicaKey.of(1, Uuid.randomUuid()); VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey, otherNodeKey)); QuorumState state = initializeEmptyState(voters, kraftVersion); @@ -650,13 +660,13 @@ public class QuorumStateTest { state.transitionToCandidate(); state.candidateStateOrThrow().recordGrantedVote(otherNodeKey.id()); state.transitionToLeader(0L, accumulator); - state.transitionToVoted(5, otherNodeKey); + state.transitionToUnattachedVotedState(5, otherNodeKey); assertEquals(5, state.epoch()); assertEquals(OptionalInt.empty(), state.leaderId()); - VotedState votedState = state.votedStateOrThrow(); - assertEquals(otherNodeKey, votedState.votedKey()); + UnattachedState votedState = state.unattachedStateOrThrow(); + assertEquals(otherNodeKey, votedState.votedKey().get()); assertEquals( Optional.of( @@ -682,7 +692,7 @@ public class QuorumStateTest { state.candidateStateOrThrow().recordGrantedVote(otherNodeKey.id()); state.transitionToLeader(0L, accumulator); assertThrows(IllegalStateException.class, () -> state.transitionToUnattached(4)); - assertThrows(IllegalStateException.class, 
() -> state.transitionToVoted(4, otherNodeKey)); + assertThrows(IllegalStateException.class, () -> state.transitionToUnattachedVotedState(4, otherNodeKey)); assertThrows( IllegalStateException.class, () -> state.transitionToFollower( @@ -719,7 +729,7 @@ public class QuorumStateTest { voters.listeners(localId) ) ); - assertThrows(IllegalStateException.class, () -> state.transitionToVoted(0, localVoterKey)); + assertThrows(IllegalStateException.class, () -> state.transitionToUnattachedVotedState(0, localVoterKey)); } @ParameterizedTest @@ -741,7 +751,7 @@ public class QuorumStateTest { @ParameterizedTest @EnumSource(value = KRaftVersion.class) - public void testUnattachedToVotedSameEpoch(KRaftVersion kraftVersion) { + public void testUnattachedVotedSameEpoch(KRaftVersion kraftVersion) { ReplicaKey otherNodeKey = ReplicaKey.of(1, Uuid.randomUuid()); VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey, otherNodeKey)); QuorumState state = initializeEmptyState(voters, kraftVersion); @@ -750,11 +760,11 @@ public class QuorumStateTest { int jitterMs = 2500; random.mockNextInt(electionTimeoutMs, jitterMs); - state.transitionToVoted(5, otherNodeKey); + state.transitionToUnattachedVotedState(5, otherNodeKey); - VotedState votedState = state.votedStateOrThrow(); + UnattachedState votedState = state.unattachedStateOrThrow(); assertEquals(5, votedState.epoch()); - assertEquals(otherNodeKey, votedState.votedKey()); + assertEquals(otherNodeKey, votedState.votedKey().get()); assertEquals( Optional.of( @@ -774,17 +784,20 @@ public class QuorumStateTest { @ParameterizedTest @EnumSource(value = KRaftVersion.class) - public void testUnattachedToVotedHigherEpoch(KRaftVersion kraftVersion) { + public void testUnattachedVotedHigherEpoch(KRaftVersion kraftVersion) { ReplicaKey otherNodeKey = ReplicaKey.of(1, Uuid.randomUuid()); VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey, otherNodeKey)); QuorumState state = initializeEmptyState(voters, kraftVersion); 
state.initialize(new OffsetAndEpoch(0L, logEndEpoch)); state.transitionToUnattached(5); - state.transitionToVoted(8, otherNodeKey); + assertTrue(state.isUnattachedNotVoted()); - VotedState votedState = state.votedStateOrThrow(); + state.transitionToUnattachedVotedState(8, otherNodeKey); + assertTrue(state.isUnattachedAndVoted()); + + UnattachedState votedState = state.unattachedStateOrThrow(); assertEquals(8, votedState.epoch()); - assertEquals(otherNodeKey, votedState.votedKey()); + assertEquals(otherNodeKey, votedState.votedKey().get()); assertEquals( Optional.of( @@ -826,11 +839,13 @@ public class QuorumStateTest { QuorumState state = initializeEmptyState(voters, kraftVersion); state.initialize(new OffsetAndEpoch(0L, logEndEpoch)); state.transitionToUnattached(5); + assertTrue(state.isUnattachedNotVoted()); long remainingElectionTimeMs = state.unattachedStateOrThrow().remainingElectionTimeMs(time.milliseconds()); time.sleep(1000); state.transitionToUnattached(6); + assertTrue(state.isUnattachedNotVoted()); UnattachedState unattachedState = state.unattachedStateOrThrow(); assertEquals(6, unattachedState.epoch()); @@ -896,7 +911,7 @@ public class QuorumStateTest { state.initialize(new OffsetAndEpoch(0L, logEndEpoch)); state.transitionToUnattached(5); assertThrows(IllegalStateException.class, () -> state.transitionToUnattached(4)); - assertThrows(IllegalStateException.class, () -> state.transitionToVoted(4, otherNodeKey)); + assertThrows(IllegalStateException.class, () -> state.transitionToUnattachedVotedState(4, otherNodeKey)); assertThrows( IllegalStateException.class, () -> state.transitionToFollower( @@ -919,26 +934,26 @@ public class QuorumStateTest { @ParameterizedTest @EnumSource(value = KRaftVersion.class) - public void testVotedToInvalidLeaderOrResigned(KRaftVersion kraftVersion) { + public void testUnattachedVotedToInvalidLeaderOrResigned(KRaftVersion kraftVersion) { int node1 = 1; int node2 = 2; VoterSet voters = 
localWithRemoteVoterSet(IntStream.of(node1, node2), kraftVersion); QuorumState state = initializeEmptyState(voters, kraftVersion); state.initialize(new OffsetAndEpoch(0L, logEndEpoch)); - state.transitionToVoted(5, ReplicaKey.of(node1, ReplicaKey.NO_DIRECTORY_ID)); + state.transitionToUnattachedVotedState(5, ReplicaKey.of(node1, ReplicaKey.NO_DIRECTORY_ID)); assertThrows(IllegalStateException.class, () -> state.transitionToLeader(0, accumulator)); assertThrows(IllegalStateException.class, () -> state.transitionToResigned(Collections.emptyList())); } @ParameterizedTest @EnumSource(value = KRaftVersion.class) - public void testVotedToCandidate(KRaftVersion kraftVersion) { + public void testUnattachedVotedToCandidate(KRaftVersion kraftVersion) { int node1 = 1; int node2 = 2; VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2), kraftVersion); QuorumState state = initializeEmptyState(voters, kraftVersion); state.initialize(new OffsetAndEpoch(0L, logEndEpoch)); - state.transitionToVoted(5, ReplicaKey.of(node1, ReplicaKey.NO_DIRECTORY_ID)); + state.transitionToUnattachedVotedState(5, ReplicaKey.of(node1, ReplicaKey.NO_DIRECTORY_ID)); int jitterMs = 2500; random.mockNextInt(electionTimeoutMs, jitterMs); @@ -952,33 +967,47 @@ public class QuorumStateTest { @ParameterizedTest @EnumSource(value = KRaftVersion.class) - public void testVotedToVotedSameEpoch(KRaftVersion kraftVersion) { + public void testObserverFromUnattachedVotedToCandidate(KRaftVersion kraftVersion) { + int voter1 = 1; + int voter2 = 2; + VoterSet voters = withRemoteVoterSet(IntStream.of(voter1, voter2), kraftVersion); + QuorumState state = initializeEmptyState(voters, kraftVersion); + state.initialize(new OffsetAndEpoch(0L, logEndEpoch)); + state.transitionToUnattachedVotedState(5, ReplicaKey.of(voter1, ReplicaKey.NO_DIRECTORY_ID)); + + assertThrows(IllegalStateException.class, () -> state.transitionToCandidate()); + assertTrue(state.isUnattached()); + } + + @ParameterizedTest + 
@EnumSource(value = KRaftVersion.class) + public void testUnattachedVotedToUnattachedVotedSameEpoch(KRaftVersion kraftVersion) { int node1 = 1; int node2 = 2; VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2), kraftVersion); QuorumState state = initializeEmptyState(voters, kraftVersion); state.initialize(new OffsetAndEpoch(0L, logEndEpoch)); state.transitionToUnattached(5); - state.transitionToVoted(8, ReplicaKey.of(node1, Uuid.randomUuid())); + state.transitionToUnattachedVotedState(8, ReplicaKey.of(node1, Uuid.randomUuid())); assertThrows( IllegalStateException.class, - () -> state.transitionToVoted(8, ReplicaKey.of(node1, ReplicaKey.NO_DIRECTORY_ID)) + () -> state.transitionToUnattachedVotedState(8, ReplicaKey.of(node1, ReplicaKey.NO_DIRECTORY_ID)) ); assertThrows( IllegalStateException.class, - () -> state.transitionToVoted(8, ReplicaKey.of(node2, ReplicaKey.NO_DIRECTORY_ID)) + () -> state.transitionToUnattachedVotedState(8, ReplicaKey.of(node2, ReplicaKey.NO_DIRECTORY_ID)) ); } @ParameterizedTest @EnumSource(value = KRaftVersion.class) - public void testVotedToFollowerSameEpoch(KRaftVersion kraftVersion) { + public void testUnattachedVotedToFollowerSameEpoch(KRaftVersion kraftVersion) { int node1 = 1; int node2 = 2; VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2), kraftVersion); QuorumState state = initializeEmptyState(voters, kraftVersion); state.initialize(new OffsetAndEpoch(0L, logEndEpoch)); - state.transitionToVoted(5, ReplicaKey.of(node1, ReplicaKey.NO_DIRECTORY_ID)); + state.transitionToUnattachedVotedState(5, ReplicaKey.of(node1, ReplicaKey.NO_DIRECTORY_ID)); state.transitionToFollower( 5, node2, @@ -1005,13 +1034,13 @@ public class QuorumStateTest { @ParameterizedTest @EnumSource(value = KRaftVersion.class) - public void testVotedToFollowerHigherEpoch(KRaftVersion kraftVersion) { + public void testUnattachedVotedToFollowerHigherEpoch(KRaftVersion kraftVersion) { int node1 = 1; int node2 = 2; VoterSet voters = 
localWithRemoteVoterSet(IntStream.of(node1, node2), kraftVersion); QuorumState state = initializeEmptyState(voters, kraftVersion); state.initialize(new OffsetAndEpoch(0L, logEndEpoch)); - state.transitionToVoted(5, ReplicaKey.of(node1, ReplicaKey.NO_DIRECTORY_ID)); + state.transitionToUnattachedVotedState(5, ReplicaKey.of(node1, ReplicaKey.NO_DIRECTORY_ID)); state.transitionToFollower( 8, node2, @@ -1038,26 +1067,26 @@ public class QuorumStateTest { @ParameterizedTest @EnumSource(value = KRaftVersion.class) - public void testVotedToUnattachedSameEpoch(KRaftVersion kraftVersion) { + public void testUnattachedVotedToUnattachedSameEpoch(KRaftVersion kraftVersion) { int node1 = 1; int node2 = 2; VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2), kraftVersion); QuorumState state = initializeEmptyState(voters, kraftVersion); state.initialize(new OffsetAndEpoch(0L, logEndEpoch)); - state.transitionToVoted(5, ReplicaKey.of(node1, ReplicaKey.NO_DIRECTORY_ID)); + state.transitionToUnattachedVotedState(5, ReplicaKey.of(node1, ReplicaKey.NO_DIRECTORY_ID)); assertThrows(IllegalStateException.class, () -> state.transitionToUnattached(5)); } @ParameterizedTest @EnumSource(value = KRaftVersion.class) - public void testVotedToUnattachedHigherEpoch(KRaftVersion kraftVersion) { + public void testUnattachedVotedToUnattachedHigherEpoch(KRaftVersion kraftVersion) { int otherNodeId = 1; VoterSet voters = localWithRemoteVoterSet(IntStream.of(otherNodeId), kraftVersion); QuorumState state = initializeEmptyState(voters, kraftVersion); state.initialize(new OffsetAndEpoch(0L, logEndEpoch)); - state.transitionToVoted(5, ReplicaKey.of(otherNodeId, ReplicaKey.NO_DIRECTORY_ID)); + state.transitionToUnattachedVotedState(5, ReplicaKey.of(otherNodeId, ReplicaKey.NO_DIRECTORY_ID)); - long remainingElectionTimeMs = state.votedStateOrThrow().remainingElectionTimeMs(time.milliseconds()); + long remainingElectionTimeMs = 
state.unattachedStateOrThrow().remainingElectionTimeMs(time.milliseconds()); time.sleep(1000); state.transitionToUnattached(6); @@ -1071,14 +1100,14 @@ public class QuorumStateTest { @ParameterizedTest @EnumSource(value = KRaftVersion.class) - public void testVotedToAnyStateLowerEpoch(KRaftVersion kraftVersion) { + public void testUnattachedVotedToAnyStateLowerEpoch(KRaftVersion kraftVersion) { ReplicaKey otherNodeKey = ReplicaKey.of(1, Uuid.randomUuid()); VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey, otherNodeKey)); QuorumState state = initializeEmptyState(voters, kraftVersion); state.initialize(new OffsetAndEpoch(0L, logEndEpoch)); - state.transitionToVoted(5, otherNodeKey); + state.transitionToUnattachedVotedState(5, otherNodeKey); assertThrows(IllegalStateException.class, () -> state.transitionToUnattached(4)); - assertThrows(IllegalStateException.class, () -> state.transitionToVoted(4, otherNodeKey)); + assertThrows(IllegalStateException.class, () -> state.transitionToUnattachedVotedState(4, otherNodeKey)); assertThrows( IllegalStateException.class, () -> state.transitionToFollower( @@ -1100,6 +1129,43 @@ public class QuorumStateTest { ); } + @ParameterizedTest + @EnumSource(value = KRaftVersion.class) + public void testAllStatesToUnattachedFailInSameEpoch(KRaftVersion kraftVersion) { + ReplicaKey voter1 = ReplicaKey.of(1, Uuid.randomUuid()); + ReplicaKey voter2 = ReplicaKey.of(2, Uuid.randomUuid()); + VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey, voter1, voter2)); + QuorumState state = initializeEmptyState(voters, kraftVersion); + state.initialize(new OffsetAndEpoch(0L, logEndEpoch)); + + // unattached to unattached + state.unattachedStateOrThrow(); + state.transitionToUnattachedVotedState(5, voter1); + // cannot vote for same or different node in same epoch + assertThrows(IllegalStateException.class, () -> state.transitionToUnattachedVotedState(5, voter1)); + assertThrows(IllegalStateException.class, () -> 
state.transitionToUnattachedVotedState(5, voter2)); + // can vote for same or different node in larger epoch + state.transitionToUnattachedVotedState(10, voter1); + state.transitionToUnattachedVotedState(15, voter2); + + // follower to unattached + state.transitionToFollower(20, voter1.id(), voters.listeners(voter1.id())); + assertThrows(IllegalStateException.class, () -> state.transitionToUnattachedVotedState(state.epoch(), voter1)); + state.transitionToUnattachedVotedState(state.epoch() + 1, voter1); + + // candidate + state.transitionToCandidate(); + assertThrows(IllegalStateException.class, () -> state.transitionToUnattachedVotedState(state.epoch(), voter1)); + state.transitionToUnattachedVotedState(state.epoch() + 1, voter1); + + // leader + state.transitionToCandidate(); + state.candidateStateOrThrow().recordGrantedVote(voter1.id()); + state.transitionToLeader(0L, accumulator); + assertThrows(IllegalStateException.class, () -> state.transitionToUnattachedVotedState(state.epoch(), voter1)); + state.transitionToUnattachedVotedState(state.epoch() + 1, voter1); + } + @ParameterizedTest @EnumSource(value = KRaftVersion.class) public void testFollowerToFollowerSameEpoch(KRaftVersion kraftVersion) { @@ -1268,7 +1334,7 @@ public class QuorumStateTest { @ParameterizedTest @EnumSource(value = KRaftVersion.class) - public void testFollowerToVotedSameEpoch(KRaftVersion kraftVersion) { + public void testFollowerToUnattachedVotedSameEpoch(KRaftVersion kraftVersion) { int node1 = 1; int node2 = 2; VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2), kraftVersion); @@ -1282,21 +1348,21 @@ public class QuorumStateTest { assertThrows( IllegalStateException.class, - () -> state.transitionToVoted(8, ReplicaKey.of(node1, ReplicaKey.NO_DIRECTORY_ID)) + () -> state.transitionToUnattachedVotedState(8, ReplicaKey.of(node1, ReplicaKey.NO_DIRECTORY_ID)) ); assertThrows( IllegalStateException.class, - () -> state.transitionToVoted(8, ReplicaKey.of(localId, 
ReplicaKey.NO_DIRECTORY_ID)) + () -> state.transitionToUnattachedVotedState(8, ReplicaKey.of(localId, ReplicaKey.NO_DIRECTORY_ID)) ); assertThrows( IllegalStateException.class, - () -> state.transitionToVoted(8, ReplicaKey.of(node2, ReplicaKey.NO_DIRECTORY_ID)) + () -> state.transitionToUnattachedVotedState(8, ReplicaKey.of(node2, ReplicaKey.NO_DIRECTORY_ID)) ); } @ParameterizedTest @EnumSource(value = KRaftVersion.class) - public void testFollowerToVotedHigherEpoch(KRaftVersion kraftVersion) { + public void testFollowerToUnattachedVotedHigherEpoch(KRaftVersion kraftVersion) { ReplicaKey nodeKey1 = ReplicaKey.of(1, Uuid.randomUuid()); ReplicaKey nodeKey2 = ReplicaKey.of(2, Uuid.randomUuid()); @@ -1312,12 +1378,12 @@ public class QuorumStateTest { int jitterMs = 2500; random.mockNextInt(electionTimeoutMs, jitterMs); - state.transitionToVoted(9, nodeKey1); - assertTrue(state.isVoted()); + state.transitionToUnattachedVotedState(9, nodeKey1); + assertTrue(state.isUnattachedAndVoted()); - VotedState votedState = state.votedStateOrThrow(); + UnattachedState votedState = state.unattachedStateOrThrow(); assertEquals(9, votedState.epoch()); - assertEquals(nodeKey1, votedState.votedKey()); + assertEquals(nodeKey1, votedState.votedKey().get()); assertEquals(electionTimeoutMs + jitterMs, votedState.remainingElectionTimeMs(time.milliseconds())); @@ -1338,7 +1404,7 @@ public class QuorumStateTest { assertThrows(IllegalStateException.class, () -> state.transitionToUnattached(4)); assertThrows( IllegalStateException.class, - () -> state.transitionToVoted(4, ReplicaKey.of(otherNodeId, ReplicaKey.NO_DIRECTORY_ID)) + () -> state.transitionToUnattachedVotedState(4, ReplicaKey.of(otherNodeId, ReplicaKey.NO_DIRECTORY_ID)) ); assertThrows( IllegalStateException.class, @@ -1370,13 +1436,13 @@ public class QuorumStateTest { QuorumState state = initializeEmptyState(voters, kraftVersion); state.initialize(new OffsetAndEpoch(0L, logEndEpoch)); - // Transition to voted - 
state.transitionToVoted(4, nonVoterKey); - assertTrue(state.isVoted()); + // Add voted state + state.transitionToUnattachedVotedState(4, nonVoterKey); + assertTrue(state.isUnattachedAndVoted()); - VotedState votedState = state.votedStateOrThrow(); + UnattachedState votedState = state.unattachedStateOrThrow(); assertEquals(4, votedState.epoch()); - assertEquals(nonVoterKey, votedState.votedKey()); + assertEquals(nonVoterKey, votedState.votedKey().get()); // Transition to follower state.transitionToFollower( @@ -1420,12 +1486,12 @@ public class QuorumStateTest { state.initialize(new OffsetAndEpoch(0L, logEndEpoch)); assertTrue(state.isObserver()); - state.transitionToVoted(5, otherNodeKey); - assertTrue(state.isVoted()); + state.transitionToUnattachedVotedState(5, otherNodeKey); + assertTrue(state.isUnattachedAndVoted()); - VotedState votedState = state.votedStateOrThrow(); + UnattachedState votedState = state.unattachedStateOrThrow(); assertEquals(5, votedState.epoch()); - assertEquals(otherNodeKey, votedState.votedKey()); + assertEquals(otherNodeKey, votedState.votedKey().get()); } @ParameterizedTest @@ -1518,7 +1584,7 @@ public class QuorumStateTest { state.transitionToUnattached(state.epoch() + 1); assertFalse(state.hasRemoteLeader()); - state.transitionToVoted(state.epoch() + 1, otherNodeKey); + state.transitionToUnattachedVotedState(state.epoch() + 1, otherNodeKey); assertFalse(state.hasRemoteLeader()); state.transitionToFollower( @@ -1551,7 +1617,7 @@ public class QuorumStateTest { state.transitionToUnattached(6); assertEquals(highWatermark, state.highWatermark()); - state.transitionToVoted(7, otherNodeKey); + state.transitionToUnattachedVotedState(7, otherNodeKey); assertEquals(highWatermark, state.highWatermark()); state.transitionToCandidate(); @@ -1581,7 +1647,7 @@ public class QuorumStateTest { assertThrows(IllegalStateException.class, state::transitionToCandidate); assertThrows( IllegalStateException.class, - () -> state.transitionToVoted(1, 
ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID)) + () -> state.transitionToUnattachedVotedState(1, ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID)) ); assertThrows(IllegalStateException.class, () -> state.transitionToLeader(0L, accumulator)); diff --git a/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java b/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java index 38033c14ef0..113b24c8a6b 100644 --- a/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java @@ -48,6 +48,7 @@ public class UnattachedStateTest { time, epoch, leaderId, + Optional.empty(), voters, Optional.empty(), electionTimeoutMs, diff --git a/raft/src/test/java/org/apache/kafka/raft/VotedStateTest.java b/raft/src/test/java/org/apache/kafka/raft/UnattachedStateWithVoteTest.java similarity index 87% rename from raft/src/test/java/org/apache/kafka/raft/VotedStateTest.java rename to raft/src/test/java/org/apache/kafka/raft/UnattachedStateWithVoteTest.java index b8a184917a1..fdeded219fc 100644 --- a/raft/src/test/java/org/apache/kafka/raft/VotedStateTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/UnattachedStateWithVoteTest.java @@ -27,6 +27,7 @@ import org.junit.jupiter.params.provider.ValueSource; import java.util.Collections; import java.util.Optional; +import java.util.OptionalInt; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -40,13 +41,14 @@ class VotedStateTest { private final int votedId = 1; private final int electionTimeoutMs = 10000; - private VotedState newVotedState( + private UnattachedState newUnattachedVotedState( Uuid votedDirectoryId ) { - return new VotedState( + return new UnattachedState( time, epoch, - ReplicaKey.of(votedId, votedDirectoryId), + OptionalInt.empty(), + Optional.of(ReplicaKey.of(votedId, votedDirectoryId)), Collections.emptySet(), Optional.empty(), electionTimeoutMs, @@ -56,11 
+58,11 @@ class VotedStateTest { @Test public void testElectionTimeout() { - VotedState state = newVotedState(ReplicaKey.NO_DIRECTORY_ID); + UnattachedState state = newUnattachedVotedState(ReplicaKey.NO_DIRECTORY_ID); ReplicaKey votedKey = ReplicaKey.of(votedId, ReplicaKey.NO_DIRECTORY_ID); assertEquals(epoch, state.epoch()); - assertEquals(votedKey, state.votedKey()); + assertEquals(votedKey, state.votedKey().get()); assertEquals( ElectionState.withVotedCandidate(epoch, votedKey, Collections.emptySet()), state.election() @@ -80,7 +82,7 @@ class VotedStateTest { @ParameterizedTest @ValueSource(booleans = {true, false}) public void testCanGrantVoteWithoutDirectoryId(boolean isLogUpToDate) { - VotedState state = newVotedState(ReplicaKey.NO_DIRECTORY_ID); + UnattachedState state = newUnattachedVotedState(ReplicaKey.NO_DIRECTORY_ID); assertTrue( state.canGrantVote(ReplicaKey.of(votedId, ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate) @@ -100,7 +102,7 @@ class VotedStateTest { @Test void testCanGrantVoteWithDirectoryId() { Uuid votedDirectoryId = Uuid.randomUuid(); - VotedState state = newVotedState(votedDirectoryId); + UnattachedState state = newUnattachedVotedState(votedDirectoryId); assertTrue(state.canGrantVote(ReplicaKey.of(votedId, votedDirectoryId), false)); @@ -116,7 +118,7 @@ class VotedStateTest { @Test void testLeaderEndpoints() { Uuid votedDirectoryId = Uuid.randomUuid(); - VotedState state = newVotedState(votedDirectoryId); + UnattachedState state = newUnattachedVotedState(votedDirectoryId); assertEquals(Endpoints.empty(), state.leaderEndpoints()); } diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java index bd65bbe993f..3e399de3c22 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java @@ -183,7 +183,7 @@ public class 
KafkaRaftMetricsTest { state.followerStateOrThrow().updateHighWatermark(OptionalLong.of(10L)); assertEquals((double) 10L, getMetric(metrics, "high-watermark").metricValue()); - state.transitionToVoted(3, ReplicaKey.of(2, ReplicaKey.NO_DIRECTORY_ID)); + state.transitionToUnattachedVotedState(3, ReplicaKey.of(2, ReplicaKey.NO_DIRECTORY_ID)); assertEquals("voted", getMetric(metrics, "current-state").metricValue()); assertEquals((double) -1, getMetric(metrics, "current-leader").metricValue()); assertEquals((double) 2, getMetric(metrics, "current-vote").metricValue());