jsancio commented on code in PR #16637: URL: https://github.com/apache/kafka/pull/16637#discussion_r1689873055
########## raft/src/main/java/org/apache/kafka/raft/Endpoints.java: ########## @@ -108,12 +109,25 @@ public AddRaftVoterRequestData.ListenerCollection toAddVoterRequest() { for (Map.Entry<ListenerName, InetSocketAddress> entry : endpoints.entrySet()) { listeners.add( new AddRaftVoterRequestData.Listener() - .setName(entry.getKey().value()) + .setName(entry.getKey().value()) Review Comment: Use spaces instead of tabs. ########## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ########## @@ -647,20 +602,27 @@ private void updateVoterAndObserverStates(VoterSet lastVoterSet) { // Move any of the remaining old voters to observerStates for (ReplicaState replicaStateEntry : oldVoterStates.values()) { + replicaStateEntry.listeners = Endpoints.empty(); observerStates.putIfAbsent(replicaStateEntry.replicaKey, replicaStateEntry); } } - private static class ReplicaState implements Comparable<ReplicaState> { + static class ReplicaState implements Comparable<ReplicaState> { ReplicaKey replicaKey; + Endpoints listeners; Optional<LogOffsetMetadata> endOffset; long lastFetchTimestamp; long lastFetchLeaderLogEndOffset; long lastCaughtUpTimestamp; boolean hasAcknowledgedLeader; public ReplicaState(ReplicaKey replicaKey, boolean hasAcknowledgedLeader) { + this(replicaKey, hasAcknowledgedLeader, Endpoints.empty()); + } + + public ReplicaState(ReplicaKey replicaKey, boolean hasAcknowledgedLeader, Endpoints listeners) { Review Comment: Let's just have one constructor. I don't see the value of having two constructors. 
########## raft/src/main/java/org/apache/kafka/raft/RaftUtil.java: ########## @@ -527,6 +541,45 @@ public static AddRaftVoterResponseData addVoterResponse( .setErrorMessage(errorMessage); } + private static List<DescribeQuorumResponseData.ReplicaState> toReplicaStates( + short apiVersion, + int leaderId, + Collection<LeaderState.ReplicaState> states, + long currentTimeMs + ) { + return states + .stream() + .map(replicaState -> toReplicaState(apiVersion, leaderId, replicaState, currentTimeMs)) + .collect(Collectors.toList()); + } + + private static DescribeQuorumResponseData.ReplicaState toReplicaState( + short apiVersion, + int leaderId, + LeaderState.ReplicaState replicaState, + long currentTimeMs + ) { + final long lastCaughtUpTimestamp; + final long lastFetchTimestamp; + if (replicaState.replicaKey.id() == leaderId) { + lastCaughtUpTimestamp = currentTimeMs; + lastFetchTimestamp = currentTimeMs; + } else { + lastCaughtUpTimestamp = replicaState.lastCaughtUpTimestamp; + lastFetchTimestamp = replicaState.lastFetchTimestamp; Review Comment: These fields in ReplicaState should be private with a public accessor method. E.g. ```java public long lastCaughtUpTimestamp() { return lastCaughtUpTimestamp; } ``` ########## raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java: ########## @@ -648,19 +648,41 @@ void assertSentDescribeQuorumResponse( long highWatermark, List<ReplicaState> voterStates, List<ReplicaState> observerStates + ) { + assertSentDescribeQuorumResponse(leaderId, leaderEpoch, highWatermark, voterStates, observerStates, Errors.NONE); + } + + void assertSentDescribeQuorumResponse( + int leaderId, + int leaderEpoch, + long highWatermark, + List<ReplicaState> voterStates, + List<ReplicaState> observerStates, + Errors error Review Comment: The order of this parameter doesn't match the order used in other methods. `Errors` parameter tends to go first in the method parameter list. 
########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -2762,79 +2764,357 @@ public void testDescribeQuorumNonLeader(boolean withKip853Rpc) throws Exception assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.message(), partitionData.errorMessage()); } + @Test + public void testDescribeQuorumWithOnlyStaticVoters() throws Exception { + int localId = 0; + ReplicaKey local = replicaKey(localId, true); + ReplicaKey follower1 = replicaKey(1, true); + Set<Integer> voters = Utils.mkSet(localId, follower1.id()); Review Comment: Let's make the ids random. Please take a look at other methods/tests for an example of how to do this. ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -2762,79 +2764,357 @@ public void testDescribeQuorumNonLeader(boolean withKip853Rpc) throws Exception assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.message(), partitionData.errorMessage()); } + @Test + public void testDescribeQuorumWithOnlyStaticVoters() throws Exception { + int localId = 0; + ReplicaKey local = replicaKey(localId, true); + ReplicaKey follower1 = replicaKey(1, true); + Set<Integer> voters = Utils.mkSet(localId, follower1.id()); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, local.directoryId().get()) + .withStaticVoters(voters) + .withKip853Rpc(true) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + // Describe quorum response will not include directory ids + context.deliverRequest(context.describeQuorumRequest()); + context.pollUntilResponse(); + List<ReplicaState> expectedVoterStates = Arrays.asList( + new ReplicaState() + .setReplicaId(localId) + .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID) + .setLogEndOffset(1L) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()), + new ReplicaState() + .setReplicaId(follower1.id()) + .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID) + 
.setLogEndOffset(-1L) + .setLastFetchTimestamp(-1) + .setLastCaughtUpTimestamp(-1)); + context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, Collections.emptyList()); + } + + @ParameterizedTest @ValueSource(booleans = { true, false }) - public void testDescribeQuorum(boolean withKip853Rpc) throws Exception { - int localId = randomReplicaId(); - ReplicaKey closeFollower = replicaKey(localId + 2, withKip853Rpc); - ReplicaKey laggingFollower = replicaKey(localId + 1, withKip853Rpc); - Set<Integer> voters = Utils.mkSet(localId, closeFollower.id(), laggingFollower.id()); - - RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .withKip853Rpc(withKip853Rpc) - .build(); + public void testDescribeQuorumWithFollowers(boolean withKip853Rpc) throws Exception { + int localId = 0; + ReplicaKey local = replicaKey(localId, true); + Uuid localDirectoryId = local.directoryId().get(); + ReplicaKey follower1 = replicaKey(1, true); + Uuid followerDirectoryId1 = follower1.directoryId().get(); + ReplicaKey follower2 = replicaKey(2, false); + Set<Integer> voters = Utils.mkSet(localId, follower1.id(), follower2.id()); + VoterSet voterSet = VoterSetTest.voterSet(Stream.of(local, follower1, follower2)); + + RaftClientTestContext.Builder builder = new RaftClientTestContext.Builder(localId, localDirectoryId) + .withStaticVoters(voters) + .withKip853Rpc(withKip853Rpc); + + if (withKip853Rpc) { + builder.withBootstrapSnapshot(Optional.of(voterSet)); + } + RaftClientTestContext context = builder.build(); context.becomeLeader(); int epoch = context.currentEpoch(); - long laggingFollowerFetchTime = context.time.milliseconds(); - context.deliverRequest(context.fetchRequest(1, laggingFollower, 1L, epoch, 0)); + // Describe quorum response before any fetches made + context.deliverRequest(context.describeQuorumRequest()); context.pollUntilResponse(); - context.assertSentFetchPartitionResponse(1L, epoch); + List<ReplicaState> 
expectedVoterStates = Arrays.asList( + new ReplicaState() + .setReplicaId(localId) + .setReplicaDirectoryId(withKip853Rpc ? localDirectoryId : ReplicaKey.NO_DIRECTORY_ID) + // the leader will write bootstrap snapshot records (kraft version and voters) to the log if withKip853Rpc + .setLogEndOffset(withKip853Rpc ? 3L : 1L) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()), + new ReplicaState() + .setReplicaId(follower1.id()) + .setReplicaDirectoryId(withKip853Rpc ? followerDirectoryId1 : ReplicaKey.NO_DIRECTORY_ID) + .setLogEndOffset(-1L) + .setLastFetchTimestamp(-1) + .setLastCaughtUpTimestamp(-1), + new ReplicaState() + .setReplicaId(follower2.id()) + .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID) + .setLogEndOffset(-1L) + .setLastFetchTimestamp(-1) + .setLastCaughtUpTimestamp(-1)); + context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, Collections.emptyList()); + + // After follower1 makes progress but both followers are not caught up + context.time.sleep(100); + // withKip853Rpc leader will write bootstrap snapshot records (kraft version and voters) to the log + long fetchOffset = withKip853Rpc ? 
3L : 1L; + long followerFetchTime1 = context.time.milliseconds(); + context.deliverRequest(context.fetchRequest(1, follower1, fetchOffset, epoch, 0)); + context.pollUntilResponse(); + long expectedHW = fetchOffset; + context.assertSentFetchPartitionResponse(expectedHW, epoch); - context.client.scheduleAppend(epoch, Arrays.asList("foo", "bar")); + List<String> records = Arrays.asList("foo", "bar"); + long nextFetchOffset = fetchOffset + records.size(); + context.client.scheduleAppend(epoch, records); context.client.poll(); + context.deliverRequest(context.describeQuorumRequest()); + context.pollUntilResponse(); + expectedVoterStates.get(0) + .setLogEndOffset(nextFetchOffset) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()); + expectedVoterStates.get(1) + .setLogEndOffset(fetchOffset) + .setLastFetchTimestamp(followerFetchTime1) + .setLastCaughtUpTimestamp(followerFetchTime1); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, Collections.emptyList()); + + // After follower2 catches up to leader context.time.sleep(100); - long closeFollowerFetchTime = context.time.milliseconds(); - context.deliverRequest(context.fetchRequest(epoch, closeFollower, 3L, epoch, 0)); + long followerFetchTime2 = context.time.milliseconds(); + context.deliverRequest(context.fetchRequest(epoch, follower2, nextFetchOffset, epoch, 0)); context.pollUntilResponse(); - context.assertSentFetchPartitionResponse(3L, epoch); + expectedHW = nextFetchOffset; + context.assertSentFetchPartitionResponse(expectedHW, epoch); + + context.time.sleep(100); + context.deliverRequest(context.describeQuorumRequest()); + context.pollUntilResponse(); + + expectedVoterStates.get(0) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()); + expectedVoterStates.get(2) + .setLogEndOffset(nextFetchOffset) + .setLastFetchTimestamp(followerFetchTime2) + 
.setLastCaughtUpTimestamp(followerFetchTime2); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, Collections.emptyList()); + + // Describe quorum returns error if leader loses leadership + context.time.sleep(context.checkQuorumTimeoutMs); + context.deliverRequest(context.describeQuorumRequest()); + context.pollUntilResponse(); + expectedVoterStates.get(0) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()); Review Comment: Why is this necessary? The object `expectedVoterStates` is not used after this point. ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -2762,79 +2764,357 @@ public void testDescribeQuorumNonLeader(boolean withKip853Rpc) throws Exception assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.message(), partitionData.errorMessage()); } + @Test + public void testDescribeQuorumWithOnlyStaticVoters() throws Exception { + int localId = 0; + ReplicaKey local = replicaKey(localId, true); + ReplicaKey follower1 = replicaKey(1, true); + Set<Integer> voters = Utils.mkSet(localId, follower1.id()); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, local.directoryId().get()) + .withStaticVoters(voters) + .withKip853Rpc(true) Review Comment: Should we test this case with both the new and old RPC versions? ########## raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java: ########## @@ -321,7 +321,7 @@ public boolean isVoter(ReplicaKey replicaKey) { } } - Endpoints listeners() { + public Endpoints listeners() { Review Comment: Add a Javadoc section. 
########## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ########## @@ -647,20 +602,27 @@ private void updateVoterAndObserverStates(VoterSet lastVoterSet) { // Move any of the remaining old voters to observerStates for (ReplicaState replicaStateEntry : oldVoterStates.values()) { + replicaStateEntry.listeners = Endpoints.empty(); observerStates.putIfAbsent(replicaStateEntry.replicaKey, replicaStateEntry); } } - private static class ReplicaState implements Comparable<ReplicaState> { + static class ReplicaState implements Comparable<ReplicaState> { ReplicaKey replicaKey; + Endpoints listeners; Review Comment: Now that you are exposing this type to the public, let's make these fields as private and final. ########## raft/src/main/java/org/apache/kafka/raft/RaftUtil.java: ########## @@ -527,6 +541,45 @@ public static AddRaftVoterResponseData addVoterResponse( .setErrorMessage(errorMessage); } + private static List<DescribeQuorumResponseData.ReplicaState> toReplicaStates( + short apiVersion, + int leaderId, + Collection<LeaderState.ReplicaState> states, + long currentTimeMs + ) { + return states + .stream() + .map(replicaState -> toReplicaState(apiVersion, leaderId, replicaState, currentTimeMs)) + .collect(Collectors.toList()); + } + + private static DescribeQuorumResponseData.ReplicaState toReplicaState( + short apiVersion, + int leaderId, + LeaderState.ReplicaState replicaState, + long currentTimeMs + ) { + final long lastCaughtUpTimestamp; + final long lastFetchTimestamp; + if (replicaState.replicaKey.id() == leaderId) { + lastCaughtUpTimestamp = currentTimeMs; + lastFetchTimestamp = currentTimeMs; + } else { + lastCaughtUpTimestamp = replicaState.lastCaughtUpTimestamp; + lastFetchTimestamp = replicaState.lastFetchTimestamp; + } + DescribeQuorumResponseData.ReplicaState replicaStateData = new DescribeQuorumResponseData.ReplicaState() + .setReplicaId(replicaState.replicaKey.id()) + 
.setLogEndOffset(replicaState.endOffset.map(LogOffsetMetadata::offset).orElse(-1L)) Review Comment: The `endOffset` field in ReplicaState should be private with a public accessor `public LogOffsetMetadata endOffset()`. This comment applies to all of the fields of `ReplicaState`. ########## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ########## @@ -647,20 +602,27 @@ private void updateVoterAndObserverStates(VoterSet lastVoterSet) { // Move any of the remaining old voters to observerStates for (ReplicaState replicaStateEntry : oldVoterStates.values()) { + replicaStateEntry.listeners = Endpoints.empty(); observerStates.putIfAbsent(replicaStateEntry.replicaKey, replicaStateEntry); } } - private static class ReplicaState implements Comparable<ReplicaState> { + static class ReplicaState implements Comparable<ReplicaState> { ReplicaKey replicaKey; + Endpoints listeners; Optional<LogOffsetMetadata> endOffset; long lastFetchTimestamp; long lastFetchLeaderLogEndOffset; long lastCaughtUpTimestamp; boolean hasAcknowledgedLeader; Review Comment: Now that you are exposing this type to the public, let's make these fields as private. 
########## raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java: ########## @@ -648,19 +648,41 @@ void assertSentDescribeQuorumResponse( long highWatermark, List<ReplicaState> voterStates, List<ReplicaState> observerStates + ) { + assertSentDescribeQuorumResponse(leaderId, leaderEpoch, highWatermark, voterStates, observerStates, Errors.NONE); + } + + void assertSentDescribeQuorumResponse( + int leaderId, + int leaderEpoch, + long highWatermark, + List<ReplicaState> voterStates, + List<ReplicaState> observerStates, + Errors error ) { DescribeQuorumResponseData response = collectDescribeQuorumResponse(); DescribeQuorumResponseData.PartitionData partitionData = new DescribeQuorumResponseData.PartitionData() - .setErrorCode(Errors.NONE.code()) + .setErrorCode(error.code()) Review Comment: This comment applies to the entire method. Why reimplement this instead of using `RaftUtil.singletonDescribeQuorumResponse`? ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -2762,79 +2764,357 @@ public void testDescribeQuorumNonLeader(boolean withKip853Rpc) throws Exception assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.message(), partitionData.errorMessage()); } + @Test + public void testDescribeQuorumWithOnlyStaticVoters() throws Exception { + int localId = 0; + ReplicaKey local = replicaKey(localId, true); + ReplicaKey follower1 = replicaKey(1, true); + Set<Integer> voters = Utils.mkSet(localId, follower1.id()); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, local.directoryId().get()) + .withStaticVoters(voters) + .withKip853Rpc(true) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + // Describe quorum response will not include directory ids + context.deliverRequest(context.describeQuorumRequest()); + context.pollUntilResponse(); + List<ReplicaState> expectedVoterStates = Arrays.asList( + new ReplicaState() + .setReplicaId(localId) + 
.setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID) + .setLogEndOffset(1L) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()), + new ReplicaState() + .setReplicaId(follower1.id()) + .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID) + .setLogEndOffset(-1L) + .setLastFetchTimestamp(-1) + .setLastCaughtUpTimestamp(-1)); + context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, Collections.emptyList()); + } + + @ParameterizedTest @ValueSource(booleans = { true, false }) - public void testDescribeQuorum(boolean withKip853Rpc) throws Exception { - int localId = randomReplicaId(); - ReplicaKey closeFollower = replicaKey(localId + 2, withKip853Rpc); - ReplicaKey laggingFollower = replicaKey(localId + 1, withKip853Rpc); - Set<Integer> voters = Utils.mkSet(localId, closeFollower.id(), laggingFollower.id()); - - RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .withKip853Rpc(withKip853Rpc) - .build(); + public void testDescribeQuorumWithFollowers(boolean withKip853Rpc) throws Exception { + int localId = 0; + ReplicaKey local = replicaKey(localId, true); + Uuid localDirectoryId = local.directoryId().get(); + ReplicaKey follower1 = replicaKey(1, true); + Uuid followerDirectoryId1 = follower1.directoryId().get(); + ReplicaKey follower2 = replicaKey(2, false); Review Comment: Let's make the replica ids random. Please take a look at other methods/tests for an example on how to do this. 
########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -2762,79 +2764,357 @@ public void testDescribeQuorumNonLeader(boolean withKip853Rpc) throws Exception assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.message(), partitionData.errorMessage()); } + @Test + public void testDescribeQuorumWithOnlyStaticVoters() throws Exception { + int localId = 0; + ReplicaKey local = replicaKey(localId, true); + ReplicaKey follower1 = replicaKey(1, true); + Set<Integer> voters = Utils.mkSet(localId, follower1.id()); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, local.directoryId().get()) + .withStaticVoters(voters) + .withKip853Rpc(true) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + // Describe quorum response will not include directory ids + context.deliverRequest(context.describeQuorumRequest()); + context.pollUntilResponse(); + List<ReplicaState> expectedVoterStates = Arrays.asList( + new ReplicaState() + .setReplicaId(localId) + .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID) + .setLogEndOffset(1L) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()), + new ReplicaState() + .setReplicaId(follower1.id()) + .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID) + .setLogEndOffset(-1L) + .setLastFetchTimestamp(-1) + .setLastCaughtUpTimestamp(-1)); + context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, Collections.emptyList()); + } + + @ParameterizedTest @ValueSource(booleans = { true, false }) - public void testDescribeQuorum(boolean withKip853Rpc) throws Exception { - int localId = randomReplicaId(); - ReplicaKey closeFollower = replicaKey(localId + 2, withKip853Rpc); - ReplicaKey laggingFollower = replicaKey(localId + 1, withKip853Rpc); - Set<Integer> voters = Utils.mkSet(localId, closeFollower.id(), laggingFollower.id()); - - RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters) - .withKip853Rpc(withKip853Rpc) - .build(); + public void testDescribeQuorumWithFollowers(boolean withKip853Rpc) throws Exception { + int localId = 0; + ReplicaKey local = replicaKey(localId, true); + Uuid localDirectoryId = local.directoryId().get(); + ReplicaKey follower1 = replicaKey(1, true); + Uuid followerDirectoryId1 = follower1.directoryId().get(); + ReplicaKey follower2 = replicaKey(2, false); + Set<Integer> voters = Utils.mkSet(localId, follower1.id(), follower2.id()); + VoterSet voterSet = VoterSetTest.voterSet(Stream.of(local, follower1, follower2)); + + RaftClientTestContext.Builder builder = new RaftClientTestContext.Builder(localId, localDirectoryId) + .withStaticVoters(voters) + .withKip853Rpc(withKip853Rpc); + + if (withKip853Rpc) { + builder.withBootstrapSnapshot(Optional.of(voterSet)); + } + RaftClientTestContext context = builder.build(); context.becomeLeader(); int epoch = context.currentEpoch(); - long laggingFollowerFetchTime = context.time.milliseconds(); - context.deliverRequest(context.fetchRequest(1, laggingFollower, 1L, epoch, 0)); + // Describe quorum response before any fetches made + context.deliverRequest(context.describeQuorumRequest()); context.pollUntilResponse(); - context.assertSentFetchPartitionResponse(1L, epoch); + List<ReplicaState> expectedVoterStates = Arrays.asList( + new ReplicaState() + .setReplicaId(localId) + .setReplicaDirectoryId(withKip853Rpc ? localDirectoryId : ReplicaKey.NO_DIRECTORY_ID) + // the leader will write bootstrap snapshot records (kraft version and voters) to the log if withKip853Rpc + .setLogEndOffset(withKip853Rpc ? 3L : 1L) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()), + new ReplicaState() + .setReplicaId(follower1.id()) + .setReplicaDirectoryId(withKip853Rpc ? 
followerDirectoryId1 : ReplicaKey.NO_DIRECTORY_ID) + .setLogEndOffset(-1L) + .setLastFetchTimestamp(-1) + .setLastCaughtUpTimestamp(-1), + new ReplicaState() + .setReplicaId(follower2.id()) + .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID) + .setLogEndOffset(-1L) + .setLastFetchTimestamp(-1) + .setLastCaughtUpTimestamp(-1)); + context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, Collections.emptyList()); + + // After follower1 makes progress but both followers are not caught up + context.time.sleep(100); + // withKip853Rpc leader will write bootstrap snapshot records (kraft version and voters) to the log + long fetchOffset = withKip853Rpc ? 3L : 1L; + long followerFetchTime1 = context.time.milliseconds(); + context.deliverRequest(context.fetchRequest(1, follower1, fetchOffset, epoch, 0)); + context.pollUntilResponse(); + long expectedHW = fetchOffset; + context.assertSentFetchPartitionResponse(expectedHW, epoch); - context.client.scheduleAppend(epoch, Arrays.asList("foo", "bar")); + List<String> records = Arrays.asList("foo", "bar"); + long nextFetchOffset = fetchOffset + records.size(); + context.client.scheduleAppend(epoch, records); context.client.poll(); + context.deliverRequest(context.describeQuorumRequest()); + context.pollUntilResponse(); + expectedVoterStates.get(0) + .setLogEndOffset(nextFetchOffset) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()); + expectedVoterStates.get(1) + .setLogEndOffset(fetchOffset) + .setLastFetchTimestamp(followerFetchTime1) + .setLastCaughtUpTimestamp(followerFetchTime1); Review Comment: `followerFetchTime1` is `context.time.milliseconds` since you didn't sleep after sending the FETCH request. In other words, you can remove the variable `followerFetchTime1` and make it easier to read. 
########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -2762,79 +2764,357 @@ public void testDescribeQuorumNonLeader(boolean withKip853Rpc) throws Exception assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.message(), partitionData.errorMessage()); } + @Test + public void testDescribeQuorumWithOnlyStaticVoters() throws Exception { + int localId = 0; + ReplicaKey local = replicaKey(localId, true); + ReplicaKey follower1 = replicaKey(1, true); + Set<Integer> voters = Utils.mkSet(localId, follower1.id()); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, local.directoryId().get()) + .withStaticVoters(voters) + .withKip853Rpc(true) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + // Describe quorum response will not include directory ids + context.deliverRequest(context.describeQuorumRequest()); + context.pollUntilResponse(); + List<ReplicaState> expectedVoterStates = Arrays.asList( + new ReplicaState() + .setReplicaId(localId) + .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID) + .setLogEndOffset(1L) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()), + new ReplicaState() + .setReplicaId(follower1.id()) + .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID) + .setLogEndOffset(-1L) + .setLastFetchTimestamp(-1) + .setLastCaughtUpTimestamp(-1)); + context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, Collections.emptyList()); + } + + @ParameterizedTest @ValueSource(booleans = { true, false }) - public void testDescribeQuorum(boolean withKip853Rpc) throws Exception { - int localId = randomReplicaId(); - ReplicaKey closeFollower = replicaKey(localId + 2, withKip853Rpc); - ReplicaKey laggingFollower = replicaKey(localId + 1, withKip853Rpc); - Set<Integer> voters = Utils.mkSet(localId, closeFollower.id(), laggingFollower.id()); - - RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters) - .withKip853Rpc(withKip853Rpc) - .build(); + public void testDescribeQuorumWithFollowers(boolean withKip853Rpc) throws Exception { + int localId = 0; + ReplicaKey local = replicaKey(localId, true); + Uuid localDirectoryId = local.directoryId().get(); + ReplicaKey follower1 = replicaKey(1, true); + Uuid followerDirectoryId1 = follower1.directoryId().get(); + ReplicaKey follower2 = replicaKey(2, false); + Set<Integer> voters = Utils.mkSet(localId, follower1.id(), follower2.id()); + VoterSet voterSet = VoterSetTest.voterSet(Stream.of(local, follower1, follower2)); + + RaftClientTestContext.Builder builder = new RaftClientTestContext.Builder(localId, localDirectoryId) + .withStaticVoters(voters) + .withKip853Rpc(withKip853Rpc); + + if (withKip853Rpc) { + builder.withBootstrapSnapshot(Optional.of(voterSet)); Review Comment: Did you mean to set both the static voters and the bootstrap checkpoint when `withKip853Rpc` is true? This comment applies to a few places. 
########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -2762,79 +2764,357 @@ public void testDescribeQuorumNonLeader(boolean withKip853Rpc) throws Exception assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.message(), partitionData.errorMessage()); } + @Test + public void testDescribeQuorumWithOnlyStaticVoters() throws Exception { + int localId = 0; + ReplicaKey local = replicaKey(localId, true); + ReplicaKey follower1 = replicaKey(1, true); + Set<Integer> voters = Utils.mkSet(localId, follower1.id()); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, local.directoryId().get()) + .withStaticVoters(voters) + .withKip853Rpc(true) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + // Describe quorum response will not include directory ids + context.deliverRequest(context.describeQuorumRequest()); + context.pollUntilResponse(); + List<ReplicaState> expectedVoterStates = Arrays.asList( + new ReplicaState() + .setReplicaId(localId) + .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID) + .setLogEndOffset(1L) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()), + new ReplicaState() + .setReplicaId(follower1.id()) + .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID) + .setLogEndOffset(-1L) + .setLastFetchTimestamp(-1) + .setLastCaughtUpTimestamp(-1)); + context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, Collections.emptyList()); + } + + @ParameterizedTest @ValueSource(booleans = { true, false }) - public void testDescribeQuorum(boolean withKip853Rpc) throws Exception { - int localId = randomReplicaId(); - ReplicaKey closeFollower = replicaKey(localId + 2, withKip853Rpc); - ReplicaKey laggingFollower = replicaKey(localId + 1, withKip853Rpc); - Set<Integer> voters = Utils.mkSet(localId, closeFollower.id(), laggingFollower.id()); - - RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters) - .withKip853Rpc(withKip853Rpc) - .build(); + public void testDescribeQuorumWithFollowers(boolean withKip853Rpc) throws Exception { + int localId = 0; + ReplicaKey local = replicaKey(localId, true); + Uuid localDirectoryId = local.directoryId().get(); + ReplicaKey follower1 = replicaKey(1, true); + Uuid followerDirectoryId1 = follower1.directoryId().get(); + ReplicaKey follower2 = replicaKey(2, false); Review Comment: Looks like you want to use `replicaKey(someId, withKip853Rpc)` and compute the directory id sent over the RPC with `replicaKey.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)`. ########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -2762,79 +2764,357 @@ public void testDescribeQuorumNonLeader(boolean withKip853Rpc) throws Exception assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.message(), partitionData.errorMessage()); } + @Test + public void testDescribeQuorumWithOnlyStaticVoters() throws Exception { + int localId = 0; + ReplicaKey local = replicaKey(localId, true); + ReplicaKey follower1 = replicaKey(1, true); + Set<Integer> voters = Utils.mkSet(localId, follower1.id()); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, local.directoryId().get()) + .withStaticVoters(voters) + .withKip853Rpc(true) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + // Describe quorum response will not include directory ids + context.deliverRequest(context.describeQuorumRequest()); + context.pollUntilResponse(); + List<ReplicaState> expectedVoterStates = Arrays.asList( + new ReplicaState() + .setReplicaId(localId) + .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID) + .setLogEndOffset(1L) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()), + new ReplicaState() + .setReplicaId(follower1.id()) + .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID) + .setLogEndOffset(-1L) + 
.setLastFetchTimestamp(-1) + .setLastCaughtUpTimestamp(-1)); + context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, Collections.emptyList()); + } + + @ParameterizedTest @ValueSource(booleans = { true, false }) - public void testDescribeQuorum(boolean withKip853Rpc) throws Exception { - int localId = randomReplicaId(); - ReplicaKey closeFollower = replicaKey(localId + 2, withKip853Rpc); - ReplicaKey laggingFollower = replicaKey(localId + 1, withKip853Rpc); - Set<Integer> voters = Utils.mkSet(localId, closeFollower.id(), laggingFollower.id()); - - RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters) - .withKip853Rpc(withKip853Rpc) - .build(); + public void testDescribeQuorumWithFollowers(boolean withKip853Rpc) throws Exception { + int localId = 0; + ReplicaKey local = replicaKey(localId, true); + Uuid localDirectoryId = local.directoryId().get(); + ReplicaKey follower1 = replicaKey(1, true); + Uuid followerDirectoryId1 = follower1.directoryId().get(); + ReplicaKey follower2 = replicaKey(2, false); + Set<Integer> voters = Utils.mkSet(localId, follower1.id(), follower2.id()); + VoterSet voterSet = VoterSetTest.voterSet(Stream.of(local, follower1, follower2)); + + RaftClientTestContext.Builder builder = new RaftClientTestContext.Builder(localId, localDirectoryId) + .withStaticVoters(voters) + .withKip853Rpc(withKip853Rpc); + + if (withKip853Rpc) { + builder.withBootstrapSnapshot(Optional.of(voterSet)); + } + RaftClientTestContext context = builder.build(); context.becomeLeader(); int epoch = context.currentEpoch(); - long laggingFollowerFetchTime = context.time.milliseconds(); - context.deliverRequest(context.fetchRequest(1, laggingFollower, 1L, epoch, 0)); + // Describe quorum response before any fetches made + context.deliverRequest(context.describeQuorumRequest()); context.pollUntilResponse(); - context.assertSentFetchPartitionResponse(1L, epoch); + List<ReplicaState> expectedVoterStates = Arrays.asList( + 
new ReplicaState() + .setReplicaId(localId) + .setReplicaDirectoryId(withKip853Rpc ? localDirectoryId : ReplicaKey.NO_DIRECTORY_ID) + // the leader will write bootstrap snapshot records (kraft version and voters) to the log if withKip853Rpc + .setLogEndOffset(withKip853Rpc ? 3L : 1L) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()), + new ReplicaState() + .setReplicaId(follower1.id()) + .setReplicaDirectoryId(withKip853Rpc ? followerDirectoryId1 : ReplicaKey.NO_DIRECTORY_ID) + .setLogEndOffset(-1L) + .setLastFetchTimestamp(-1) + .setLastCaughtUpTimestamp(-1), + new ReplicaState() + .setReplicaId(follower2.id()) + .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID) + .setLogEndOffset(-1L) + .setLastFetchTimestamp(-1) + .setLastCaughtUpTimestamp(-1)); + context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, Collections.emptyList()); + + // After follower1 makes progress but both followers are not caught up + context.time.sleep(100); + // withKip853Rpc leader will write bootstrap snapshot records (kraft version and voters) to the log + long fetchOffset = withKip853Rpc ? 
3L : 1L; + long followerFetchTime1 = context.time.milliseconds(); + context.deliverRequest(context.fetchRequest(1, follower1, fetchOffset, epoch, 0)); + context.pollUntilResponse(); + long expectedHW = fetchOffset; + context.assertSentFetchPartitionResponse(expectedHW, epoch); - context.client.scheduleAppend(epoch, Arrays.asList("foo", "bar")); + List<String> records = Arrays.asList("foo", "bar"); + long nextFetchOffset = fetchOffset + records.size(); + context.client.scheduleAppend(epoch, records); context.client.poll(); + context.deliverRequest(context.describeQuorumRequest()); + context.pollUntilResponse(); + expectedVoterStates.get(0) + .setLogEndOffset(nextFetchOffset) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()); + expectedVoterStates.get(1) + .setLogEndOffset(fetchOffset) + .setLastFetchTimestamp(followerFetchTime1) + .setLastCaughtUpTimestamp(followerFetchTime1); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, Collections.emptyList()); + + // After follower2 catches up to leader context.time.sleep(100); - long closeFollowerFetchTime = context.time.milliseconds(); - context.deliverRequest(context.fetchRequest(epoch, closeFollower, 3L, epoch, 0)); + long followerFetchTime2 = context.time.milliseconds(); + context.deliverRequest(context.fetchRequest(epoch, follower2, nextFetchOffset, epoch, 0)); context.pollUntilResponse(); - context.assertSentFetchPartitionResponse(3L, epoch); + expectedHW = nextFetchOffset; + context.assertSentFetchPartitionResponse(expectedHW, epoch); + + context.time.sleep(100); + context.deliverRequest(context.describeQuorumRequest()); + context.pollUntilResponse(); + + expectedVoterStates.get(0) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()); + expectedVoterStates.get(2) + .setLogEndOffset(nextFetchOffset) + .setLastFetchTimestamp(followerFetchTime2) + 
.setLastCaughtUpTimestamp(followerFetchTime2); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, Collections.emptyList()); + + // Describe quorum returns error if leader loses leadership + context.time.sleep(context.checkQuorumTimeoutMs); + context.deliverRequest(context.describeQuorumRequest()); + context.pollUntilResponse(); + expectedVoterStates.get(0) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()); + context.assertSentDescribeQuorumResponse(0, 0, 0, Collections.emptyList(), Collections.emptyList(), Errors.NOT_LEADER_OR_FOLLOWER); + } + + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testDescribeQuorumWithObserver(boolean withKip853Rpc) throws Exception { + int localId = 0; + ReplicaKey local = replicaKey(localId, true); + Uuid localDirectoryId = local.directoryId().get(); + ReplicaKey follower = replicaKey(1, withKip853Rpc); + Uuid followerDirectoryId = withKip853Rpc ? follower.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID; Review Comment: You can use Optional's orElse. 
########## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ########## @@ -2762,79 +2764,357 @@ public void testDescribeQuorumNonLeader(boolean withKip853Rpc) throws Exception assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.message(), partitionData.errorMessage()); } + @Test + public void testDescribeQuorumWithOnlyStaticVoters() throws Exception { + int localId = 0; + ReplicaKey local = replicaKey(localId, true); + ReplicaKey follower1 = replicaKey(1, true); + Set<Integer> voters = Utils.mkSet(localId, follower1.id()); + + RaftClientTestContext context = new RaftClientTestContext.Builder(localId, local.directoryId().get()) + .withStaticVoters(voters) + .withKip853Rpc(true) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + // Describe quorum response will not include directory ids + context.deliverRequest(context.describeQuorumRequest()); + context.pollUntilResponse(); + List<ReplicaState> expectedVoterStates = Arrays.asList( + new ReplicaState() + .setReplicaId(localId) + .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID) + .setLogEndOffset(1L) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()), + new ReplicaState() + .setReplicaId(follower1.id()) + .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID) + .setLogEndOffset(-1L) + .setLastFetchTimestamp(-1) + .setLastCaughtUpTimestamp(-1)); + context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, Collections.emptyList()); + } + + @ParameterizedTest @ValueSource(booleans = { true, false }) - public void testDescribeQuorum(boolean withKip853Rpc) throws Exception { - int localId = randomReplicaId(); - ReplicaKey closeFollower = replicaKey(localId + 2, withKip853Rpc); - ReplicaKey laggingFollower = replicaKey(localId + 1, withKip853Rpc); - Set<Integer> voters = Utils.mkSet(localId, closeFollower.id(), laggingFollower.id()); - - RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters) - .withKip853Rpc(withKip853Rpc) - .build(); + public void testDescribeQuorumWithFollowers(boolean withKip853Rpc) throws Exception { + int localId = 0; + ReplicaKey local = replicaKey(localId, true); + Uuid localDirectoryId = local.directoryId().get(); + ReplicaKey follower1 = replicaKey(1, true); + Uuid followerDirectoryId1 = follower1.directoryId().get(); + ReplicaKey follower2 = replicaKey(2, false); + Set<Integer> voters = Utils.mkSet(localId, follower1.id(), follower2.id()); + VoterSet voterSet = VoterSetTest.voterSet(Stream.of(local, follower1, follower2)); + + RaftClientTestContext.Builder builder = new RaftClientTestContext.Builder(localId, localDirectoryId) + .withStaticVoters(voters) + .withKip853Rpc(withKip853Rpc); + + if (withKip853Rpc) { + builder.withBootstrapSnapshot(Optional.of(voterSet)); + } + RaftClientTestContext context = builder.build(); context.becomeLeader(); int epoch = context.currentEpoch(); - long laggingFollowerFetchTime = context.time.milliseconds(); - context.deliverRequest(context.fetchRequest(1, laggingFollower, 1L, epoch, 0)); + // Describe quorum response before any fetches made + context.deliverRequest(context.describeQuorumRequest()); context.pollUntilResponse(); - context.assertSentFetchPartitionResponse(1L, epoch); + List<ReplicaState> expectedVoterStates = Arrays.asList( + new ReplicaState() + .setReplicaId(localId) + .setReplicaDirectoryId(withKip853Rpc ? localDirectoryId : ReplicaKey.NO_DIRECTORY_ID) + // the leader will write bootstrap snapshot records (kraft version and voters) to the log if withKip853Rpc + .setLogEndOffset(withKip853Rpc ? 3L : 1L) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()), + new ReplicaState() + .setReplicaId(follower1.id()) + .setReplicaDirectoryId(withKip853Rpc ? 
followerDirectoryId1 : ReplicaKey.NO_DIRECTORY_ID) + .setLogEndOffset(-1L) + .setLastFetchTimestamp(-1) + .setLastCaughtUpTimestamp(-1), + new ReplicaState() + .setReplicaId(follower2.id()) + .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID) + .setLogEndOffset(-1L) + .setLastFetchTimestamp(-1) + .setLastCaughtUpTimestamp(-1)); + context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, Collections.emptyList()); + + // After follower1 makes progress but both followers are not caught up + context.time.sleep(100); + // withKip853Rpc leader will write bootstrap snapshot records (kraft version and voters) to the log + long fetchOffset = withKip853Rpc ? 3L : 1L; + long followerFetchTime1 = context.time.milliseconds(); + context.deliverRequest(context.fetchRequest(1, follower1, fetchOffset, epoch, 0)); + context.pollUntilResponse(); + long expectedHW = fetchOffset; + context.assertSentFetchPartitionResponse(expectedHW, epoch); - context.client.scheduleAppend(epoch, Arrays.asList("foo", "bar")); + List<String> records = Arrays.asList("foo", "bar"); + long nextFetchOffset = fetchOffset + records.size(); + context.client.scheduleAppend(epoch, records); context.client.poll(); + context.deliverRequest(context.describeQuorumRequest()); + context.pollUntilResponse(); + expectedVoterStates.get(0) + .setLogEndOffset(nextFetchOffset) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()); + expectedVoterStates.get(1) + .setLogEndOffset(fetchOffset) + .setLastFetchTimestamp(followerFetchTime1) + .setLastCaughtUpTimestamp(followerFetchTime1); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, Collections.emptyList()); + + // After follower2 catches up to leader context.time.sleep(100); - long closeFollowerFetchTime = context.time.milliseconds(); - context.deliverRequest(context.fetchRequest(epoch, closeFollower, 3L, epoch, 0)); + long followerFetchTime2 = 
context.time.milliseconds(); + context.deliverRequest(context.fetchRequest(epoch, follower2, nextFetchOffset, epoch, 0)); context.pollUntilResponse(); - context.assertSentFetchPartitionResponse(3L, epoch); + expectedHW = nextFetchOffset; + context.assertSentFetchPartitionResponse(expectedHW, epoch); + + context.time.sleep(100); + context.deliverRequest(context.describeQuorumRequest()); + context.pollUntilResponse(); + + expectedVoterStates.get(0) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()); + expectedVoterStates.get(2) + .setLogEndOffset(nextFetchOffset) + .setLastFetchTimestamp(followerFetchTime2) + .setLastCaughtUpTimestamp(followerFetchTime2); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, Collections.emptyList()); + + // Describe quorum returns error if leader loses leadership + context.time.sleep(context.checkQuorumTimeoutMs); + context.deliverRequest(context.describeQuorumRequest()); + context.pollUntilResponse(); + expectedVoterStates.get(0) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()); + context.assertSentDescribeQuorumResponse(0, 0, 0, Collections.emptyList(), Collections.emptyList(), Errors.NOT_LEADER_OR_FOLLOWER); + } + + @ParameterizedTest + @ValueSource(booleans = { true, false }) + public void testDescribeQuorumWithObserver(boolean withKip853Rpc) throws Exception { + int localId = 0; + ReplicaKey local = replicaKey(localId, true); + Uuid localDirectoryId = local.directoryId().get(); + ReplicaKey follower = replicaKey(1, withKip853Rpc); + Uuid followerDirectoryId = withKip853Rpc ? 
follower.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID; + Set<Integer> voters = Utils.mkSet(localId, follower.id()); + VoterSet voterSet = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext.Builder builder = new RaftClientTestContext.Builder(localId, localDirectoryId) + .withStaticVoters(voters) + .withKip853Rpc(withKip853Rpc); + + if (withKip853Rpc) { + builder.withBootstrapSnapshot(Optional.of(voterSet)); + } + RaftClientTestContext context = builder.build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + // Update HW to non-initial value + context.time.sleep(100); + // The leader will write bootstrap snapshot records (kraft version and voters) to the log if withKip853Rpc + long fetchOffset = withKip853Rpc ? 3L : 1L; + long followerFetchTime = context.time.milliseconds(); + context.deliverRequest(context.fetchRequest(1, follower, fetchOffset, epoch, 0)); + context.pollUntilResponse(); + long expectedHW = fetchOffset; + context.assertSentFetchPartitionResponse(expectedHW, epoch); // Create observer - ReplicaKey observerId = replicaKey(localId + 3, withKip853Rpc); + ReplicaKey observerId = replicaKey(3, withKip853Rpc); + Uuid observerDirectoryId = withKip853Rpc ? observerId.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID; context.time.sleep(100); long observerFetchTime = context.time.milliseconds(); context.deliverRequest(context.fetchRequest(epoch, observerId, 0L, 0, 0)); context.pollUntilResponse(); - context.assertSentFetchPartitionResponse(3L, epoch); + context.assertSentFetchPartitionResponse(expectedHW, epoch); + + context.time.sleep(100); + context.deliverRequest(context.describeQuorumRequest()); + context.pollUntilResponse(); + + List<ReplicaState> expectedVoterStates = Arrays.asList( + new ReplicaState() + .setReplicaId(localId) + .setReplicaDirectoryId(withKip853Rpc ? 
localDirectoryId : ReplicaKey.NO_DIRECTORY_ID) + // As we are appending the records directly to the log, + // the leader end offset hasn't been updated yet. + .setLogEndOffset(fetchOffset) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()), + new ReplicaState() + .setReplicaId(follower.id()) + .setReplicaDirectoryId(followerDirectoryId) + .setLogEndOffset(fetchOffset) + .setLastFetchTimestamp(followerFetchTime) + .setLastCaughtUpTimestamp(followerFetchTime)); + List<ReplicaState> expectedObserverStates = singletonList( + new ReplicaState() + .setReplicaId(observerId.id()) + .setReplicaDirectoryId(observerDirectoryId) + .setLogEndOffset(0L) + .setLastFetchTimestamp(observerFetchTime) + .setLastCaughtUpTimestamp(-1L)); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedObserverStates); + + // Update observer fetch state + context.time.sleep(100); + observerFetchTime = context.time.milliseconds(); + context.deliverRequest(context.fetchRequest(epoch, observerId, fetchOffset, epoch, 0)); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(expectedHW, epoch); + + context.time.sleep(100); + context.deliverRequest(context.describeQuorumRequest()); + context.pollUntilResponse(); + + expectedVoterStates.get(0) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()); + expectedObserverStates.get(0) + .setLogEndOffset(fetchOffset) + .setLastFetchTimestamp(observerFetchTime) + .setLastCaughtUpTimestamp(observerFetchTime); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedObserverStates); + + // Observer falls behind + context.time.sleep(100); + List<String> records = Arrays.asList("foo", "bar"); + context.client.scheduleAppend(epoch, records); + context.client.poll(); + + context.deliverRequest(context.describeQuorumRequest()); + 
context.pollUntilResponse(); + + expectedVoterStates.get(0) + .setLogEndOffset(fetchOffset + records.size()) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedObserverStates); + + // Observer is removed due to inactivity + long timeToSleep = LeaderState.OBSERVER_SESSION_TIMEOUT_MS; + while (timeToSleep > 0) { + // Follower needs to continue polling to keep leader alive + followerFetchTime = context.time.milliseconds(); + context.deliverRequest(context.fetchRequest(epoch, follower, fetchOffset, epoch, 0)); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(expectedHW, epoch); + + context.time.sleep(context.checkQuorumTimeoutMs - 1); + timeToSleep = timeToSleep - (context.checkQuorumTimeoutMs - 1); + } + context.deliverRequest(context.describeQuorumRequest()); + context.pollUntilResponse(); + + expectedVoterStates.get(0) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()); + expectedVoterStates.get(1) + .setLastFetchTimestamp(followerFetchTime); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, Collections.emptyList()); + + // No-op for negative node id + context.deliverRequest(context.fetchRequest(epoch, ReplicaKey.of(-1, ReplicaKey.NO_DIRECTORY_ID), 0L, 0, 0)); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(expectedHW, epoch); + context.deliverRequest(context.describeQuorumRequest()); + context.pollUntilResponse(); + + expectedVoterStates.get(0) + .setLastFetchTimestamp(context.time.milliseconds()) + .setLastCaughtUpTimestamp(context.time.milliseconds()); + context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, Collections.emptyList()); + } + + @Test + public void testDescribeQuorumNonMonotonicFollowerFetch() throws 
Exception { Review Comment: For some of these tests it looks like you want the following CSV source: ``` withKip853Rpc,withBootstrapCheckpoint false,false true,true true,false ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org