This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e1bf1552709 KAFKA-17055; Use random replica ids in kraft protocol tests
e1bf1552709 is described below

commit e1bf155270926ce236991cef71a6d100a5dd8432
Author: Mason Chen <mason.yunc...@gmail.com>
AuthorDate: Wed Jul 17 03:33:32 2024 +1200

    KAFKA-17055; Use random replica ids in kraft protocol tests
    
    All of the tests in KafkaRaftClientTest and KafkaRaftClientSnapshotTest 
use well-known ids like 0, 1, etc. Because of this, those tests were not able to 
catch a bug in the BeginQuorumEpoch schema where the default value for VoterId 
was 0 instead of -1.
    
    Improve those tests by using a random valid replica id to lower the 
probability that the replica id will match the default value of the schema.
    
    Reviewers: José Armando García Sancio <jsan...@apache.org>
---
 .../kafka/raft/KafkaRaftClientSnapshotTest.java    | 155 ++++---
 .../org/apache/kafka/raft/KafkaRaftClientTest.java | 506 +++++++++++----------
 .../apache/kafka/raft/RaftClientTestContext.java   |  36 +-
 3 files changed, 370 insertions(+), 327 deletions(-)

diff --git 
a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index 11fefd347ca..dac88b1b9cc 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -52,6 +52,7 @@ import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 
@@ -64,7 +65,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public final class KafkaRaftClientSnapshotTest {
     @Test
     public void testLatestSnapshotId() throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         int leaderId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, leaderId);
         int epoch = 2;
@@ -82,7 +83,7 @@ public final class KafkaRaftClientSnapshotTest {
 
     @Test
     public void testLatestSnapshotIdMissing() throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         int leaderId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, leaderId);
         int epoch = 2;
@@ -98,9 +99,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @CsvSource({"false,false", "false,true", "true,false", "true,true"})
+    @CsvSource({ "false,false", "false,true", "true,false", "true,true" })
     public void testLeaderListenerNotified(boolean entireLog, boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         ReplicaKey otherNodeKey = replicaKey(localId + 1, false);
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
         OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 1);
@@ -135,9 +136,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
+    @ValueSource(booleans = { true, false })
     public void testFollowerListenerNotified(boolean entireLog) throws 
Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         int leaderId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, leaderId);
         int epoch = 2;
@@ -177,9 +178,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
+    @ValueSource(booleans = { true, false })
     public void testSecondListenerNotified(boolean entireLog) throws Exception 
{
-        int localId = 0;
+        int localId = randomReplicaId();
         int leaderId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, leaderId);
         int epoch = 2;
@@ -223,9 +224,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
+    @ValueSource(booleans = { false, true })
     public void testListenerRenotified(boolean withKip853Rpc) throws Exception 
{
-        int localId = 0;
+        int localId = randomReplicaId();
         ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
         OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 1);
