ahuang98 commented on code in PR #16637: URL: https://github.com/apache/kafka/pull/16637#discussion_r1690605706
########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -2762,79 +2764,357 @@ public void testDescribeQuorumNonLeader(boolean withKip853Rpc) throws Exception assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.message(), partitionData.errorMessage()); } + @Test + public void testDescribeQuorumWithOnlyStaticVoters() throws Exception { + int localId = 0; + ReplicaKey local = replicaKey(localId, true); + ReplicaKey follower1 = replicaKey(1, true); + Set<Integer> voters = Utils.mkSet(localId, follower1.id()); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, local.directoryId().get()) + .withStaticVoters(voters) + .withKip853Rpc(true) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + // Describe quorum response will not include directory ids + context.deliverRequest(context.describeQuorumRequest()); + context.pollUntilResponse(); + List<ReplicaState> expectedVoterStates = Arrays.asList( + new ReplicaState() + .setReplicaId(localId) + .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID) + .setLogEndOffset(1L) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()), + new ReplicaState() + .setReplicaId(follower1.id()) + .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID) + .setLogEndOffset(-1L) + .setLastFetchTimestamp(-1) + .setLastCaughtUpTimestamp(-1)); + context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, Collections.emptyList()); + } + + @ParameterizedTest @ValueSource(booleans = { true, false }) - public void testDescribeQuorum(boolean withKip853Rpc) throws Exception { - int localId = randomReplicaId(); - ReplicaKey closeFollower = replicaKey(localId + 2, withKip853Rpc); - ReplicaKey laggingFollower = replicaKey(localId + 1, withKip853Rpc); - Set<Integer> voters = Utils.mkSet(localId, closeFollower.id(), laggingFollower.id()); - - RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .withKip853Rpc(withKip853Rpc) - .build(); + public void testDescribeQuorumWithFollowers(boolean withKip853Rpc) throws Exception { + int localId = 0; + ReplicaKey local = replicaKey(localId, true); + Uuid localDirectoryId = local.directoryId().get(); + ReplicaKey follower1 = replicaKey(1, true); + Uuid followerDirectoryId1 = follower1.directoryId().get(); + ReplicaKey follower2 = replicaKey(2, false); + Set<Integer> voters = Utils.mkSet(localId, follower1.id(), follower2.id()); + VoterSet voterSet = VoterSetTest.voterSet(Stream.of(local, follower1, follower2)); + + RaftClientTestContext.Builder builder = new RaftClientTestContext.Builder(localId, localDirectoryId) + .withStaticVoters(voters) + .withKip853Rpc(withKip853Rpc); + + if (withKip853Rpc) { + builder.withBootstrapSnapshot(Optional.of(voterSet)); + } + RaftClientTestContext context = builder.build(); context.becomeLeader(); int epoch = context.currentEpoch(); - long laggingFollowerFetchTime = context.time.milliseconds(); - context.deliverRequest(context.fetchRequest(1, laggingFollower, 1L, epoch, 0)); + // Describe quorum response before any fetches made + context.deliverRequest(context.describeQuorumRequest()); context.pollUntilResponse(); - context.assertSentFetchPartitionResponse(1L, epoch); + List<ReplicaState> expectedVoterStates = Arrays.asList( + new ReplicaState() + .setReplicaId(localId) + .setReplicaDirectoryId(withKip853Rpc ? localDirectoryId : ReplicaKey.NO_DIRECTORY_ID) + // the leader will write bootstrap snapshot records (kraft version and voters) to the log if withKip853Rpc + .setLogEndOffset(withKip853Rpc ? 3L : 1L) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()), + new ReplicaState() + .setReplicaId(follower1.id()) + .setReplicaDirectoryId(withKip853Rpc ? followerDirectoryId1 : ReplicaKey.NO_DIRECTORY_ID) + .setLogEndOffset(-1L) + .setLastFetchTimestamp(-1) + .setLastCaughtUpTimestamp(-1), + new ReplicaState() + .setReplicaId(follower2.id()) + .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID) + .setLogEndOffset(-1L) + .setLastFetchTimestamp(-1) + .setLastCaughtUpTimestamp(-1)); + context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, Collections.emptyList()); + + // After follower1 makes progress but both followers are not caught up + context.time.sleep(100); + // withKip853Rpc leader will write bootstrap snapshot records (kraft version and voters) to the log + long fetchOffset = withKip853Rpc ? 3L : 1L; + long followerFetchTime1 = context.time.milliseconds(); + context.deliverRequest(context.fetchRequest(1, follower1, fetchOffset, epoch, 0)); + context.pollUntilResponse(); + long expectedHW = fetchOffset; + context.assertSentFetchPartitionResponse(expectedHW, epoch); - context.client.scheduleAppend(epoch, Arrays.asList("foo", "bar")); + List<String> records = Arrays.asList("foo", "bar"); + long nextFetchOffset = fetchOffset + records.size(); + context.client.scheduleAppend(epoch, records); context.client.poll(); + context.deliverRequest(context.describeQuorumRequest()); + context.pollUntilResponse(); + expectedVoterStates.get(0) + .setLogEndOffset(nextFetchOffset) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()); + expectedVoterStates.get(1) + .setLogEndOffset(fetchOffset) + .setLastFetchTimestamp(followerFetchTime1) + .setLastCaughtUpTimestamp(followerFetchTime1); Review Comment: opted for adding a sleep -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org