This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 5c97ab0a345 KAFKA-17067; Fix KRaft transition to CandidateState 
(#16820)
5c97ab0a345 is described below

commit 5c97ab0a345189bef8d1778943ebf4d6d243f5bf
Author: Alyssa Huang <ahu...@confluent.io>
AuthorDate: Sat Aug 10 04:43:16 2024 -0700

    KAFKA-17067; Fix KRaft transition to CandidateState (#16820)
    
    Only voters should be able to transition to Candidate state. This removes 
VotedState as one of the EpochStates and moves voted information into 
UnattachedState.
    
    Reviewers: José Armando García Sancio <jsan...@apache.org>
---
 .../java/org/apache/kafka/raft/EpochState.java     |   2 +-
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  30 +---
 .../java/org/apache/kafka/raft/QuorumState.java    |  54 +++---
 .../org/apache/kafka/raft/UnattachedState.java     |  31 +++-
 .../java/org/apache/kafka/raft/VotedState.java     | 133 --------------
 .../kafka/raft/internals/KafkaRaftMetrics.java     |  10 +-
 .../org/apache/kafka/raft/KafkaRaftClientTest.java |  83 ++++++++-
 .../org/apache/kafka/raft/QuorumStateTest.java     | 198 ++++++++++++++-------
 .../org/apache/kafka/raft/UnattachedStateTest.java |   1 +
 ...eTest.java => UnattachedStateWithVoteTest.java} |  18 +-
 .../kafka/raft/internals/KafkaRaftMetricsTest.java |   2 +-
 11 files changed, 287 insertions(+), 275 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/EpochState.java 
b/raft/src/main/java/org/apache/kafka/raft/EpochState.java
index 64642becce7..a0d643999b8 100644
--- a/raft/src/main/java/org/apache/kafka/raft/EpochState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/EpochState.java
@@ -31,7 +31,7 @@ public interface EpochState extends Closeable {
      * Decide whether to grant a vote to a candidate.
      *
      * It is the responsibility of the caller to invoke
-     * {@link QuorumState#transitionToVoted(int, ReplicaKey)} if vote is 
granted.
+     * {@link QuorumState#transitionToUnattachedVotedState(int, ReplicaKey)} 
if vote is granted.
      *
      * @param candidateKey the id and directory of the candidate
      * @param isLogUpToDate whether the candidate’s log is at least as 
up-to-date as receiver’s log
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 9c5035af297..ad650fadb53 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -670,10 +670,8 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         resetConnections();
     }
 
-    private void transitionToVoted(ReplicaKey candidateKey, int epoch) {
-        quorum.transitionToVoted(epoch, candidateKey);
-        maybeFireLeaderChange();
-        resetConnections();
+    private void transitionToUnattachedVoted(ReplicaKey candidateKey, int 
epoch) {
+        quorum.transitionToUnattachedVotedState(epoch, candidateKey);
     }
 
     private void onBecomeFollower(long currentTimeMs) {
@@ -816,8 +814,8 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             lastEpochEndOffsetAndEpoch.compareTo(endOffset()) >= 0
         );
 
-        if (voteGranted && quorum.isUnattached()) {
-            transitionToVoted(candidateKey, candidateEpoch);
+        if (voteGranted && quorum.isUnattachedNotVoted()) {
+            transitionToUnattachedVoted(candidateKey, candidateEpoch);
         }
 
         logger.info("Vote request {} with epoch {} is {}", request, 
candidateEpoch, voteGranted ? "granted" : "rejected");
@@ -3095,24 +3093,6 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
         );
     }
 
-    private long pollVoted(long currentTimeMs) {
-        VotedState state = quorum.votedStateOrThrow();
-        GracefulShutdown shutdown = this.shutdown.get();
-
-        if (shutdown != null) {
-            // If shutting down, then remain in this state until either the
-            // shutdown completes or an epoch bump forces another state 
transition
-            return shutdown.remainingTimeMs();
-        } else if (state.hasElectionTimeoutExpired(currentTimeMs)) {
-            // KAFKA-17067 is going to fix this. VotedState doesn't mean that 
the replica is a voter
-            // we need to treat VotedState similar to UnattachedState.
-            transitionToCandidate(currentTimeMs);
-            return 0L;
-        } else {
-            return state.remainingElectionTimeMs(currentTimeMs);
-        }
-    }
-
     private long pollUnattached(long currentTimeMs) {
         UnattachedState state = quorum.unattachedStateOrThrow();
         if (quorum.isVoter()) {
@@ -3148,8 +3128,6 @@ public final class KafkaRaftClient<T> implements 
RaftClient<T> {
             return pollCandidate(currentTimeMs);
         } else if (quorum.isFollower()) {
             return pollFollower(currentTimeMs);
-        } else if (quorum.isVoted()) {
-            return pollVoted(currentTimeMs);
         } else if (quorum.isUnattached()) {
             return pollUnattached(currentTimeMs);
         } else if (quorum.isResigned()) {
diff --git a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java 
b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
index ac22fb65d1b..f61bdab0db8 100644
--- a/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/QuorumState.java
@@ -40,9 +40,13 @@ import java.util.Random;
  * only valid state transitions. Below we define the possible state 
transitions and
  * how they are triggered:
  *
- * Unattached|Resigned transitions to:
+ * Resigned transitions to:
  *    Unattached: After learning of a new election with a higher epoch
- *    Voted: After granting a vote to a candidate
+ *    Candidate: After expiration of the election timeout
+ *    Follower: After discovering a leader with an equal or larger epoch
+ *
+ * Unattached transitions to:
+ *    Unattached: After learning of a new election with a higher epoch or 
after voting
  *    Candidate: After expiration of the election timeout
  *    Follower: After discovering a leader with an equal or larger epoch
  *
@@ -157,6 +161,7 @@ public class QuorumState {
                 time,
                 logEndOffsetAndEpoch.epoch(),
                 OptionalInt.empty(),
+                Optional.empty(),
                 partitionState.lastVoterSet().voterIds(),
                 Optional.empty(),
                 randomElectionTimeoutMs(),
@@ -195,10 +200,11 @@ public class QuorumState {
                 logContext
             );
         } else if (election.hasVoted()) {
-            initialState = new VotedState(
+            initialState = new UnattachedState(
                 time,
                 election.epoch(),
-                election.votedKey(),
+                OptionalInt.empty(),
+                Optional.of(election.votedKey()),
                 partitionState.lastVoterSet().voterIds(),
                 Optional.empty(),
                 randomElectionTimeoutMs(),
@@ -226,6 +232,7 @@ public class QuorumState {
                     time,
                     election.epoch(),
                     OptionalInt.of(election.leaderId()),
+                    Optional.empty(),
                     partitionState.lastVoterSet().voterIds(),
                     Optional.empty(),
                     randomElectionTimeoutMs(),
@@ -248,6 +255,7 @@ public class QuorumState {
                 time,
                 election.epoch(),
                 OptionalInt.empty(),
+                Optional.empty(),
                 partitionState.lastVoterSet().voterIds(),
                 Optional.empty(),
                 randomElectionTimeoutMs(),
@@ -381,8 +389,6 @@ public class QuorumState {
             electionTimeoutMs = Long.MAX_VALUE;
         } else if (isCandidate()) {
             electionTimeoutMs = 
candidateStateOrThrow().remainingElectionTimeMs(time.milliseconds());
-        } else if (isVoted()) {
-            electionTimeoutMs = 
votedStateOrThrow().remainingElectionTimeMs(time.milliseconds());
         } else if (isUnattached()) {
             electionTimeoutMs = 
unattachedStateOrThrow().remainingElectionTimeMs(time.milliseconds());
         } else {
@@ -393,6 +399,7 @@ public class QuorumState {
             time,
             epoch,
             OptionalInt.empty(),
+            Optional.empty(),
             partitionState.lastVoterSet().voterIds(),
             state.highWatermark(),
             electionTimeoutMs,
@@ -401,12 +408,12 @@ public class QuorumState {
     }
 
     /**
-     * Grant a vote to a candidate and become a follower for this epoch. We 
will remain in this
+     * Grant a vote to a candidate. We will transition/remain in Unattached
      * state until either the election timeout expires or a leader is elected. 
In particular,
      * we do not begin fetching until the election has concluded and
      * {@link #transitionToFollower(int, int, Endpoints)} is invoked.
      */
-    public void transitionToVoted(
+    public void transitionToUnattachedVotedState(
         int epoch,
         ReplicaKey candidateKey
     ) {
@@ -432,7 +439,7 @@ public class QuorumState {
                     currentEpoch
                 )
             );
-        } else if (epoch == currentEpoch && !isUnattached()) {
+        } else if (epoch == currentEpoch && !isUnattachedNotVoted()) {
             throw new IllegalStateException(
                 String.format(
                     "Cannot transition to Voted for %s and epoch %d from the 
current state (%s)",
@@ -446,16 +453,18 @@ public class QuorumState {
         // Note that we reset the election timeout after voting for a 
candidate because we
         // know that the candidate has at least as good of a chance of getting 
elected as us
         durableTransitionTo(
-            new VotedState(
+            new UnattachedState(
                 time,
                 epoch,
-                candidateKey,
+                OptionalInt.empty(),
+                Optional.of(candidateKey),
                 partitionState.lastVoterSet().voterIds(),
                 state.highWatermark(),
                 randomElectionTimeoutMs(),
                 logContext
             )
         );
+        log.debug("Voted for candidate {} in epoch {}", candidateKey, epoch);
     }
 
     /**
@@ -605,15 +614,10 @@ public class QuorumState {
         throw new IllegalStateException("Expected to be Follower, but the 
current state is " + state);
     }
 
-    public VotedState votedStateOrThrow() {
-        return maybeVotedState()
-            .orElseThrow(() -> new IllegalStateException("Expected to be 
Voted, but current state is " + state));
-    }
-
-    public Optional<VotedState> maybeVotedState() {
+    public Optional<UnattachedState> maybeUnattachedState() {
         EpochState fixedState = state;
-        if (fixedState instanceof VotedState) {
-            return Optional.of((VotedState) fixedState);
+        if (fixedState instanceof UnattachedState) {
+            return Optional.of((UnattachedState) fixedState);
         } else {
             return Optional.empty();
         }
@@ -661,14 +665,18 @@ public class QuorumState {
         return state instanceof FollowerState;
     }
 
-    public boolean isVoted() {
-        return state instanceof VotedState;
-    }
-
     public boolean isUnattached() {
         return state instanceof UnattachedState;
     }
 
+    public boolean isUnattachedNotVoted() {
+        return maybeUnattachedState().filter(unattached -> 
!unattached.votedKey().isPresent()).isPresent();
+    }
+
+    public boolean isUnattachedAndVoted() {
+        return 
maybeUnattachedState().flatMap(UnattachedState::votedKey).isPresent();
+    }
+
     public boolean isLeader() {
         return state instanceof LeaderState;
     }
diff --git a/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java 
b/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
index 1dba6a70f28..999a32f8663 100644
--- a/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
+++ b/raft/src/main/java/org/apache/kafka/raft/UnattachedState.java
@@ -39,9 +39,11 @@ import java.util.Set;
  * either through random Fetch requests to the bootstrap servers or through 
BeginQuorumEpoch
  * request from the leader.
  */
+
 public class UnattachedState implements EpochState {
     private final int epoch;
     private final OptionalInt leaderId;
+    private final Optional<ReplicaKey> votedKey;
     private final Set<Integer> voters;
     private final long electionTimeoutMs;
     private final Timer electionTimer;
@@ -52,6 +54,7 @@ public class UnattachedState implements EpochState {
         Time time,
         int epoch,
         OptionalInt leaderId,
+        Optional<ReplicaKey> votedKey,
         Set<Integer> voters,
         Optional<LogOffsetMetadata> highWatermark,
         long electionTimeoutMs,
@@ -59,6 +62,7 @@ public class UnattachedState implements EpochState {
     ) {
         this.epoch = epoch;
         this.leaderId = leaderId;
+        this.votedKey = votedKey;
         this.voters = voters;
         this.highWatermark = highWatermark;
         this.electionTimeoutMs = electionTimeoutMs;
@@ -68,7 +72,9 @@ public class UnattachedState implements EpochState {
 
     @Override
     public ElectionState election() {
-        if (leaderId.isPresent()) {
+        if (votedKey.isPresent()) {
+            return ElectionState.withVotedCandidate(epoch, votedKey().get(), 
voters);
+        } else if (leaderId.isPresent()) {
             return ElectionState.withElectedLeader(epoch, leaderId.getAsInt(), 
voters);
         } else {
             return ElectionState.withUnknownLeader(epoch, voters);
@@ -90,6 +96,10 @@ public class UnattachedState implements EpochState {
         return "Unattached";
     }
 
+    public Optional<ReplicaKey> votedKey() {
+        return votedKey;
+    }
+
     public long electionTimeoutMs() {
         return electionTimeoutMs;
     }
@@ -111,8 +121,21 @@ public class UnattachedState implements EpochState {
 
     @Override
     public boolean canGrantVote(ReplicaKey candidateKey, boolean 
isLogUpToDate) {
-        if (leaderId.isPresent()) {
-            // If the leader id known it should behave similar to the follower 
state
+        if (votedKey.isPresent()) {
+            ReplicaKey votedReplicaKey = votedKey.get();
+            if (votedReplicaKey.id() == candidateKey.id()) {
+                return !votedReplicaKey.directoryId().isPresent() || 
votedReplicaKey.directoryId().equals(candidateKey.directoryId());
+            }
+            log.debug(
+                "Rejecting vote request from candidate ({}), already have 
voted for another " +
+                    "candidate ({}) in epoch {}",
+                candidateKey,
+                votedKey,
+                epoch
+            );
+            return false;
+        } else if (leaderId.isPresent()) {
+            // If the leader id is known it should behave similar to the 
follower state
             log.debug(
                 "Rejecting vote request from candidate ({}) since we already 
have a leader {} in epoch {}",
                 candidateKey,
@@ -134,8 +157,10 @@ public class UnattachedState implements EpochState {
     public String toString() {
         return "Unattached(" +
             "epoch=" + epoch +
+            ", votedKey=" + votedKey.map(ReplicaKey::toString).orElse("null") +
             ", voters=" + voters +
             ", electionTimeoutMs=" + electionTimeoutMs +
+            ", highWatermark=" + highWatermark +
             ')';
     }
 
diff --git a/raft/src/main/java/org/apache/kafka/raft/VotedState.java 
b/raft/src/main/java/org/apache/kafka/raft/VotedState.java
deleted file mode 100644
index c7710903399..00000000000
--- a/raft/src/main/java/org/apache/kafka/raft/VotedState.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.raft;
-
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Timer;
-import org.apache.kafka.raft.internals.ReplicaKey;
-
-import org.slf4j.Logger;
-
-import java.util.Optional;
-import java.util.Set;
-
-/**
- * The "voted" state is for voters who have cast their vote for a specific 
candidate.
- *
- * Once a vote has been cast, it is not possible for a voter to change its 
vote until a
- * new election is started. If the election timeout expires before a new 
leader is elected,
- * then the voter will become a candidate.
- */
-public class VotedState implements EpochState {
-    private final int epoch;
-    private final ReplicaKey votedKey;
-    private final Set<Integer> voters;
-    private final int electionTimeoutMs;
-    private final Timer electionTimer;
-    private final Optional<LogOffsetMetadata> highWatermark;
-    private final Logger log;
-
-    public VotedState(
-        Time time,
-        int epoch,
-        ReplicaKey votedKey,
-        Set<Integer> voters,
-        Optional<LogOffsetMetadata> highWatermark,
-        int electionTimeoutMs,
-        LogContext logContext
-    ) {
-        this.epoch = epoch;
-        this.votedKey = votedKey;
-        this.voters = voters;
-        this.highWatermark = highWatermark;
-        this.electionTimeoutMs = electionTimeoutMs;
-        this.electionTimer = time.timer(electionTimeoutMs);
-        this.log = logContext.logger(VotedState.class);
-    }
-
-    @Override
-    public ElectionState election() {
-        return ElectionState.withVotedCandidate(epoch, votedKey, voters);
-    }
-
-    public ReplicaKey votedKey() {
-        return votedKey;
-    }
-
-    @Override
-    public int epoch() {
-        return epoch;
-    }
-
-    @Override
-    public Endpoints leaderEndpoints() {
-        return Endpoints.empty();
-    }
-
-    @Override
-    public String name() {
-        return "Voted";
-    }
-
-    public long remainingElectionTimeMs(long currentTimeMs) {
-        electionTimer.update(currentTimeMs);
-        return electionTimer.remainingMs();
-    }
-
-    public boolean hasElectionTimeoutExpired(long currentTimeMs) {
-        electionTimer.update(currentTimeMs);
-        return electionTimer.isExpired();
-    }
-
-    @Override
-    public boolean canGrantVote(ReplicaKey candidateKey, boolean 
isLogUpToDate) {
-        if (votedKey.id() == candidateKey.id()) {
-            return !votedKey.directoryId().isPresent() || 
votedKey.directoryId().equals(candidateKey.directoryId());
-        }
-
-        log.debug(
-            "Rejecting vote request from candidate ({}), already have voted 
for another " +
-            "candidate ({}) in epoch {}",
-            candidateKey,
-            votedKey,
-            epoch
-        );
-
-        return false;
-    }
-
-    @Override
-    public Optional<LogOffsetMetadata> highWatermark() {
-        return highWatermark;
-    }
-
-    @Override
-    public String toString() {
-        return String.format(
-            "Voted(epoch=%d, votedKey=%s, voters=%s, electionTimeoutMs=%d, 
highWatermark=%s)",
-            epoch,
-            votedKey,
-            voters,
-            electionTimeoutMs,
-            highWatermark
-        );
-    }
-
-    @Override
-    public void close() {}
-}
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java
index 2b7617df359..49abcd0005f 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java
@@ -72,7 +72,7 @@ public class KafkaRaftMetrics implements AutoCloseable {
                 return "leader";
             } else if (state.isCandidate()) {
                 return "candidate";
-            } else if (state.isVoted()) {
+            } else if (state.isUnattachedAndVoted()) {
                 return "voted";
             } else if (state.isFollower()) {
                 // a broker is special kind of follower, as not being a voter, 
it's an observer
@@ -95,8 +95,8 @@ public class KafkaRaftMetrics implements AutoCloseable {
             if (state.isLeader() || state.isCandidate()) {
                 return state.localIdOrThrow();
             } else {
-                return (double) state.maybeVotedState()
-                    .map(votedState -> votedState.votedKey().id())
+                return (double) state.maybeUnattachedState()
+                    .flatMap(votedState -> 
votedState.votedKey().map(ReplicaKey::id))
                     .orElse(-1);
             }
         });
@@ -110,8 +110,8 @@ public class KafkaRaftMetrics implements AutoCloseable {
             if (state.isLeader() || state.isCandidate()) {
                 return state.localDirectoryId().toString();
             } else {
-                return state.maybeVotedState()
-                    .flatMap(votedState -> votedState.votedKey().directoryId())
+                return state.maybeUnattachedState()
+                    .flatMap(votedState -> 
votedState.votedKey().flatMap(ReplicaKey::directoryId))
                     .orElse(Uuid.ZERO_UUID)
                     .toString();
             }
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index e74e32fb79a..6ac11baf571 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -210,7 +210,7 @@ public class KafkaRaftClientTest {
         context.client.poll();
 
         // We will first transition to unattached and then grant vote and then 
transition to voted
-        assertTrue(context.client.quorum().isVoted());
+        assertTrue(context.client.quorum().isUnattachedAndVoted());
         context.assertVotedCandidate(epoch + 1, remoteKey.id());
         context.assertSentVoteResponse(Errors.NONE, epoch + 1, 
OptionalInt.empty(), true);
     }
@@ -247,7 +247,7 @@ public class KafkaRaftClientTest {
         context.client.poll();
 
         // We will first transition to unattached and then grant vote and then 
transition to voted
-        assertTrue(context.client.quorum().isVoted());
+        assertTrue(context.client.quorum().isUnattachedAndVoted());
         context.assertVotedCandidate(epoch + 1, remoteKey.id());
         context.assertSentVoteResponse(Errors.NONE, epoch + 1, 
OptionalInt.empty(), true);
     }
@@ -284,7 +284,7 @@ public class KafkaRaftClientTest {
 
         // We will first transition to unattached and then grant vote and then 
transition to voted
         assertTrue(
-            context.client.quorum().isVoted(),
+            context.client.quorum().isUnattachedAndVoted(),
             "Local Id: " + localId +
             " Remote Id: " + remoteId +
             " Quorum local Id: " + context.client.quorum().localIdOrSentinel() 
+
@@ -1692,13 +1692,78 @@ public class KafkaRaftClientTest {
         context.assertSentFetchRequest(epoch, 1L, lastEpoch);
 
         context.time.sleep(context.fetchTimeoutMs);
-
         context.pollUntilRequest();
-
         context.assertSentVoteRequest(epoch + 1, lastEpoch, 1L, 1);
         context.assertVotedCandidate(epoch + 1, localId);
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void 
testFollowerAsObserverDoesNotBecomeCandidateAfterFetchTimeout(boolean 
withKip853Rpc) throws Exception {
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
+        int epoch = 5;
+        int lastEpoch = 3;
+        Set<Integer> voters = Utils.mkSet(otherNodeId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, otherNodeId)
+            .appendToLog(lastEpoch, singletonList("foo"))
+            .withKip853Rpc(withKip853Rpc)
+            .build();
+        context.assertElectedLeader(epoch, otherNodeId);
+
+        context.pollUntilRequest();
+        context.assertSentFetchRequest(epoch, 1L, lastEpoch);
+
+        context.time.sleep(context.fetchTimeoutMs);
+        context.pollUntilRequest();
+        assertTrue(context.client.quorum().isFollower());
+
+        // transitions to unattached
+        context.deliverRequest(context.voteRequest(epoch + 1, 
replicaKey(otherNodeId, withKip853Rpc), epoch, 1));
+        context.pollUntilResponse();
+        context.assertSentVoteResponse(Errors.NONE, epoch + 1, 
OptionalInt.empty(), true);
+        assertTrue(context.client.quorum().isUnattached());
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void 
testUnattachedAsObserverDoesNotBecomeCandidateAfterElectionTimeout(boolean 
withKip853Rpc) throws Exception {
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
+        int epoch = 5;
+        Set<Integer> voters = Utils.mkSet(otherNodeId);
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
+            .withUnknownLeader(epoch)
+            .withKip853Rpc(withKip853Rpc)
+            .build();
+
+        context.pollUntilRequest();
+        context.assertSentFetchRequest(epoch, 0L, 0);
+        assertTrue(context.client.quorum().isUnattached());
+
+        context.time.sleep(context.electionTimeoutMs() * 2);
+        context.pollUntilRequest();
+        assertTrue(context.client.quorum().isUnattached());
+        context.assertSentFetchRequest(epoch, 0L, 0);
+        // confirm no vote request was sent
+        assertEquals(0, context.channel.drainSendQueue().size());
+
+        context.deliverRequest(context.voteRequest(epoch + 1, 
replicaKey(otherNodeId, withKip853Rpc), epoch, 0));
+        context.pollUntilResponse();
+        // observer can vote
+        context.assertSentVoteResponse(Errors.NONE, epoch + 1, 
OptionalInt.empty(), true);
+
+        context.time.sleep(context.electionTimeoutMs() * 2);
+        context.pollUntilRequest();
+        // observer cannot transition to candidate though
+        assertTrue(context.client.quorum().isUnattached());
+        context.assertSentFetchRequest(epoch + 1, 0L, 0);
+        assertEquals(0, context.channel.drainSendQueue().size());
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testInitializeObserverNoPreviousState(boolean withKip853Rpc) 
throws Exception {
@@ -2259,7 +2324,7 @@ public class KafkaRaftClientTest {
 
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
-    public void testLeaderAcceptVoteFromNonVoter(boolean withKip853Rpc) throws 
Exception {
+    public void testLeaderAcceptVoteFromObserver(boolean withKip853Rpc) throws 
Exception {
         int localId = randomReplicaId();
         int otherNodeId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -2272,12 +2337,12 @@ public class KafkaRaftClientTest {
         context.becomeLeader();
         int epoch = context.currentEpoch();
 
-        ReplicaKey nonVoterKey = replicaKey(localId + 2, withKip853Rpc);
-        context.deliverRequest(context.voteRequest(epoch - 1, nonVoterKey, 0, 
0));
+        ReplicaKey observerKey = replicaKey(localId + 2, withKip853Rpc);
+        context.deliverRequest(context.voteRequest(epoch - 1, observerKey, 0, 
0));
         context.client.poll();
         context.assertSentVoteResponse(Errors.FENCED_LEADER_EPOCH, epoch, 
OptionalInt.of(localId), false);
 
-        context.deliverRequest(context.voteRequest(epoch, nonVoterKey, 0, 0));
+        context.deliverRequest(context.voteRequest(epoch, observerKey, 0, 0));
         context.client.poll();
         context.assertSentVoteResponse(Errors.NONE, epoch, 
OptionalInt.of(localId), false);
     }
diff --git a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java 
b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
index 77f7234d337..a22cf16123e 100644
--- a/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
@@ -141,6 +141,16 @@ public class QuorumStateTest {
         );
     }
 
+    private VoterSet withRemoteVoterSet(IntStream remoteIds, KRaftVersion 
kraftVersion) {
+        boolean withDirectoryid = kraftVersion.featureLevel() > 0;
+
+        Stream<ReplicaKey> remoteKeys = remoteIds
+            .boxed()
+            .map(id -> replicaKey(id, withDirectoryid));
+
+        return VoterSetTest.voterSet(remoteKeys);
+    }
+
     private ReplicaKey replicaKey(int id, boolean withDirectoryId) {
         Uuid directoryId = withDirectoryId ? Uuid.randomUuid() : 
ReplicaKey.NO_DIRECTORY_ID;
         return ReplicaKey.of(id, directoryId);
@@ -176,7 +186,7 @@ public class QuorumStateTest {
         QuorumState state = buildQuorumState(OptionalInt.of(localId), voters, 
kraftVersion);
         state.initialize(new OffsetAndEpoch(0L, 0));
 
-        assertTrue(state.isUnattached());
+        assertTrue(state.isUnattachedNotVoted());
         UnattachedState unattachedState = state.unattachedStateOrThrow();
         assertEquals(epoch, unattachedState.epoch());
         assertEquals(
@@ -242,12 +252,12 @@ public class QuorumStateTest {
 
         QuorumState state = buildQuorumState(OptionalInt.of(localId), voters, 
kraftVersion);
         state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
-        assertTrue(state.isVoted());
+        assertTrue(state.isUnattachedAndVoted());
         assertEquals(epoch, state.epoch());
 
-        VotedState votedState = state.votedStateOrThrow();
+        UnattachedState votedState = state.unattachedStateOrThrow();
         assertEquals(epoch, votedState.epoch());
-        assertEquals(persistedVotedKey(nodeKey1, kraftVersion), 
votedState.votedKey());
+        assertEquals(persistedVotedKey(nodeKey1, kraftVersion), 
votedState.votedKey().get());
 
         assertEquals(
             electionTimeoutMs + jitterMs,
@@ -472,19 +482,19 @@ public class QuorumStateTest {
 
     @ParameterizedTest
     @EnumSource(value = KRaftVersion.class)
-    public void testCandidateToVoted(KRaftVersion kraftVersion) {
+    public void testCandidateToUnattachedVoted(KRaftVersion kraftVersion) {
         ReplicaKey otherNodeKey = ReplicaKey.of(1, Uuid.randomUuid());
         VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey, 
otherNodeKey));
         QuorumState state = initializeEmptyState(voters, kraftVersion);
         state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
         state.transitionToCandidate();
 
-        state.transitionToVoted(5, otherNodeKey);
+        state.transitionToUnattachedVotedState(5, otherNodeKey);
         assertEquals(5, state.epoch());
         assertEquals(OptionalInt.empty(), state.leaderId());
 
-        VotedState followerState = state.votedStateOrThrow();
-        assertEquals(otherNodeKey, followerState.votedKey());
+        UnattachedState votedState = state.unattachedStateOrThrow();
+        assertEquals(otherNodeKey, votedState.votedKey().get());
 
         assertEquals(
             Optional.of(
@@ -507,7 +517,7 @@ public class QuorumStateTest {
         state.transitionToUnattached(5);
         state.transitionToCandidate();
         assertThrows(IllegalStateException.class, () -> 
state.transitionToUnattached(4));
-        assertThrows(IllegalStateException.class, () -> 
state.transitionToVoted(4, otherNodeKey));
+        assertThrows(IllegalStateException.class, () -> 
state.transitionToUnattachedVotedState(4, otherNodeKey));
         assertThrows(
             IllegalStateException.class,
             () -> state.transitionToFollower(
@@ -642,7 +652,7 @@ public class QuorumStateTest {
 
     @ParameterizedTest
     @EnumSource(value = KRaftVersion.class)
-    public void testLeaderToVoted(KRaftVersion kraftVersion) {
+    public void testLeaderToUnattachedVoted(KRaftVersion kraftVersion) {
         ReplicaKey otherNodeKey = ReplicaKey.of(1, Uuid.randomUuid());
         VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey, 
otherNodeKey));
         QuorumState state = initializeEmptyState(voters, kraftVersion);
@@ -650,13 +660,13 @@ public class QuorumStateTest {
         state.transitionToCandidate();
         state.candidateStateOrThrow().recordGrantedVote(otherNodeKey.id());
         state.transitionToLeader(0L, accumulator);
-        state.transitionToVoted(5, otherNodeKey);
+        state.transitionToUnattachedVotedState(5, otherNodeKey);
 
         assertEquals(5, state.epoch());
         assertEquals(OptionalInt.empty(), state.leaderId());
 
-        VotedState votedState = state.votedStateOrThrow();
-        assertEquals(otherNodeKey, votedState.votedKey());
+        UnattachedState votedState = state.unattachedStateOrThrow();
+        assertEquals(otherNodeKey, votedState.votedKey().get());
 
         assertEquals(
             Optional.of(
@@ -682,7 +692,7 @@ public class QuorumStateTest {
         state.candidateStateOrThrow().recordGrantedVote(otherNodeKey.id());
         state.transitionToLeader(0L, accumulator);
         assertThrows(IllegalStateException.class, () -> 
state.transitionToUnattached(4));
-        assertThrows(IllegalStateException.class, () -> 
state.transitionToVoted(4, otherNodeKey));
+        assertThrows(IllegalStateException.class, () -> 
state.transitionToUnattachedVotedState(4, otherNodeKey));
         assertThrows(
             IllegalStateException.class,
             () -> state.transitionToFollower(
@@ -719,7 +729,7 @@ public class QuorumStateTest {
                 voters.listeners(localId)
             )
         );
-        assertThrows(IllegalStateException.class, () -> 
state.transitionToVoted(0, localVoterKey));
+        assertThrows(IllegalStateException.class, () -> 
state.transitionToUnattachedVotedState(0, localVoterKey));
     }
 
     @ParameterizedTest
@@ -741,7 +751,7 @@ public class QuorumStateTest {
 
     @ParameterizedTest
     @EnumSource(value = KRaftVersion.class)
-    public void testUnattachedToVotedSameEpoch(KRaftVersion kraftVersion) {
+    public void testUnattachedVotedSameEpoch(KRaftVersion kraftVersion) {
         ReplicaKey otherNodeKey = ReplicaKey.of(1, Uuid.randomUuid());
         VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey, 
otherNodeKey));
         QuorumState state = initializeEmptyState(voters, kraftVersion);
@@ -750,11 +760,11 @@ public class QuorumStateTest {
 
         int jitterMs = 2500;
         random.mockNextInt(electionTimeoutMs, jitterMs);
-        state.transitionToVoted(5, otherNodeKey);
+        state.transitionToUnattachedVotedState(5, otherNodeKey);
 
-        VotedState votedState = state.votedStateOrThrow();
+        UnattachedState votedState = state.unattachedStateOrThrow();
         assertEquals(5, votedState.epoch());
-        assertEquals(otherNodeKey, votedState.votedKey());
+        assertEquals(otherNodeKey, votedState.votedKey().get());
 
         assertEquals(
             Optional.of(
@@ -774,17 +784,20 @@ public class QuorumStateTest {
 
     @ParameterizedTest
     @EnumSource(value = KRaftVersion.class)
-    public void testUnattachedToVotedHigherEpoch(KRaftVersion kraftVersion) {
+    public void testUnattachedVotedHigherEpoch(KRaftVersion kraftVersion) {
         ReplicaKey otherNodeKey = ReplicaKey.of(1, Uuid.randomUuid());
         VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey, 
otherNodeKey));
         QuorumState state = initializeEmptyState(voters, kraftVersion);
         state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
         state.transitionToUnattached(5);
-        state.transitionToVoted(8, otherNodeKey);
+        assertTrue(state.isUnattachedNotVoted());
 
-        VotedState votedState = state.votedStateOrThrow();
+        state.transitionToUnattachedVotedState(8, otherNodeKey);
+        assertTrue(state.isUnattachedAndVoted());
+
+        UnattachedState votedState = state.unattachedStateOrThrow();
         assertEquals(8, votedState.epoch());
-        assertEquals(otherNodeKey, votedState.votedKey());
+        assertEquals(otherNodeKey, votedState.votedKey().get());
 
         assertEquals(
             Optional.of(
@@ -826,11 +839,13 @@ public class QuorumStateTest {
         QuorumState state = initializeEmptyState(voters, kraftVersion);
         state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
         state.transitionToUnattached(5);
+        assertTrue(state.isUnattachedNotVoted());
 
         long remainingElectionTimeMs = 
state.unattachedStateOrThrow().remainingElectionTimeMs(time.milliseconds());
         time.sleep(1000);
 
         state.transitionToUnattached(6);
+        assertTrue(state.isUnattachedNotVoted());
         UnattachedState unattachedState = state.unattachedStateOrThrow();
         assertEquals(6, unattachedState.epoch());
 
@@ -896,7 +911,7 @@ public class QuorumStateTest {
         state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
         state.transitionToUnattached(5);
         assertThrows(IllegalStateException.class, () -> 
state.transitionToUnattached(4));
-        assertThrows(IllegalStateException.class, () -> 
state.transitionToVoted(4, otherNodeKey));
+        assertThrows(IllegalStateException.class, () -> 
state.transitionToUnattachedVotedState(4, otherNodeKey));
         assertThrows(
             IllegalStateException.class,
             () -> state.transitionToFollower(
@@ -919,26 +934,26 @@ public class QuorumStateTest {
 
     @ParameterizedTest
     @EnumSource(value = KRaftVersion.class)
-    public void testVotedToInvalidLeaderOrResigned(KRaftVersion kraftVersion) {
+    public void testUnattachedVotedToInvalidLeaderOrResigned(KRaftVersion 
kraftVersion) {
         int node1 = 1;
         int node2 = 2;
         VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2), 
kraftVersion);
         QuorumState state = initializeEmptyState(voters, kraftVersion);
         state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
-        state.transitionToVoted(5, ReplicaKey.of(node1, 
ReplicaKey.NO_DIRECTORY_ID));
+        state.transitionToUnattachedVotedState(5, ReplicaKey.of(node1, 
ReplicaKey.NO_DIRECTORY_ID));
         assertThrows(IllegalStateException.class, () -> 
state.transitionToLeader(0, accumulator));
         assertThrows(IllegalStateException.class, () -> 
state.transitionToResigned(Collections.emptyList()));
     }
 
     @ParameterizedTest
     @EnumSource(value = KRaftVersion.class)
-    public void testVotedToCandidate(KRaftVersion kraftVersion) {
+    public void testUnattachedVotedToCandidate(KRaftVersion kraftVersion) {
         int node1 = 1;
         int node2 = 2;
         VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2), 
kraftVersion);
         QuorumState state = initializeEmptyState(voters, kraftVersion);
         state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
-        state.transitionToVoted(5, ReplicaKey.of(node1, 
ReplicaKey.NO_DIRECTORY_ID));
+        state.transitionToUnattachedVotedState(5, ReplicaKey.of(node1, 
ReplicaKey.NO_DIRECTORY_ID));
 
         int jitterMs = 2500;
         random.mockNextInt(electionTimeoutMs, jitterMs);
@@ -952,33 +967,47 @@ public class QuorumStateTest {
 
     @ParameterizedTest
     @EnumSource(value = KRaftVersion.class)
-    public void testVotedToVotedSameEpoch(KRaftVersion kraftVersion) {
+    public void testObserverFromUnattachedVotedToCandidate(KRaftVersion 
kraftVersion) {
+        int voter1 = 1;
+        int voter2 = 2;
+        VoterSet voters = withRemoteVoterSet(IntStream.of(voter1, voter2), 
kraftVersion);
+        QuorumState state = initializeEmptyState(voters, kraftVersion);
+        state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
+        state.transitionToUnattachedVotedState(5, ReplicaKey.of(voter1, 
ReplicaKey.NO_DIRECTORY_ID));
+
+        assertThrows(IllegalStateException.class, () -> 
state.transitionToCandidate());
+        assertTrue(state.isUnattached());
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = KRaftVersion.class)
+    public void testUnattachedVotedToUnattachedVotedSameEpoch(KRaftVersion 
kraftVersion) {
         int node1 = 1;
         int node2 = 2;
         VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2), 
kraftVersion);
         QuorumState state = initializeEmptyState(voters, kraftVersion);
         state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
         state.transitionToUnattached(5);
-        state.transitionToVoted(8, ReplicaKey.of(node1, Uuid.randomUuid()));
+        state.transitionToUnattachedVotedState(8, ReplicaKey.of(node1, 
Uuid.randomUuid()));
         assertThrows(
             IllegalStateException.class,
-            () -> state.transitionToVoted(8, ReplicaKey.of(node1, 
ReplicaKey.NO_DIRECTORY_ID))
+            () -> state.transitionToUnattachedVotedState(8, 
ReplicaKey.of(node1, ReplicaKey.NO_DIRECTORY_ID))
         );
         assertThrows(
             IllegalStateException.class,
-            () -> state.transitionToVoted(8, ReplicaKey.of(node2, 
ReplicaKey.NO_DIRECTORY_ID))
+            () -> state.transitionToUnattachedVotedState(8, 
ReplicaKey.of(node2, ReplicaKey.NO_DIRECTORY_ID))
         );
     }
 
     @ParameterizedTest
     @EnumSource(value = KRaftVersion.class)
-    public void testVotedToFollowerSameEpoch(KRaftVersion kraftVersion) {
+    public void testUnattachedVotedToFollowerSameEpoch(KRaftVersion 
kraftVersion) {
         int node1 = 1;
         int node2 = 2;
         VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2), 
kraftVersion);
         QuorumState state = initializeEmptyState(voters, kraftVersion);
         state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
-        state.transitionToVoted(5, ReplicaKey.of(node1, 
ReplicaKey.NO_DIRECTORY_ID));
+        state.transitionToUnattachedVotedState(5, ReplicaKey.of(node1, 
ReplicaKey.NO_DIRECTORY_ID));
         state.transitionToFollower(
             5,
             node2,
@@ -1005,13 +1034,13 @@ public class QuorumStateTest {
 
     @ParameterizedTest
     @EnumSource(value = KRaftVersion.class)
-    public void testVotedToFollowerHigherEpoch(KRaftVersion kraftVersion) {
+    public void testUnattachedVotedToFollowerHigherEpoch(KRaftVersion 
kraftVersion) {
         int node1 = 1;
         int node2 = 2;
         VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2), 
kraftVersion);
         QuorumState state = initializeEmptyState(voters, kraftVersion);
         state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
-        state.transitionToVoted(5, ReplicaKey.of(node1, 
ReplicaKey.NO_DIRECTORY_ID));
+        state.transitionToUnattachedVotedState(5, ReplicaKey.of(node1, 
ReplicaKey.NO_DIRECTORY_ID));
         state.transitionToFollower(
             8,
             node2,
@@ -1038,26 +1067,26 @@ public class QuorumStateTest {
 
     @ParameterizedTest
     @EnumSource(value = KRaftVersion.class)
-    public void testVotedToUnattachedSameEpoch(KRaftVersion kraftVersion) {
+    public void testUnattachedVotedToUnattachedSameEpoch(KRaftVersion 
kraftVersion) {
         int node1 = 1;
         int node2 = 2;
         VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2), 
kraftVersion);
         QuorumState state = initializeEmptyState(voters, kraftVersion);
         state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
-        state.transitionToVoted(5, ReplicaKey.of(node1, 
ReplicaKey.NO_DIRECTORY_ID));
+        state.transitionToUnattachedVotedState(5, ReplicaKey.of(node1, 
ReplicaKey.NO_DIRECTORY_ID));
         assertThrows(IllegalStateException.class, () -> 
state.transitionToUnattached(5));
     }
 
     @ParameterizedTest
     @EnumSource(value = KRaftVersion.class)
-    public void testVotedToUnattachedHigherEpoch(KRaftVersion kraftVersion) {
+    public void testUnattachedVotedToUnattachedHigherEpoch(KRaftVersion 
kraftVersion) {
         int otherNodeId = 1;
         VoterSet voters = localWithRemoteVoterSet(IntStream.of(otherNodeId), 
kraftVersion);
         QuorumState state = initializeEmptyState(voters, kraftVersion);
         state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
-        state.transitionToVoted(5, ReplicaKey.of(otherNodeId, 
ReplicaKey.NO_DIRECTORY_ID));
+        state.transitionToUnattachedVotedState(5, ReplicaKey.of(otherNodeId, 
ReplicaKey.NO_DIRECTORY_ID));
 
-        long remainingElectionTimeMs = 
state.votedStateOrThrow().remainingElectionTimeMs(time.milliseconds());
+        long remainingElectionTimeMs = 
state.unattachedStateOrThrow().remainingElectionTimeMs(time.milliseconds());
         time.sleep(1000);
 
         state.transitionToUnattached(6);
@@ -1071,14 +1100,14 @@ public class QuorumStateTest {
 
     @ParameterizedTest
     @EnumSource(value = KRaftVersion.class)
-    public void testVotedToAnyStateLowerEpoch(KRaftVersion kraftVersion) {
+    public void testUnattachedVotedToAnyStateLowerEpoch(KRaftVersion 
kraftVersion) {
         ReplicaKey otherNodeKey = ReplicaKey.of(1, Uuid.randomUuid());
         VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey, 
otherNodeKey));
         QuorumState state = initializeEmptyState(voters, kraftVersion);
         state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
-        state.transitionToVoted(5, otherNodeKey);
+        state.transitionToUnattachedVotedState(5, otherNodeKey);
         assertThrows(IllegalStateException.class, () -> 
state.transitionToUnattached(4));
-        assertThrows(IllegalStateException.class, () -> 
state.transitionToVoted(4, otherNodeKey));
+        assertThrows(IllegalStateException.class, () -> 
state.transitionToUnattachedVotedState(4, otherNodeKey));
         assertThrows(
             IllegalStateException.class,
             () -> state.transitionToFollower(
@@ -1100,6 +1129,43 @@ public class QuorumStateTest {
         );
     }
 
+    @ParameterizedTest
+    @EnumSource(value = KRaftVersion.class)
+    public void testAllStatesToUnattachedFailInSameEpoch(KRaftVersion 
kraftVersion) {
+        ReplicaKey voter1 = ReplicaKey.of(1, Uuid.randomUuid());
+        ReplicaKey voter2 = ReplicaKey.of(2, Uuid.randomUuid());
+        VoterSet voters = VoterSetTest.voterSet(Stream.of(localVoterKey, 
voter1, voter2));
+        QuorumState state = initializeEmptyState(voters, kraftVersion);
+        state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
+
+        // unattached to unattached
+        state.unattachedStateOrThrow();
+        state.transitionToUnattachedVotedState(5, voter1);
+        // cannot vote for same or different node in same epoch
+        assertThrows(IllegalStateException.class, () -> 
state.transitionToUnattachedVotedState(5, voter1));
+        assertThrows(IllegalStateException.class, () -> 
state.transitionToUnattachedVotedState(5, voter2));
+        // can vote for same or different node in larger epoch
+        state.transitionToUnattachedVotedState(10, voter1);
+        state.transitionToUnattachedVotedState(15, voter2);
+
+        // follower to unattached
+        state.transitionToFollower(20, voter1.id(), 
voters.listeners(voter1.id()));
+        assertThrows(IllegalStateException.class, () -> 
state.transitionToUnattachedVotedState(state.epoch(), voter1));
+        state.transitionToUnattachedVotedState(state.epoch() + 1, voter1);
+
+        // candidate
+        state.transitionToCandidate();
+        assertThrows(IllegalStateException.class, () -> 
state.transitionToUnattachedVotedState(state.epoch(), voter1));
+        state.transitionToUnattachedVotedState(state.epoch() + 1, voter1);
+
+        // leader
+        state.transitionToCandidate();
+        state.candidateStateOrThrow().recordGrantedVote(voter1.id());
+        state.transitionToLeader(0L, accumulator);
+        assertThrows(IllegalStateException.class, () -> 
state.transitionToUnattachedVotedState(state.epoch(), voter1));
+        state.transitionToUnattachedVotedState(state.epoch() + 1, voter1);
+    }
+
     @ParameterizedTest
     @EnumSource(value = KRaftVersion.class)
     public void testFollowerToFollowerSameEpoch(KRaftVersion kraftVersion) {
@@ -1268,7 +1334,7 @@ public class QuorumStateTest {
 
     @ParameterizedTest
     @EnumSource(value = KRaftVersion.class)
-    public void testFollowerToVotedSameEpoch(KRaftVersion kraftVersion) {
+    public void testFollowerToUnattachedVotedSameEpoch(KRaftVersion 
kraftVersion) {
         int node1 = 1;
         int node2 = 2;
         VoterSet voters = localWithRemoteVoterSet(IntStream.of(node1, node2), 
kraftVersion);
@@ -1282,21 +1348,21 @@ public class QuorumStateTest {
 
         assertThrows(
             IllegalStateException.class,
-            () -> state.transitionToVoted(8, ReplicaKey.of(node1, 
ReplicaKey.NO_DIRECTORY_ID))
+            () -> state.transitionToUnattachedVotedState(8, 
ReplicaKey.of(node1, ReplicaKey.NO_DIRECTORY_ID))
         );
         assertThrows(
             IllegalStateException.class,
-            () -> state.transitionToVoted(8, ReplicaKey.of(localId, 
ReplicaKey.NO_DIRECTORY_ID))
+            () -> state.transitionToUnattachedVotedState(8, 
ReplicaKey.of(localId, ReplicaKey.NO_DIRECTORY_ID))
         );
         assertThrows(
             IllegalStateException.class,
-            () -> state.transitionToVoted(8, ReplicaKey.of(node2, 
ReplicaKey.NO_DIRECTORY_ID))
+            () -> state.transitionToUnattachedVotedState(8, 
ReplicaKey.of(node2, ReplicaKey.NO_DIRECTORY_ID))
         );
     }
 
     @ParameterizedTest
     @EnumSource(value = KRaftVersion.class)
-    public void testFollowerToVotedHigherEpoch(KRaftVersion kraftVersion) {
+    public void testFollowerToUnattachedVotedHigherEpoch(KRaftVersion 
kraftVersion) {
         ReplicaKey nodeKey1 = ReplicaKey.of(1, Uuid.randomUuid());
         ReplicaKey nodeKey2 = ReplicaKey.of(2, Uuid.randomUuid());
 
@@ -1312,12 +1378,12 @@ public class QuorumStateTest {
         int jitterMs = 2500;
         random.mockNextInt(electionTimeoutMs, jitterMs);
 
-        state.transitionToVoted(9, nodeKey1);
-        assertTrue(state.isVoted());
+        state.transitionToUnattachedVotedState(9, nodeKey1);
+        assertTrue(state.isUnattachedAndVoted());
 
-        VotedState votedState = state.votedStateOrThrow();
+        UnattachedState votedState = state.unattachedStateOrThrow();
         assertEquals(9, votedState.epoch());
-        assertEquals(nodeKey1, votedState.votedKey());
+        assertEquals(nodeKey1, votedState.votedKey().get());
 
         assertEquals(electionTimeoutMs + jitterMs,
             votedState.remainingElectionTimeMs(time.milliseconds()));
@@ -1338,7 +1404,7 @@ public class QuorumStateTest {
         assertThrows(IllegalStateException.class, () -> 
state.transitionToUnattached(4));
         assertThrows(
             IllegalStateException.class,
-            () -> state.transitionToVoted(4, ReplicaKey.of(otherNodeId, 
ReplicaKey.NO_DIRECTORY_ID))
+            () -> state.transitionToUnattachedVotedState(4, 
ReplicaKey.of(otherNodeId, ReplicaKey.NO_DIRECTORY_ID))
         );
         assertThrows(
             IllegalStateException.class,
@@ -1370,13 +1436,13 @@ public class QuorumStateTest {
         QuorumState state = initializeEmptyState(voters, kraftVersion);
         state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
 
-        // Transition to voted
-        state.transitionToVoted(4, nonVoterKey);
-        assertTrue(state.isVoted());
+        // Add voted state
+        state.transitionToUnattachedVotedState(4, nonVoterKey);
+        assertTrue(state.isUnattachedAndVoted());
 
-        VotedState votedState = state.votedStateOrThrow();
+        UnattachedState votedState = state.unattachedStateOrThrow();
         assertEquals(4, votedState.epoch());
-        assertEquals(nonVoterKey, votedState.votedKey());
+        assertEquals(nonVoterKey, votedState.votedKey().get());
 
         // Transition to follower
         state.transitionToFollower(
@@ -1420,12 +1486,12 @@ public class QuorumStateTest {
         state.initialize(new OffsetAndEpoch(0L, logEndEpoch));
         assertTrue(state.isObserver());
 
-        state.transitionToVoted(5, otherNodeKey);
-        assertTrue(state.isVoted());
+        state.transitionToUnattachedVotedState(5, otherNodeKey);
+        assertTrue(state.isUnattachedAndVoted());
 
-        VotedState votedState = state.votedStateOrThrow();
+        UnattachedState votedState = state.unattachedStateOrThrow();
         assertEquals(5, votedState.epoch());
-        assertEquals(otherNodeKey, votedState.votedKey());
+        assertEquals(otherNodeKey, votedState.votedKey().get());
     }
 
     @ParameterizedTest
@@ -1518,7 +1584,7 @@ public class QuorumStateTest {
         state.transitionToUnattached(state.epoch() + 1);
         assertFalse(state.hasRemoteLeader());
 
-        state.transitionToVoted(state.epoch() + 1, otherNodeKey);
+        state.transitionToUnattachedVotedState(state.epoch() + 1, 
otherNodeKey);
         assertFalse(state.hasRemoteLeader());
 
         state.transitionToFollower(
@@ -1551,7 +1617,7 @@ public class QuorumStateTest {
         state.transitionToUnattached(6);
         assertEquals(highWatermark, state.highWatermark());
 
-        state.transitionToVoted(7, otherNodeKey);
+        state.transitionToUnattachedVotedState(7, otherNodeKey);
         assertEquals(highWatermark, state.highWatermark());
 
         state.transitionToCandidate();
@@ -1581,7 +1647,7 @@ public class QuorumStateTest {
         assertThrows(IllegalStateException.class, 
state::transitionToCandidate);
         assertThrows(
             IllegalStateException.class,
-            () -> state.transitionToVoted(1, ReplicaKey.of(1, 
ReplicaKey.NO_DIRECTORY_ID))
+            () -> state.transitionToUnattachedVotedState(1, ReplicaKey.of(1, 
ReplicaKey.NO_DIRECTORY_ID))
         );
         assertThrows(IllegalStateException.class, () -> 
state.transitionToLeader(0L, accumulator));
 
diff --git a/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java 
b/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java
index 38033c14ef0..113b24c8a6b 100644
--- a/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/UnattachedStateTest.java
@@ -48,6 +48,7 @@ public class UnattachedStateTest {
             time,
             epoch,
             leaderId,
+            Optional.empty(),
             voters,
             Optional.empty(),
             electionTimeoutMs,
diff --git a/raft/src/test/java/org/apache/kafka/raft/VotedStateTest.java 
b/raft/src/test/java/org/apache/kafka/raft/UnattachedStateWithVoteTest.java
similarity index 87%
rename from raft/src/test/java/org/apache/kafka/raft/VotedStateTest.java
rename to 
raft/src/test/java/org/apache/kafka/raft/UnattachedStateWithVoteTest.java
index b8a184917a1..fdeded219fc 100644
--- a/raft/src/test/java/org/apache/kafka/raft/VotedStateTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/UnattachedStateWithVoteTest.java
@@ -27,6 +27,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Collections;
 import java.util.Optional;
+import java.util.OptionalInt;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -40,13 +41,14 @@ class VotedStateTest {
     private final int votedId = 1;
     private final int electionTimeoutMs = 10000;
 
-    private VotedState newVotedState(
+    private UnattachedState newUnattachedVotedState(
         Uuid votedDirectoryId
     ) {
-        return new VotedState(
+        return new UnattachedState(
             time,
             epoch,
-            ReplicaKey.of(votedId, votedDirectoryId),
+            OptionalInt.empty(),
+            Optional.of(ReplicaKey.of(votedId, votedDirectoryId)),
             Collections.emptySet(),
             Optional.empty(),
             electionTimeoutMs,
@@ -56,11 +58,11 @@ class VotedStateTest {
 
     @Test
     public void testElectionTimeout() {
-        VotedState state = newVotedState(ReplicaKey.NO_DIRECTORY_ID);
+        UnattachedState state = 
newUnattachedVotedState(ReplicaKey.NO_DIRECTORY_ID);
         ReplicaKey votedKey  = ReplicaKey.of(votedId, 
ReplicaKey.NO_DIRECTORY_ID);
 
         assertEquals(epoch, state.epoch());
-        assertEquals(votedKey, state.votedKey());
+        assertEquals(votedKey, state.votedKey().get());
         assertEquals(
             ElectionState.withVotedCandidate(epoch, votedKey, 
Collections.emptySet()),
             state.election()
@@ -80,7 +82,7 @@ class VotedStateTest {
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void testCanGrantVoteWithoutDirectoryId(boolean isLogUpToDate) {
-        VotedState state = newVotedState(ReplicaKey.NO_DIRECTORY_ID);
+        UnattachedState state = 
newUnattachedVotedState(ReplicaKey.NO_DIRECTORY_ID);
 
         assertTrue(
             state.canGrantVote(ReplicaKey.of(votedId, 
ReplicaKey.NO_DIRECTORY_ID), isLogUpToDate)
@@ -100,7 +102,7 @@ class VotedStateTest {
     @Test
     void testCanGrantVoteWithDirectoryId() {
         Uuid votedDirectoryId = Uuid.randomUuid();
-        VotedState state = newVotedState(votedDirectoryId);
+        UnattachedState state = newUnattachedVotedState(votedDirectoryId);
 
         assertTrue(state.canGrantVote(ReplicaKey.of(votedId, 
votedDirectoryId), false));
 
@@ -116,7 +118,7 @@ class VotedStateTest {
     @Test
     void testLeaderEndpoints() {
         Uuid votedDirectoryId = Uuid.randomUuid();
-        VotedState state = newVotedState(votedDirectoryId);
+        UnattachedState state = newUnattachedVotedState(votedDirectoryId);
 
         assertEquals(Endpoints.empty(), state.leaderEndpoints());
     }
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java 
b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
index bd65bbe993f..3e399de3c22 100644
--- 
a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
+++ 
b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftMetricsTest.java
@@ -183,7 +183,7 @@ public class KafkaRaftMetricsTest {
         state.followerStateOrThrow().updateHighWatermark(OptionalLong.of(10L));
         assertEquals((double) 10L, getMetric(metrics, 
"high-watermark").metricValue());
 
-        state.transitionToVoted(3, ReplicaKey.of(2, 
ReplicaKey.NO_DIRECTORY_ID));
+        state.transitionToUnattachedVotedState(3, ReplicaKey.of(2, 
ReplicaKey.NO_DIRECTORY_ID));
         assertEquals("voted", getMetric(metrics, 
"current-state").metricValue());
         assertEquals((double) -1, getMetric(metrics, 
"current-leader").metricValue());
         assertEquals((double) 2, getMetric(metrics, 
"current-vote").metricValue());

Reply via email to