@@ -279,10 +280,10 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
+    @ValueSource(booleans = { false, true })
     public void testLeaderImmediatelySendsSnapshotId(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
         OffsetAndEpoch snapshotId = new OffsetAndEpoch(3, 4);
 
@@ -313,9 +314,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
+    @ValueSource(booleans = { false, true })
     public void testFetchRequestOffsetLessThanLogStart(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
@@ -361,10 +362,10 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
+    @ValueSource(booleans = { false, true })
     public void testFetchRequestOffsetAtZero(boolean withKip853Rpc) throws 
Exception {
         // When the follower sends a FETCH request at offset 0, reply with 
snapshot id if it exists
-        int localId = 0;
+        int localId = randomReplicaId();
         ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
@@ -409,9 +410,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
+    @ValueSource(booleans = { false, true })
     public void testFetchRequestWithLargerLastFetchedEpoch(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
@@ -449,9 +450,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
+    @ValueSource(booleans = { false, true })
     public void testFetchRequestTruncateToLogStart(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         int syncNodeId = otherNodeKey.id() + 1;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id(), 
syncNodeId);
@@ -499,9 +500,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
+    @ValueSource(booleans = { false, true })
     public void testFetchRequestAtLogStartOffsetWithValidEpoch(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         int syncNodeId = otherNodeKey.id() + 1;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id(), 
syncNodeId);
@@ -545,9 +546,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
+    @ValueSource(booleans = { false, true })
     public void testFetchRequestAtLogStartOffsetWithInvalidEpoch(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         int syncNodeId = otherNodeKey.id() + 1;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id(), 
syncNodeId);
@@ -597,11 +598,11 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
+    @ValueSource(booleans = { false, true })
     public void testFetchRequestWithLastFetchedEpochLessThanOldestSnapshot(
         boolean withKip853Rpc
     ) throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         int syncNodeId = otherNodeKey.id() + 1;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id(), 
syncNodeId);
@@ -650,9 +651,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
+    @ValueSource(booleans = { false, true })
     public void testFetchSnapshotRequestMissingSnapshot(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         Set<Integer> voters = Utils.mkSet(localId, localId + 1);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -713,9 +714,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
+    @ValueSource(booleans = { false, true })
     public void testFetchSnapshotRequestUnknownPartition(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         Set<Integer> voters = Utils.mkSet(localId, localId + 1);
         TopicPartition topicPartition = new TopicPartition("unknown", 0);
 
@@ -744,9 +745,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
+    @ValueSource(booleans = { false, true })
     public void testFetchSnapshotRequestAsLeader(boolean withKip853Rpc) throws 
Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         Set<Integer> voters = Utils.mkSet(localId, localId + 1);
         OffsetAndEpoch snapshotId = new OffsetAndEpoch(1, 1);
         List<String> records = Arrays.asList("foo", "bar");
@@ -795,14 +796,14 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
+    @ValueSource(booleans = { false, true })
     public void 
testLeaderShouldResignLeadershipIfNotGetFetchSnapshotRequestFromMajorityVoters(
         boolean withKip853Rpc
     ) throws Exception {
-        int localId = 0;
-        ReplicaKey voter1 = replicaKey(1, withKip853Rpc);
-        ReplicaKey voter2 = replicaKey(2, withKip853Rpc);
-        ReplicaKey observer3 = replicaKey(3, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey voter1 = replicaKey(localId + 1, withKip853Rpc);
+        ReplicaKey voter2 = replicaKey(localId + 2, withKip853Rpc);
+        ReplicaKey observer3 = replicaKey(localId + 3, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, voter1.id(), voter2.id());
         OffsetAndEpoch snapshotId = new OffsetAndEpoch(1, 1);
         List<String> records = Arrays.asList("foo", "bar");
@@ -890,9 +891,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
+    @ValueSource(booleans = { false, true })
     public void testPartialFetchSnapshotRequestAsLeader(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         Set<Integer> voters = Utils.mkSet(localId, localId + 1);
         OffsetAndEpoch snapshotId = new OffsetAndEpoch(2, 1);
         List<String> records = Arrays.asList("foo", "bar");
@@ -971,9 +972,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
+    @ValueSource(booleans = { false, true })
     public void testFetchSnapshotRequestAsFollower(boolean withKip853Rpc) 
throws IOException {
-        int localId = 0;
+        int localId = randomReplicaId();
         int leaderId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, leaderId);
         int epoch = 2;
@@ -1003,9 +1004,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
+    @ValueSource(booleans = { false, true })
     public void testFetchSnapshotRequestWithInvalidPosition(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         Set<Integer> voters = Utils.mkSet(localId, localId + 1);
         OffsetAndEpoch snapshotId = new OffsetAndEpoch(1, 1);
         List<String> records = Arrays.asList("foo", "bar");
@@ -1063,9 +1064,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
+    @ValueSource(booleans = { false, true })
     public void testFetchSnapshotRequestWithOlderEpoch(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         Set<Integer> voters = Utils.mkSet(localId, localId + 1);
         OffsetAndEpoch snapshotId = Snapshots.BOOTSTRAP_SNAPSHOT_ID;
 
@@ -1096,9 +1097,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
+    @ValueSource(booleans = { false, true })
     public void testFetchSnapshotRequestWithNewerEpoch(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         Set<Integer> voters = Utils.mkSet(localId, localId + 1);
         OffsetAndEpoch snapshotId = Snapshots.BOOTSTRAP_SNAPSHOT_ID;
 
@@ -1129,9 +1130,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
+    @ValueSource(booleans = { true, false })
     public void testFetchResponseWithInvalidSnapshotId(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         int leaderId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, leaderId);
         int epoch = 2;
@@ -1192,9 +1193,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
+    @ValueSource(booleans = { true, false })
     public void testFetchResponseWithSnapshotId(boolean withKip853Rpc) throws 
Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         int leaderId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, leaderId);
         int epoch = 2;
@@ -1265,9 +1266,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
+    @ValueSource(booleans = { true, false })
     public void testFetchSnapshotResponsePartialData(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         int leaderId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, leaderId);
         int epoch = 2;
@@ -1370,9 +1371,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
+    @ValueSource(booleans = { true, false })
     public void testFetchSnapshotResponseMissingSnapshot(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         int leaderId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, leaderId);
         int epoch = 2;
@@ -1431,9 +1432,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
+    @ValueSource(booleans = { true, false })
     public void testFetchSnapshotResponseFromNewerEpochNotLeader(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         int firstLeaderId = localId + 1;
         int secondLeaderId = firstLeaderId + 1;
         Set<Integer> voters = Utils.mkSet(localId, firstLeaderId, 
secondLeaderId);
@@ -1493,9 +1494,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
+    @ValueSource(booleans = { true, false })
     public void testFetchSnapshotResponseFromNewerEpochLeader(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         int leaderId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, leaderId);
         int epoch = 2;
@@ -1554,9 +1555,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
+    @ValueSource(booleans = { true, false })
     public void testFetchSnapshotResponseFromOlderEpoch(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         int leaderId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, leaderId);
         int epoch = 2;
@@ -1625,9 +1626,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
+    @ValueSource(booleans = { true, false })
     public void testFetchSnapshotResponseWithInvalidId(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         int leaderId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, leaderId);
         int epoch = 2;
@@ -1741,9 +1742,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
+    @ValueSource(booleans = { true, false })
     public void testFetchSnapshotResponseToNotFollower(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         int leaderId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, leaderId);
         int epoch = 2;
@@ -1815,12 +1816,12 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
+    @ValueSource(booleans = { false, true })
     public void testFetchSnapshotRequestClusterIdValidation(
         boolean withKip853Rpc
     ) throws Exception {
-        int localId = 0;
-        ReplicaKey otherNode = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey otherNode = replicaKey(localId + 1, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, otherNode.id());
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -1893,9 +1894,9 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
+    @ValueSource(booleans = { true, false })
     public void testCreateSnapshotAsLeaderWithInvalidSnapshotId(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         int otherNodeId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
         int epoch = 2;
@@ -1941,11 +1942,11 @@ public final class KafkaRaftClientSnapshotTest {
     }
 
     @ParameterizedTest
-    @ValueSource(booleans = {true, false})
+    @ValueSource(booleans = { true, false })
     public void testCreateSnapshotAsFollowerWithInvalidSnapshotId(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        int leaderId = 1;
-        int otherFollowerId = 2;
+        int localId = randomReplicaId();
+        int leaderId = localId + 1;
+        int otherFollowerId = localId + 2;
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(localId, leaderId, otherFollowerId);
 
@@ -2014,6 +2015,10 @@ public final class KafkaRaftClientSnapshotTest {
         return ReplicaKey.of(id, directoryId);
     }
 
+    private static int randomReplicaId() {
+        return ThreadLocalRandom.current().nextInt(1025);
+    }
+
     public static FetchSnapshotRequestData fetchSnapshotRequest(
             TopicPartition topicPartition,
             int epoch,
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java 
b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 9e1258ba666..aaf7f6c93c5 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -26,6 +26,7 @@ import 
org.apache.kafka.common.message.DescribeQuorumResponseData.ReplicaState;
 import org.apache.kafka.common.message.EndQuorumEpochResponseData;
 import org.apache.kafka.common.message.FetchRequestData;
 import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.VoteRequestData;
 import org.apache.kafka.common.message.VoteResponseData;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
@@ -62,6 +63,7 @@ import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -82,7 +84,7 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testInitializeSingleMemberQuorum(boolean withKip853Rpc) throws 
IOException {
-        int localId = 0;
+        int localId = randomReplicaId();
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, Collections.singleton(localId))
             .withKip853Rpc(withKip853Rpc)
             .build();
@@ -94,8 +96,7 @@ public class KafkaRaftClientTest {
     @ValueSource(booleans = { true, false })
     public void testInitializeAsLeaderFromStateStoreSingleMemberQuorum(boolean 
withKip853Rpc) throws Exception {
         // Start off as leader. We should still bump the epoch after 
initialization
-
-        int localId = 0;
+        int localId = randomReplicaId();
         int initialEpoch = 2;
         Set<Integer> voters = Collections.singleton(localId);
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -114,8 +115,9 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testRejectVotesFromSameEpochAfterResigningLeadership(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        ReplicaKey remoteKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        int remoteId = localId + 1;
+        ReplicaKey remoteKey = replicaKey(remoteId, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, remoteKey.id());
         int epoch = 2;
 
@@ -145,8 +147,9 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testRejectVotesFromSameEpochAfterResigningCandidacy(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        ReplicaKey remoteKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        int remoteId = localId + 1;
+        ReplicaKey remoteKey = replicaKey(remoteId, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, remoteKey.id());
         int epoch = 2;
 
@@ -176,8 +179,9 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testGrantVotesFromHigherEpochAfterResigningLeadership(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        ReplicaKey remoteKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        int remoteId = localId + 1;
+        ReplicaKey remoteKey = replicaKey(remoteId, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, remoteKey.id());
         int epoch = 2;
 
@@ -212,8 +216,9 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testGrantVotesFromHigherEpochAfterResigningCandidacy(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        ReplicaKey remoteKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        int remoteId = localId + 1;
+        ReplicaKey remoteKey = replicaKey(remoteId, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, remoteKey.id());
         int epoch = 2;
 
@@ -248,8 +253,9 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testGrantVotesWhenShuttingDown(boolean withKip853Rpc) throws 
Exception {
-        int localId = 0;
-        ReplicaKey remoteKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        int remoteId = localId + 1;
+        ReplicaKey remoteKey = replicaKey(remoteId, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, remoteKey.id());
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -275,7 +281,13 @@ public class KafkaRaftClientTest {
         context.client.poll();
 
         // We will first transition to unattached and then grant vote and then 
transition to voted
-        assertTrue(context.client.quorum().isVoted());
+        assertTrue(
+            context.client.quorum().isVoted(),
+            "Local Id: " + localId +
+            " Remote Id: " + remoteId +
+            " Quorum local Id: " + context.client.quorum().localIdOrSentinel() 
+
+            " Quorum leader Id: " + 
context.client.quorum().leaderIdOrSentinel()
+        );
         context.assertVotedCandidate(epoch + 1, remoteKey.id());
         context.assertSentVoteResponse(Errors.NONE, epoch + 1, 
OptionalInt.empty(), true);
     }
@@ -283,8 +295,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testInitializeAsResignedAndBecomeCandidate(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        int remoteId = 1;
+        int localId = randomReplicaId();
+        int remoteId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, remoteId);
         int epoch = 2;
 
@@ -311,8 +323,9 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testInitializeAsResignedLeaderFromStateStore(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        Set<Integer> voters = Utils.mkSet(localId, 1);
+        int localId = randomReplicaId();
+        int remoteId = localId + 1;
+        Set<Integer> voters = Utils.mkSet(localId, remoteId);
         int epoch = 2;
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -329,7 +342,7 @@ public class KafkaRaftClientTest {
         assertThrows(NotLeaderException.class, () -> 
context.client.scheduleAppend(epoch, Arrays.asList("a", "b")));
 
         context.pollUntilRequest();
-        RaftRequest.Outbound request = 
context.assertSentEndQuorumEpochRequest(epoch, 1);
+        RaftRequest.Outbound request = 
context.assertSentEndQuorumEpochRequest(epoch, remoteId);
         context.deliverResponse(
             request.correlationId(),
             request.destination(),
@@ -346,8 +359,9 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testAppendFailedWithNotLeaderException(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
-        Set<Integer> voters = Utils.mkSet(localId, 1);
+        int localId = randomReplicaId();
+        int remoteId = localId + 1;
+        Set<Integer> voters = Utils.mkSet(localId, remoteId);
         int epoch = 2;
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -361,8 +375,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testAppendFailedWithBufferAllocationException(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
         MemoryPool memoryPool = Mockito.mock(MemoryPool.class);
@@ -388,8 +402,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testAppendFailedWithFencedEpoch(boolean withKip853Rpc) throws 
Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -409,8 +423,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testAppendFailedWithRecordBatchTooLargeException(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -427,15 +441,15 @@ public class KafkaRaftClientTest {
             batchToLarge.add("a");
 
         assertThrows(RecordBatchTooLargeException.class,
-                () -> context.client.scheduleAtomicAppend(epoch, 
OptionalLong.empty(), batchToLarge));
+            () -> context.client.scheduleAtomicAppend(epoch, 
OptionalLong.empty(), batchToLarge));
     }
 
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testEndQuorumEpochRetriesWhileResigned(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
-        int voter1 = 1;
-        int voter2 = 2;
+        int localId = randomReplicaId();
+        int voter1 = localId + 1;
+        int voter2 = localId + 2;
         Set<Integer> voters = Utils.mkSet(localId, voter1, voter2);
         int epoch = 19;
 
@@ -479,8 +493,9 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testResignWillCompleteFetchPurgatory(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        int remoteId = localId + 1;
+        ReplicaKey otherNodeKey = replicaKey(remoteId, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -517,8 +532,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testResignInOlderEpochIgnored(boolean withKip853Rpc) throws 
Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -543,9 +558,9 @@ public class KafkaRaftClientTest {
     public void testHandleBeginQuorumEpochAfterUserInitiatedResign(
         boolean withKip853Rpc
     ) throws Exception {
-        int localId = 0;
-        int remoteId1 = 1;
-        int remoteId2 = 2;
+        int localId = randomReplicaId();
+        int remoteId1 = localId + 1;
+        int remoteId2 = localId + 2;
         Set<Integer> voters = Utils.mkSet(localId, remoteId1, remoteId2);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -571,9 +586,9 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testBeginQuorumEpochHeartbeat(boolean withKip853Rpc) throws 
Exception {
-        int localId = 0;
-        int remoteId1 = 1;
-        int remoteId2 = 2;
+        int localId = randomReplicaId();
+        int remoteId1 = localId + 1;
+        int remoteId2 = localId + 2;
         Set<Integer> voters = Utils.mkSet(localId, remoteId1, remoteId2);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -601,10 +616,13 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void 
testLeaderShouldResignLeadershipIfNotGetFetchRequestFromMajorityVoters(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        ReplicaKey remoteKey1 = replicaKey(1, withKip853Rpc);
-        ReplicaKey remoteKey2 = replicaKey(2, withKip853Rpc);
-        ReplicaKey observerKey3 = replicaKey(3, withKip853Rpc);
+        int localId = randomReplicaId();
+        int remoteId1 = localId + 1;
+        int remoteId2 = localId + 2;
+        int observerId = localId + 3;
+        ReplicaKey remoteKey1 = replicaKey(remoteId1, withKip853Rpc);
+        ReplicaKey remoteKey2 = replicaKey(remoteId2, withKip853Rpc);
+        ReplicaKey observerKey3 = replicaKey(observerId, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, remoteKey1.id(), 
remoteKey2.id());
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -658,7 +676,7 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testLeaderShouldNotResignLeadershipIfOnlyOneVoters(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         Set<Integer> voters = Utils.mkSet(localId);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -677,8 +695,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testElectionTimeoutAfterUserInitiatedResign(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -730,8 +748,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testCannotResignWithLargerEpochThanCurrentEpoch(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -746,8 +764,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testCannotResignIfNotLeader(boolean withKip853Rpc) throws 
Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         int leaderEpoch = 2;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
@@ -763,8 +781,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testCannotResignIfObserver(boolean withKip853Rpc) throws 
Exception {
-        int leaderId = 1;
-        int otherNodeId = 2;
+        int leaderId = randomReplicaId();
+        int otherNodeId = leaderId + 1; // derived from leaderId so the two voter ids never collide
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
 
@@ -791,9 +809,9 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testInitializeAsCandidateFromStateStore(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         // Need 3 node to require a 2-node majority
-        Set<Integer> voters = Utils.mkSet(localId, 1, 2);
+        Set<Integer> voters = Utils.mkSet(localId, localId + 1, localId + 2);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
             .withVotedCandidate(2, ReplicaKey.of(localId, 
ReplicaKey.NO_DIRECTORY_ID))
@@ -811,8 +829,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testInitializeAsCandidateAndBecomeLeader(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        final int otherNodeId = 1;
+        final int localId = randomReplicaId();
+        final int otherNodeId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
             .withKip853Rpc(withKip853Rpc)
@@ -857,9 +875,9 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testInitializeAsCandidateAndBecomeLeaderQuorumOfThree(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        final int firstNodeId = 1;
-        final int secondNodeId = 2;
+        int localId = randomReplicaId();
+        final int firstNodeId = localId + 1;
+        final int secondNodeId = localId + 2;
         Set<Integer> voters = Utils.mkSet(localId, firstNodeId, secondNodeId);
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
             .withKip853Rpc(withKip853Rpc)
@@ -878,6 +896,10 @@ public class KafkaRaftClientTest {
             context.voteResponse(true, OptionalInt.empty(), 1)
         );
 
+        VoteRequestData voteRequest = (VoteRequestData) request.data();
+        int voterId = voteRequest.voterId();
+        assertNotEquals(localId, voterId);
+
         // Become leader after receiving the vote
         context.pollUntil(() -> context.log.endOffset().offset() == 1L);
         context.assertElectedLeader(1, localId);
@@ -898,14 +920,14 @@ public class KafkaRaftClientTest {
         Record record = batch.iterator().next();
         assertEquals(electionTimestamp, record.timestamp());
         RaftClientTestContext.verifyLeaderChangeMessage(localId, 
Arrays.asList(localId, firstNodeId, secondNodeId),
-            Arrays.asList(firstNodeId, localId), record.key(), record.value());
+            Arrays.asList(voterId, localId), record.key(), record.value());
     }
 
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testHandleBeginQuorumRequest(boolean withKip853Rpc) throws 
Exception {
-        int localId = 0;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         int votedCandidateEpoch = 2;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
@@ -929,8 +951,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testHandleBeginQuorumResponse(boolean withKip853Rpc) throws 
Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         int leaderEpoch = 2;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
@@ -948,8 +970,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testEndQuorumIgnoredAsCandidateIfOlderEpoch(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         int epoch = 5;
         int jitterMs = 85;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -995,7 +1017,7 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testEndQuorumIgnoredAsLeaderIfOlderEpoch(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         int voter2 = localId + 1;
         ReplicaKey voter3 = replicaKey(localId + 2, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, voter2, voter3.id());
@@ -1026,7 +1048,7 @@ public class KafkaRaftClientTest {
     public void testEndQuorumStartsNewElectionImmediatelyIfFollowerUnattached(
         boolean withKip853Rpc
     ) throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         int voter2 = localId + 1;
         ReplicaKey voter3 = replicaKey(localId + 2, withKip853Rpc);
         int epoch = 2;
@@ -1056,8 +1078,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testAccumulatorClearedAfterBecomingFollower(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         int lingerMs = 50;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
@@ -1088,8 +1110,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testAccumulatorClearedAfterBecomingVoted(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         int lingerMs = 50;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
@@ -1121,8 +1143,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testAccumulatorClearedAfterBecomingUnattached(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         int lingerMs = 50;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
@@ -1155,9 +1177,8 @@ public class KafkaRaftClientTest {
     public void testChannelWokenUpIfLingerTimeoutReachedWithoutAppend(boolean 
withKip853Rpc) throws Exception {
         // This test verifies that the client will set its poll timeout 
accounting
         // for the lingerMs of a pending append
-
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         int lingerMs = 50;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
@@ -1191,9 +1212,8 @@ public class KafkaRaftClientTest {
     public void testChannelWokenUpIfLingerTimeoutReachedDuringAppend(boolean 
withKip853Rpc) throws Exception {
         // This test verifies that the client will get woken up immediately
         // if the linger timeout has expired during an append
-
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         int lingerMs = 50;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
@@ -1225,8 +1245,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testHandleEndQuorumRequest(boolean withKip853Rpc) throws 
Exception {
-        int localId = 0;
-        int oldLeaderId = 1;
+        int localId = randomReplicaId();
+        int oldLeaderId = localId + 1; // derived from localId; a second random draw could equal localId
         int leaderEpoch = 2;
         Set<Integer> voters = Utils.mkSet(localId, oldLeaderId);
 
@@ -1253,10 +1273,10 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void 
testHandleEndQuorumRequestWithLowerPriorityToBecomeLeader(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        ReplicaKey oldLeaderKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey oldLeaderKey = replicaKey(localId + 1, withKip853Rpc);
         int leaderEpoch = 2;
-        ReplicaKey preferredNextLeader = replicaKey(3, withKip853Rpc);
+        ReplicaKey preferredNextLeader = replicaKey(localId + 2, 
withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, oldLeaderKey.id(), 
preferredNextLeader.id());
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -1296,9 +1316,9 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testVoteRequestTimeout(boolean withKip853Rpc) throws Exception 
{
-        int localId = 0;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         int epoch = 1;
-        int otherNodeId = 1;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -1338,9 +1358,9 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testHandleValidVoteRequestAsFollower(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         int epoch = 2;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -1359,10 +1379,10 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testHandleVoteRequestAsFollowerWithElectedLeader(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         int epoch = 2;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
-        int electedLeaderId = 3;
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
+        int electedLeaderId = localId + 2;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id(), 
electedLeaderId);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -1381,10 +1401,10 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testHandleVoteRequestAsFollowerWithVotedCandidate(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         int epoch = 2;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
-        ReplicaKey votedCandidateKey = replicaKey(3, withKip853Rpc);
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
+        ReplicaKey votedCandidateKey = replicaKey(localId + 2, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id(), 
votedCandidateKey.id());
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -1402,9 +1422,9 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testHandleInvalidVoteRequestWithOlderEpoch(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         int epoch = 2;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -1422,10 +1442,10 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testHandleVoteRequestAsObserver(boolean withKip853Rpc) throws 
Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         int epoch = 2;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
-        int otherNodeId2 = 2;
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
+        int otherNodeId2 = localId + 2;
         Set<Integer> voters = Utils.mkSet(otherNodeKey.id(), otherNodeId2);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -1443,8 +1463,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testLeaderIgnoreVoteRequestOnSameEpoch(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -1466,8 +1486,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testListenerCommitCallbackAfterLeaderWrite(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -1511,8 +1531,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testLeaderImmediatelySendsDivergingEpoch(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -1543,8 +1563,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testCandidateIgnoreVoteRequestOnSameEpoch(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         int leaderEpoch = 2;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
@@ -1564,8 +1584,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testRetryElection(boolean withKip853Rpc) throws Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         int epoch = 1;
         int exponentialFactor = 85;  // set it large enough so that we will 
bound on jitter
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -1611,8 +1631,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testInitializeAsFollowerEmptyLog(boolean withKip853Rpc) throws 
Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
@@ -1631,8 +1651,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testInitializeAsFollowerNonEmptyLog(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         int epoch = 5;
         int lastEpoch = 3;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -1652,8 +1672,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testVoterBecomeCandidateAfterFetchTimeout(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         int epoch = 5;
         int lastEpoch = 3;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -1679,9 +1699,9 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testInitializeObserverNoPreviousState(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
-        int leaderId = 1;
-        int otherNodeId = 2;
+        int localId = randomReplicaId();
+        int leaderId = localId + 1;
+        int otherNodeId = localId + 2;
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
 
@@ -1707,8 +1727,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testObserverQuorumDiscoveryFailure(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
-        int leaderId = 1;
+        int localId = randomReplicaId();
+        int leaderId = localId + 1;
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(leaderId);
         List<InetSocketAddress> bootstrapServers = voters
@@ -1753,9 +1773,9 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testObserverSendDiscoveryFetchAfterFetchTimeout(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        int leaderId = 1;
-        int otherNodeId = 2;
+        int localId = randomReplicaId();
+        int leaderId = localId + 1;
+        int otherNodeId = localId + 2;
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
         List<InetSocketAddress> bootstrapServers = voters
@@ -1796,10 +1816,9 @@ public class KafkaRaftClientTest {
     public void testObserverHandleRetryFetchtToBootstrapServer(boolean 
withKip853Rpc) throws Exception {
         // This test tries to check that KRaft is able to handle a retrying 
Fetch request to
         // a boostrap server after a Fetch request to the leader.
-
-        int localId = 0;
-        int leaderId = 1;
-        int otherNodeId = 2;
+        int localId = randomReplicaId();
+        int leaderId = localId + 1;
+        int otherNodeId = localId + 2;
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
         List<InetSocketAddress> bootstrapServers = voters
@@ -1871,10 +1890,9 @@ public class KafkaRaftClientTest {
     public void testObserverHandleRetryFetchToLeader(boolean withKip853Rpc) 
throws Exception {
         // This test tries to check that KRaft is able to handle a retrying 
Fetch request to
         // the leader after a Fetch request to the bootstrap server.
-
-        int localId = 0;
-        int leaderId = 1;
-        int otherNodeId = 2;
+        int localId = randomReplicaId();
+        int leaderId = localId + 1;
+        int otherNodeId = localId + 2;
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
         List<InetSocketAddress> bootstrapServers = voters
@@ -1929,8 +1947,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testInvalidFetchRequest(boolean withKip853Rpc) throws 
Exception {
-        int localId = 0;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -1974,8 +1992,9 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @MethodSource("validFetchVersions")
     public void testLeaderStateUpdateWithDifferentFetchRequestVersions(short 
version) throws Exception {
-        int localId = 0;
-        ReplicaKey otherNodeKey = replicaKey(1, false);
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
+        ReplicaKey otherNodeKey = replicaKey(otherNodeId, false);
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
@@ -1989,8 +2008,8 @@ public class KafkaRaftClientTest {
         // Now we will advance the high watermark with a follower fetch 
request.
         FetchRequestData fetchRequestData = context.fetchRequest(epoch, 
otherNodeKey, 1L, epoch, 0);
         FetchRequestData request = new 
FetchRequest.SimpleBuilder(fetchRequestData).build(version).data();
-        assertEquals((version < 15) ? 1 : -1, fetchRequestData.replicaId());
-        assertEquals((version < 15) ? -1 : 1, 
fetchRequestData.replicaState().replicaId());
+        assertEquals((version < 15) ? otherNodeId : -1, 
fetchRequestData.replicaId());
+        assertEquals((version < 15) ? -1 : otherNodeId, 
fetchRequestData.replicaState().replicaId());
         context.deliverRequest(request, version);
         context.pollUntilResponse();
         context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(localId));
@@ -2000,8 +2019,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testFetchRequestClusterIdValidation(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -2036,8 +2055,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testVoteRequestClusterIdValidation(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -2070,8 +2089,8 @@ public class KafkaRaftClientTest {
 
     @Test
     public void testInvalidVoterReplicaVoteRequest() throws Exception {
-        int localId = 0;
-        ReplicaKey otherNodeKey = replicaKey(1, true);
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, true);
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -2112,12 +2131,11 @@ public class KafkaRaftClientTest {
 
     @Test
     public void testInvalidVoterReplicaBeginQuorumEpochRequest() throws 
Exception {
-        int localId = 0;
-        int voter1 = localId;
+        int localId = randomReplicaId();
         int voter2 = localId + 1;
         int voter3 = localId + 2;
         int epoch = 5;
-        Set<Integer> voters = Utils.mkSet(voter1, voter2, voter3);
+        Set<Integer> voters = Utils.mkSet(localId, voter2, voter3);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
             .withUnknownLeader(epoch - 1)
@@ -2167,8 +2185,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testBeginQuorumEpochRequestClusterIdValidation(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -2203,8 +2221,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testEndQuorumEpochRequestClusterIdValidation(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -2239,8 +2257,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testLeaderAcceptVoteFromNonVoter(boolean withKip853Rpc) throws 
Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -2251,7 +2269,7 @@ public class KafkaRaftClientTest {
         context.becomeLeader();
         int epoch = context.currentEpoch();
 
-        ReplicaKey nonVoterKey = replicaKey(2, withKip853Rpc);
+        ReplicaKey nonVoterKey = replicaKey(localId + 2, withKip853Rpc);
         context.deliverRequest(context.voteRequest(epoch - 1, nonVoterKey, 0, 
0));
         context.client.poll();
         context.assertSentVoteResponse(Errors.FENCED_LEADER_EPOCH, epoch, 
OptionalInt.of(localId), false);
@@ -2264,8 +2282,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testInvalidVoteRequest(boolean withKip853Rpc) throws Exception 
{
-        int localId = 0;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
@@ -2309,8 +2327,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testPurgatoryFetchTimeout(boolean withKip853Rpc) throws 
Exception {
-        int localId = 0;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -2337,8 +2355,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testPurgatoryFetchSatisfiedByWrite(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -2355,7 +2373,7 @@ public class KafkaRaftClientTest {
         assertEquals(0, context.channel.drainSendQueue().size());
 
         // Append some records that can fulfill the Fetch request
-        String[] appendRecords = new String[] {"a", "b", "c"};
+        String[] appendRecords = new String[]{"a", "b", "c"};
         context.client.scheduleAppend(epoch, Arrays.asList(appendRecords));
         context.client.poll();
 
@@ -2366,11 +2384,10 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testPurgatoryFetchCompletedByFollowerTransition(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        int voter1 = localId;
+        int localId = randomReplicaId();
         ReplicaKey voterKey2 = replicaKey(localId + 1, withKip853Rpc);
         int voter3 = localId + 2;
-        Set<Integer> voters = Utils.mkSet(voter1, voterKey2.id(), voter3);
+        Set<Integer> voters = Utils.mkSet(localId, voterKey2.id(), voter3);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
             .withUnknownLeader(4)
@@ -2403,8 +2420,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testFetchResponseIgnoredAfterBecomingCandidate(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         int epoch = 5;
         // The other node starts out as the leader
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -2442,13 +2459,12 @@ public class KafkaRaftClientTest {
     public void testFetchResponseIgnoredAfterBecomingFollowerOfDifferentLeader(
         boolean withKip853Rpc
     ) throws Exception {
-        int localId = 0;
-        int voter1 = localId;
+        int localId = randomReplicaId();
         int voter2 = localId + 1;
         int voter3 = localId + 2;
         int epoch = 5;
         // Start out with `voter2` as the leader
-        Set<Integer> voters = Utils.mkSet(voter1, voter2, voter3);
+        Set<Integer> voters = Utils.mkSet(localId, voter2, voter3);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
             .withElectedLeader(epoch, voter2)
@@ -2482,12 +2498,11 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testVoteResponseIgnoredAfterBecomingFollower(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        int voter1 = localId;
+        int localId = randomReplicaId();
         int voter2 = localId + 1;
         int voter3 = localId + 2;
         int epoch = 5;
-        Set<Integer> voters = Utils.mkSet(voter1, voter2, voter3);
+        Set<Integer> voters = Utils.mkSet(localId, voter2, voter3);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
             .withUnknownLeader(epoch - 1)
@@ -2531,9 +2546,9 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void 
testObserverLeaderRediscoveryAfterBrokerNotAvailableError(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        int leaderId = 1;
-        int otherNodeId = 2;
+        int localId = randomReplicaId();
+        int leaderId = localId + 1;
+        int otherNodeId = localId + 2;
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
         List<InetSocketAddress> bootstrapServers = voters
@@ -2582,9 +2597,9 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testObserverLeaderRediscoveryAfterRequestTimeout(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        int leaderId = 1;
-        int otherNodeId = 2;
+        int localId = randomReplicaId();
+        int leaderId = localId + 1;
+        int otherNodeId = localId + 2;
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(leaderId, otherNodeId);
         List<InetSocketAddress> bootstrapServers = voters
@@ -2627,8 +2642,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testLeaderGracefulShutdown(boolean withKip853Rpc) throws 
Exception {
-        int localId = 0;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -2674,9 +2689,9 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testEndQuorumEpochSentBasedOnFetchOffset(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        ReplicaKey closeFollower = replicaKey(2, withKip853Rpc);
-        ReplicaKey laggingFollower = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey closeFollower = replicaKey(localId + 2, withKip853Rpc);
+        ReplicaKey laggingFollower = replicaKey(localId + 1, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, closeFollower.id(), 
laggingFollower.id());
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -2719,7 +2734,7 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testDescribeQuorumNonLeader(boolean withKip853Rpc) throws 
Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         ReplicaKey voter2 = replicaKey(localId + 1, withKip853Rpc);
         ReplicaKey voter3 = replicaKey(localId + 2, withKip853Rpc);
         int epoch = 2;
@@ -2750,9 +2765,9 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testDescribeQuorum(boolean withKip853Rpc) throws Exception {
-        int localId = 0;
-        ReplicaKey closeFollower = replicaKey(2, withKip853Rpc);
-        ReplicaKey laggingFollower = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey closeFollower = replicaKey(localId + 2, withKip853Rpc);
+        ReplicaKey laggingFollower = replicaKey(localId + 1, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, closeFollower.id(), 
laggingFollower.id());
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -2777,7 +2792,7 @@ public class KafkaRaftClientTest {
         context.assertSentFetchPartitionResponse(3L, epoch);
 
         // Create observer
-        ReplicaKey observerId = replicaKey(3, withKip853Rpc);
+        ReplicaKey observerId = replicaKey(localId + 3, withKip853Rpc);
         context.time.sleep(100);
         long observerFetchTime = context.time.milliseconds();
         context.deliverRequest(context.fetchRequest(epoch, observerId, 0L, 0, 
0));
@@ -2823,8 +2838,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testLeaderGracefulShutdownTimeout(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -2861,8 +2876,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testFollowerGracefulShutdown(boolean withKip853Rpc) throws 
Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
@@ -2888,9 +2903,9 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testObserverGracefulShutdown(boolean withKip853Rpc) throws 
Exception {
-        int localId = 0;
-        int voter1 = 1;
-        int voter2 = 2;
+        int localId = randomReplicaId();
+        int voter1 = localId + 1;
+        int voter2 = localId + 2;
         Set<Integer> voters = Utils.mkSet(voter1, voter2);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -2915,7 +2930,7 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testGracefulShutdownSingleMemberQuorum(boolean withKip853Rpc) 
throws IOException {
-        int localId = 0;
+        int localId = randomReplicaId();
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, Collections.singleton(localId))
             .withKip853Rpc(withKip853Rpc)
             .build();
@@ -2933,8 +2948,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testFollowerReplication(boolean withKip853Rpc) throws 
Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
@@ -2963,8 +2978,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testEmptyRecordSetInFetchResponse(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
@@ -3030,8 +3045,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testFetchShouldBeTreatedAsLeaderAcknowledgement(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
@@ -3066,7 +3081,7 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testLeaderAppendSingleMemberQuorum(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         Set<Integer> voters = Collections.singleton(localId);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -3080,7 +3095,7 @@ public class KafkaRaftClientTest {
         // We still write the leader change message
         assertEquals(OptionalLong.of(1L), context.client.highWatermark());
 
-        String[] appendRecords = new String[] {"a", "b", "c"};
+        String[] appendRecords = new String[]{"a", "b", "c"};
 
         // First poll has no high watermark advance
         context.client.poll();
@@ -3093,7 +3108,7 @@ public class KafkaRaftClientTest {
         assertEquals(OptionalLong.of(4L), context.client.highWatermark());
 
         // Now try reading it
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         List<MutableRecordBatch> batches = new ArrayList<>(2);
         boolean appended = true;
 
@@ -3142,8 +3157,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testFollowerLogReconciliation(boolean withKip853Rpc) throws 
Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         int epoch = 5;
         int lastEpoch = 3;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
@@ -3179,7 +3194,7 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testMetrics(boolean withKip853Rpc) throws Exception {
-        int localId = 0;
+        int localId = randomReplicaId();
         int epoch = 1;
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, Collections.singleton(localId))
             .withKip853Rpc(withKip853Rpc)
@@ -3226,8 +3241,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testClusterAuthorizationFailedInFetch(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
@@ -3254,8 +3269,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testClusterAuthorizationFailedInBeginQuorumEpoch(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
@@ -3283,8 +3298,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testClusterAuthorizationFailedInVote(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
@@ -3309,8 +3324,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testClusterAuthorizationFailedInEndQuorumEpoch(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -3337,8 +3352,8 @@ public class KafkaRaftClientTest {
     public void 
testHandleLeaderChangeFiresAfterListenerReachesEpochStartOffsetOnEmptyLog(
         boolean withKip853Rpc
     ) throws Exception {
-        int localId = 0;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -3384,8 +3399,8 @@ public class KafkaRaftClientTest {
     public void 
testHandleLeaderChangeFiresAfterListenerReachesEpochStartOffset(
         boolean withKip853Rpc
     ) throws Exception {
-        int localId = 0;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
@@ -3451,8 +3466,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testLateRegisteredListenerCatchesUp(boolean withKip853Rpc) 
throws Exception {
-        int localId = 0;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
@@ -3494,8 +3509,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testReregistrationChangesListenerContext(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
@@ -3537,8 +3552,8 @@ public class KafkaRaftClientTest {
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void 
testHandleCommitCallbackFiresAfterFollowerHighWatermarkAdvances(boolean 
withKip853Rpc) throws Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         int epoch = 5;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
@@ -3598,9 +3613,8 @@ public class KafkaRaftClientTest {
     public void testHandleCommitCallbackFiresInVotedState(boolean 
withKip853Rpc) throws Exception {
         // This test verifies that the state machine can still catch up even 
while
         // an election is in progress as long as the high watermark is known.
-
-        int localId = 0;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         int epoch = 7;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
@@ -3646,9 +3660,8 @@ public class KafkaRaftClientTest {
     public void testHandleCommitCallbackFiresInCandidateState(boolean 
withKip853Rpc) throws Exception {
         // This test verifies that the state machine can still catch up even 
while
         // an election is in progress as long as the high watermark is known.
-
-        int localId = 0;
-        ReplicaKey otherNodeKey = replicaKey(1, withKip853Rpc);
+        int localId = randomReplicaId();
+        ReplicaKey otherNodeKey = replicaKey(localId + 1, withKip853Rpc);
         int epoch = 7;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeKey.id());
 
@@ -3705,9 +3718,8 @@ public class KafkaRaftClientTest {
         // When registering a listener while the replica is unattached, it 
should get notified
         // with the current epoch
         // When transitioning to follower, expect another notification with 
the leader and epoch
-
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         int epoch = 7;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
@@ -3741,9 +3753,8 @@ public class KafkaRaftClientTest {
     public void testHandleLeaderChangeFiresAfterFollowerRegistration(boolean 
withKip853Rpc) throws Exception {
         // When registering a listener while the replica is a follower, it 
should get notified with
         // the current leader and epoch
-
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         int epoch = 7;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
@@ -3768,8 +3779,8 @@ public class KafkaRaftClientTest {
     public void testObserverFetchWithNoLocalId(boolean withKip853Rpc) throws 
Exception {
         // When no `localId` is defined, the client will behave as an observer.
         // This is designed for tooling/debugging use cases.
-
-        Set<Integer> voters = Utils.mkSet(1, 2);
+        int leaderId = randomReplicaId();
+        Set<Integer> voters = Utils.mkSet(leaderId, leaderId + 1);
         List<InetSocketAddress> bootstrapServers = voters
             .stream()
             .map(RaftClientTestContext::mockAddress)
@@ -3788,7 +3799,6 @@ public class KafkaRaftClientTest {
         context.assertFetchRequestData(fetchRequest1, 0, 0L, 0);
 
         int leaderEpoch = 5;
-        int leaderId = 1;
 
         context.deliverResponse(
             fetchRequest1.correlationId(),
@@ -3822,10 +3832,10 @@ public class KafkaRaftClientTest {
     }
 
     @ParameterizedTest
-    @CsvSource({"false,false", "false,true", "true,false", "true,true"})
+    @CsvSource({ "false,false", "false,true", "true,false", "true,true" })
     public void testAppendWithRequiredBaseOffset(boolean correctOffset, 
boolean withKip853Rpc) throws Exception {
-        int localId = 0;
-        int otherNodeId = 1;
+        int localId = randomReplicaId();
+        int otherNodeId = localId + 1;
         Set<Integer> voters = Utils.mkSet(localId, otherNodeId);
 
         RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
@@ -3854,4 +3864,8 @@ public class KafkaRaftClientTest {
         Uuid directoryId = withDirectoryId ? Uuid.randomUuid() : 
ReplicaKey.NO_DIRECTORY_ID;
         return ReplicaKey.of(id, directoryId);
     }
+
+    private static int randomReplicaId() {
+        return ThreadLocalRandom.current().nextInt(1025);
+    }
 }
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java 
b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 31d696484f3..600130b41ab 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -71,6 +71,7 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -657,6 +658,18 @@ public final class RaftClientTestContext {
             partitionData,
             nodes
         );
+
+        List<ReplicaState> sortedVoters = response
+            .topics()
+            .get(0)
+            .partitions()
+            .get(0)
+            .currentVoters()
+            .stream()
+            .sorted(Comparator.comparingInt(ReplicaState::replicaId))
+            .collect(Collectors.toList());
+        
response.topics().get(0).partitions().get(0).setCurrentVoters(sortedVoters);
+
         assertEquals(expectedResponse, response);
     }
 
@@ -691,10 +704,12 @@ public final class RaftClientTestContext {
 
         VoteResponseData.PartitionData partitionResponse = 
response.topics().get(0).partitions().get(0);
 
-        assertEquals(voteGranted, partitionResponse.voteGranted());
-        assertEquals(error, Errors.forCode(partitionResponse.errorCode()));
-        assertEquals(epoch, partitionResponse.leaderEpoch());
+        String voterIdDebugLog = "Leader Id: " + leaderId +
+            " Partition response leader Id: " + partitionResponse.leaderId();
+        assertEquals(voteGranted, partitionResponse.voteGranted(), 
voterIdDebugLog);
+        assertEquals(error, Errors.forCode(partitionResponse.errorCode()), 
voterIdDebugLog);
         assertEquals(leaderId.orElse(-1), partitionResponse.leaderId());
+        assertEquals(epoch, partitionResponse.leaderEpoch());
 
         if (kip853Rpc && leaderId.isPresent()) {
             Endpoints expectedLeaderEndpoints = 
startingVoters.listeners(leaderId.getAsInt());
@@ -795,7 +810,7 @@ public final class RaftClientTestContext {
     }
 
     void assertSentBeginQuorumEpochResponse(
-            Errors responseError
+        Errors responseError
     ) {
         List<RaftResponse.Outbound> sentMessages = 
drainSentResponses(ApiKeys.BEGIN_QUORUM_EPOCH);
         assertEquals(1, sentMessages.size());
@@ -840,7 +855,12 @@ public final class RaftClientTestContext {
 
         assertEquals(epoch, partitionResponse.leaderEpoch());
         assertEquals(leaderId.orElse(-1), partitionResponse.leaderId());
-        assertEquals(partitionError, 
Errors.forCode(partitionResponse.errorCode()));
+        assertEquals(
+            partitionError,
+            Errors.forCode(partitionResponse.errorCode()),
+            "Leader Id: " + leaderId +
+            " Partition response leader Id: " + partitionResponse.leaderId()
+        );
 
         if (kip853Rpc && leaderId.isPresent()) {
             Endpoints expectedLeaderEndpoints = 
startingVoters.listeners(leaderId.getAsInt());
@@ -1318,7 +1338,11 @@ public final class RaftClientTestContext {
                 .stream()
                 .map(voterId -> new Voter().setVoterId(voterId))
                 .collect(Collectors.toList()),
-            leaderChangeMessage.voters()
+            leaderChangeMessage
+                    .voters()
+                    .stream()
+                    .sorted(Comparator.comparingInt(Voter::voterId))
+                    .collect(Collectors.toList())
         );
         assertEquals(
             grantingVoters


Reply via email to