ahuang98 commented on code in PR #16637:
URL: https://github.com/apache/kafka/pull/16637#discussion_r1693351547


##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -2764,77 +2766,414 @@ public void testDescribeQuorumNonLeader(boolean withKip853Rpc) throws Exception
 
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
-    public void testDescribeQuorum(boolean withKip853Rpc) throws Exception {
+    public void testDescribeQuorumWithOnlyStaticVoters(boolean withKip853Rpc) throws Exception {
         int localId = randomReplicaId();
-        ReplicaKey closeFollower = replicaKey(localId + 2, withKip853Rpc);
-        ReplicaKey laggingFollower = replicaKey(localId + 1, withKip853Rpc);
-        Set<Integer> voters = Utils.mkSet(localId, closeFollower.id(), laggingFollower.id());
+        ReplicaKey local = replicaKey(localId, true);
+        ReplicaKey follower1 = replicaKey(localId + 1, true);
+        Set<Integer> voters = Utils.mkSet(localId, follower1.id());
 
-        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, local.directoryId().get())
+            .withStaticVoters(voters)
             .withKip853Rpc(withKip853Rpc)
             .build();
 
         context.becomeLeader();
         int epoch = context.currentEpoch();
 
-        long laggingFollowerFetchTime = context.time.milliseconds();
-        context.deliverRequest(context.fetchRequest(1, laggingFollower, 1L, epoch, 0));
+        // Describe quorum response will not include directory ids
+        context.deliverRequest(context.describeQuorumRequest());
         context.pollUntilResponse();
-        context.assertSentFetchPartitionResponse(1L, epoch);
+        List<ReplicaState> expectedVoterStates = Arrays.asList(
+            new ReplicaState()
+                .setReplicaId(localId)
+                .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID)
+                .setLogEndOffset(1L)
+                .setLastFetchTimestamp(context.time.milliseconds())
+                .setLastCaughtUpTimestamp(context.time.milliseconds()),
+            new ReplicaState()
+                .setReplicaId(follower1.id())
+                .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID)
+                .setLogEndOffset(-1L)
+                .setLastFetchTimestamp(-1)
+                .setLastCaughtUpTimestamp(-1));
+        context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, Collections.emptyList());
+    }
 
-        context.client.scheduleAppend(epoch, Arrays.asList("foo", "bar"));
+    @ParameterizedTest
+    @CsvSource({ "true, true", "true, false", "false, false" })
+    public void testDescribeQuorumWithFollowers(boolean withKip853Rpc, boolean withBootstrapSnapshot) throws Exception {
+        int localId = randomReplicaId();
+        int followerId1 = localId + 1;
+        int followerId2 = localId + 2;
+        ReplicaKey local = replicaKey(localId, withBootstrapSnapshot);
+        // local directory id must exist
+        Uuid localDirectoryId = local.directoryId().orElse(Uuid.randomUuid());
+        ReplicaKey bootstrapFollower1 = replicaKey(followerId1, withBootstrapSnapshot);
+        // if withBootstrapSnapshot is false, directory ids are still needed by the static voter set
+        Uuid followerDirectoryId1 = bootstrapFollower1.directoryId().orElse(withKip853Rpc ? Uuid.randomUuid() : ReplicaKey.NO_DIRECTORY_ID);
+        ReplicaKey follower1 = ReplicaKey.of(followerId1, followerDirectoryId1);
+        ReplicaKey bootstrapFollower2 = replicaKey(followerId2, withBootstrapSnapshot);
+        Uuid followerDirectoryId2 = bootstrapFollower2.directoryId().orElse(withKip853Rpc ? Uuid.randomUuid() : ReplicaKey.NO_DIRECTORY_ID);
+        ReplicaKey follower2 = ReplicaKey.of(followerId2, followerDirectoryId2);
+        VoterSet bootstrapVoterSet = VoterSetTest.voterSet(Stream.of(local, bootstrapFollower1, bootstrapFollower2));
+        VoterSet staticVoterSet = VoterSetTest.voterSet(Stream.of(local, follower1, follower2));
+
+        RaftClientTestContext.Builder builder = new RaftClientTestContext.Builder(localId, localDirectoryId)
+            .withKip853Rpc(withKip853Rpc);
+
+        if (withBootstrapSnapshot) {
+            builder.withBootstrapSnapshot(Optional.of(bootstrapVoterSet));
+        } else {
+            builder.withStaticVoters(staticVoterSet);
+        }
+        RaftClientTestContext context = builder.build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        // Describe quorum response before any fetches made
+        context.deliverRequest(context.describeQuorumRequest());
+        context.pollUntilResponse();
+        List<ReplicaState> expectedVoterStates = Arrays.asList(
+            new ReplicaState()
+                .setReplicaId(localId)
+                .setReplicaDirectoryId(withBootstrapSnapshot ? context.localReplicaKey().directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
+                .setLogEndOffset(withBootstrapSnapshot ? 3L : 1L)
+                .setLastFetchTimestamp(context.time.milliseconds())
+                .setLastCaughtUpTimestamp(context.time.milliseconds()),
+            new ReplicaState()
+                .setReplicaId(followerId1)
+                .setReplicaDirectoryId(withBootstrapSnapshot ? follower1.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
+                .setLogEndOffset(-1L)
+                .setLastFetchTimestamp(-1)
+                .setLastCaughtUpTimestamp(-1),
+            new ReplicaState()
+                .setReplicaId(followerId2)
+                .setReplicaDirectoryId(withBootstrapSnapshot ? follower2.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
+                .setLogEndOffset(-1L)
+                .setLastFetchTimestamp(-1)
+                .setLastCaughtUpTimestamp(-1));
+        context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, Collections.emptyList());
+
+        // After follower1 makes progress but both followers are not caught up
+        context.time.sleep(100);
+        long fetchOffset = withBootstrapSnapshot ? 3L : 1L;
+        long followerFetchTime1 = context.time.milliseconds();
+        context.deliverRequest(context.fetchRequest(1, follower1, fetchOffset, epoch, 0));
+        context.pollUntilResponse();
+        long expectedHW = fetchOffset;
+        context.assertSentFetchPartitionResponse(expectedHW, epoch);
+
+        List<String> records = Arrays.asList("foo", "bar");
+        long nextFetchOffset = fetchOffset + records.size();
+        context.client.scheduleAppend(epoch, records);
         context.client.poll();
 
         context.time.sleep(100);
-        long closeFollowerFetchTime = context.time.milliseconds();
-        context.deliverRequest(context.fetchRequest(epoch, closeFollower, 3L, epoch, 0));
+        context.deliverRequest(context.describeQuorumRequest());
         context.pollUntilResponse();
-        context.assertSentFetchPartitionResponse(3L, epoch);
+
+        expectedVoterStates.get(0)
+            .setLogEndOffset(nextFetchOffset)
+            .setLastFetchTimestamp(context.time.milliseconds())
+            .setLastCaughtUpTimestamp(context.time.milliseconds());
+        expectedVoterStates.get(1)
+            .setLogEndOffset(fetchOffset)
+            .setLastFetchTimestamp(followerFetchTime1)
+            .setLastCaughtUpTimestamp(followerFetchTime1);
+        context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, Collections.emptyList());
+
+        // After follower2 catches up to leader
+        context.time.sleep(100);
+        long followerFetchTime2 = context.time.milliseconds();
+        context.deliverRequest(context.fetchRequest(epoch, follower2, nextFetchOffset, epoch, 0));
+        context.pollUntilResponse();
+        expectedHW = nextFetchOffset;
+        context.assertSentFetchPartitionResponse(expectedHW, epoch);
+
+        context.time.sleep(100);
+        context.deliverRequest(context.describeQuorumRequest());
+        context.pollUntilResponse();
+
+        expectedVoterStates.get(0)
+            .setLastFetchTimestamp(context.time.milliseconds())
+            .setLastCaughtUpTimestamp(context.time.milliseconds());
+        expectedVoterStates.get(2)
+            .setLogEndOffset(nextFetchOffset)
+            .setLastFetchTimestamp(followerFetchTime2)
+            .setLastCaughtUpTimestamp(followerFetchTime2);
+        context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, Collections.emptyList());
+
+        // Describe quorum returns error if leader loses leadership
+        context.time.sleep(context.checkQuorumTimeoutMs);
+        context.deliverRequest(context.describeQuorumRequest());
+        context.pollUntilResponse();
+        context.assertSentDescribeQuorumResponse(Errors.NOT_LEADER_OR_FOLLOWER, 0, 0, 0, Collections.emptyList(), Collections.emptyList());
+    }
+
+    @ParameterizedTest
+    @CsvSource({ "true, true", "true, false", "false, false" })
+    public void testDescribeQuorumWithObserver(boolean withKip853Rpc, boolean withBootstrapSnapshot) throws Exception {
+        int localId = randomReplicaId();
+        int followerId = localId + 1;
+        ReplicaKey local = replicaKey(localId, withBootstrapSnapshot);
+        Uuid localDirectoryId = local.directoryId().orElse(Uuid.randomUuid());
+        ReplicaKey bootstrapFollower = replicaKey(followerId, withBootstrapSnapshot);
+        Uuid followerDirectoryId = bootstrapFollower.directoryId().orElse(withKip853Rpc ? Uuid.randomUuid() : ReplicaKey.NO_DIRECTORY_ID);
+        ReplicaKey follower = ReplicaKey.of(followerId, followerDirectoryId);
+        VoterSet bootstrapVoterSet = VoterSetTest.voterSet(Stream.of(local, bootstrapFollower));
+        VoterSet staticVoterSet = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext.Builder builder = new RaftClientTestContext.Builder(localId, localDirectoryId)
+            .withKip853Rpc(withKip853Rpc);
+
+        if (withBootstrapSnapshot) {
+            builder.withBootstrapSnapshot(Optional.of(bootstrapVoterSet));
+        } else {
+            builder.withStaticVoters(staticVoterSet);
+        }
+        RaftClientTestContext context = builder.build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        // Update HW to non-initial value
+        context.time.sleep(100);
+        long fetchOffset = withBootstrapSnapshot ? 3L : 1L;
+        long followerFetchTime = context.time.milliseconds();
+        context.deliverRequest(context.fetchRequest(1, follower, fetchOffset, epoch, 0));
+        context.pollUntilResponse();
+        long expectedHW = fetchOffset;
+        context.assertSentFetchPartitionResponse(expectedHW, epoch);
 
         // Create observer
-        ReplicaKey observerId = replicaKey(localId + 3, withKip853Rpc);
+        ReplicaKey observer = replicaKey(localId + 2, withKip853Rpc);
+        Uuid observerDirectoryId = observer.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID);
         context.time.sleep(100);
         long observerFetchTime = context.time.milliseconds();
-        context.deliverRequest(context.fetchRequest(epoch, observerId, 0L, 0, 0));
+        context.deliverRequest(context.fetchRequest(epoch, observer, 0L, 0, 0));
         context.pollUntilResponse();
-        context.assertSentFetchPartitionResponse(3L, epoch);
+        context.assertSentFetchPartitionResponse(expectedHW, epoch);
 
         context.time.sleep(100);
         context.deliverRequest(context.describeQuorumRequest());
         context.pollUntilResponse();
 
-        // KAFKA-16953 will add support for including the directory id
-        context.assertSentDescribeQuorumResponse(localId, epoch, 3L,
-            Arrays.asList(
-                new ReplicaState()
-                    .setReplicaId(localId)
-                    .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID)
-                    // As we are appending the records directly to the log,
-                    // the leader end offset hasn't been updated yet.
-                    .setLogEndOffset(3L)
-                    .setLastFetchTimestamp(context.time.milliseconds())
-                    .setLastCaughtUpTimestamp(context.time.milliseconds()),
-                new ReplicaState()
-                    .setReplicaId(laggingFollower.id())
-                    .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID)
-                    .setLogEndOffset(1L)
-                    .setLastFetchTimestamp(laggingFollowerFetchTime)
-                    .setLastCaughtUpTimestamp(laggingFollowerFetchTime),
-                new ReplicaState()
-                    .setReplicaId(closeFollower.id())
-                    .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID)
-                    .setLogEndOffset(3L)
-                    .setLastFetchTimestamp(closeFollowerFetchTime)
-                    .setLastCaughtUpTimestamp(closeFollowerFetchTime)),
-            singletonList(
-                new ReplicaState()
-                    .setReplicaId(observerId.id())
-                    .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID)
-                    .setLogEndOffset(0L)
-                    .setLastFetchTimestamp(observerFetchTime)
-                    .setLastCaughtUpTimestamp(-1L)));
+        List<ReplicaState> expectedVoterStates = Arrays.asList(
+            new ReplicaState()
+                .setReplicaId(localId)
+                .setReplicaDirectoryId(withBootstrapSnapshot ? localDirectoryId : ReplicaKey.NO_DIRECTORY_ID)
+                // As we are appending the records directly to the log,
+                // the leader end offset hasn't been updated yet.
+                .setLogEndOffset(fetchOffset)
+                .setLastFetchTimestamp(context.time.milliseconds())
+                .setLastCaughtUpTimestamp(context.time.milliseconds()),
+            new ReplicaState()
+                .setReplicaId(follower.id())
+                .setReplicaDirectoryId(withBootstrapSnapshot ? followerDirectoryId : ReplicaKey.NO_DIRECTORY_ID)
+                .setLogEndOffset(fetchOffset)
+                .setLastFetchTimestamp(followerFetchTime)
+                .setLastCaughtUpTimestamp(followerFetchTime));
+        List<ReplicaState> expectedObserverStates = singletonList(
+            new ReplicaState()
+                .setReplicaId(observer.id())
+                .setReplicaDirectoryId(observerDirectoryId)
+                .setLogEndOffset(0L)
+                .setLastFetchTimestamp(observerFetchTime)
+                .setLastCaughtUpTimestamp(-1L));
+        context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedObserverStates);
+
+        // Update observer fetch state
+        context.time.sleep(100);
+        observerFetchTime = context.time.milliseconds();
+        context.deliverRequest(context.fetchRequest(epoch, observer, fetchOffset, epoch, 0));
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(expectedHW, epoch);
+
+        context.time.sleep(100);
+        context.deliverRequest(context.describeQuorumRequest());
+        context.pollUntilResponse();
+
+        expectedVoterStates.get(0)
+            .setLastFetchTimestamp(context.time.milliseconds())
+            .setLastCaughtUpTimestamp(context.time.milliseconds());
+        expectedObserverStates.get(0)
+            .setLogEndOffset(fetchOffset)
+            .setLastFetchTimestamp(observerFetchTime)
+            .setLastCaughtUpTimestamp(observerFetchTime);
+        context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedObserverStates);
+
+        // Observer falls behind
+        context.time.sleep(100);
+        List<String> records = Arrays.asList("foo", "bar");
+        context.client.scheduleAppend(epoch, records);
+        context.client.poll();
+
+        context.deliverRequest(context.describeQuorumRequest());
+        context.pollUntilResponse();
+
+        expectedVoterStates.get(0)
+            .setLogEndOffset(fetchOffset + records.size())
+            .setLastFetchTimestamp(context.time.milliseconds())
+            .setLastCaughtUpTimestamp(context.time.milliseconds());
+        context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, expectedObserverStates);
+
+        // Observer is removed due to inactivity
+        long timeToSleep = LeaderState.OBSERVER_SESSION_TIMEOUT_MS;
+        while (timeToSleep > 0) {
+            // Follower needs to continue polling to keep leader alive
+            followerFetchTime = context.time.milliseconds();
+            context.deliverRequest(context.fetchRequest(epoch, follower, fetchOffset, epoch, 0));
+            context.pollUntilResponse();
+            context.assertSentFetchPartitionResponse(expectedHW, epoch);
+
+            context.time.sleep(context.checkQuorumTimeoutMs - 1);
+            timeToSleep = timeToSleep - (context.checkQuorumTimeoutMs - 1);
+        }
+        context.deliverRequest(context.describeQuorumRequest());
+        context.pollUntilResponse();
+
+        expectedVoterStates.get(0)
+            .setLastFetchTimestamp(context.time.milliseconds())
+            .setLastCaughtUpTimestamp(context.time.milliseconds());
+        expectedVoterStates.get(1)
+            .setLastFetchTimestamp(followerFetchTime);
+        context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, Collections.emptyList());
+
+        // No-op for negative node id
+        context.deliverRequest(context.fetchRequest(epoch, ReplicaKey.of(-1, ReplicaKey.NO_DIRECTORY_ID), 0L, 0, 0));
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(expectedHW, epoch);
+        context.deliverRequest(context.describeQuorumRequest());
+        context.pollUntilResponse();
+
+        expectedVoterStates.get(0)
+            .setLastFetchTimestamp(context.time.milliseconds())
+            .setLastCaughtUpTimestamp(context.time.milliseconds());
+        context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, Collections.emptyList());
     }
 
+    @ParameterizedTest
+    @CsvSource({ "true, true", "true, false", "false, false" })
+    public void testDescribeQuorumNonMonotonicFollowerFetch(boolean withKip853Rpc, boolean withBootstrapSnapshot) throws Exception {
+        int localId = randomReplicaId();
+        ReplicaKey local = replicaKey(localId, withBootstrapSnapshot);
+        Uuid localDirectoryId = local.directoryId().orElse(Uuid.randomUuid());
+        int followerId = localId + 1;
+        ReplicaKey bootstrapFollower = replicaKey(followerId, withBootstrapSnapshot);
+        Uuid followerDirectoryId = bootstrapFollower.directoryId().orElse(withKip853Rpc ? Uuid.randomUuid() : ReplicaKey.NO_DIRECTORY_ID);
+        ReplicaKey follower = ReplicaKey.of(followerId, followerDirectoryId);
+        VoterSet bootstrapVoterSet = VoterSetTest.voterSet(Stream.of(local, bootstrapFollower));
+        VoterSet staticVoterSet = VoterSetTest.voterSet(Stream.of(local, follower));
+
+        RaftClientTestContext.Builder builder = new RaftClientTestContext.Builder(localId, localDirectoryId)
+            .withKip853Rpc(withKip853Rpc);
+        if (withBootstrapSnapshot) {
+            builder.withBootstrapSnapshot(Optional.of(bootstrapVoterSet));
+        } else {
+            builder.withStaticVoters(staticVoterSet);
+        }
+        RaftClientTestContext context = builder.build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        // Update HW to non-initial value
+        context.time.sleep(100);
+        List<String> batch = Arrays.asList("foo", "bar");
+        context.client.scheduleAppend(epoch, batch);
+        context.client.poll();
+        long fetchOffset = withBootstrapSnapshot ? 5L : 3L;
+        long followerFetchTime = context.time.milliseconds();
+        context.deliverRequest(context.fetchRequest(epoch, follower, fetchOffset, epoch, 0));
+        context.pollUntilResponse();
+        long expectedHW = fetchOffset;
+        context.assertSentFetchPartitionResponse(expectedHW, epoch);
+
+        context.time.sleep(100);
+        context.deliverRequest(context.describeQuorumRequest());
+        context.pollUntilResponse();
+        List<ReplicaState> expectedVoterStates = Arrays.asList(
+            new ReplicaState()
+                .setReplicaId(localId)
+                .setReplicaDirectoryId(withBootstrapSnapshot ? local.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
+                .setLogEndOffset(fetchOffset)
+                .setLastFetchTimestamp(context.time.milliseconds())
+                .setLastCaughtUpTimestamp(context.time.milliseconds()),
+            new ReplicaState()
+                .setReplicaId(follower.id())
+                .setReplicaDirectoryId(withBootstrapSnapshot ? follower.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
+                .setLogEndOffset(fetchOffset)
+                .setLastFetchTimestamp(followerFetchTime)
+                .setLastCaughtUpTimestamp(followerFetchTime));
+        context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, Collections.emptyList());
+
+        // Follower crashes and disk is lost. It fetches an earlier offset to rebuild state.
+        // The leader will report an error in the logs, but will not let the high watermark rewind
+        context.time.sleep(100);
+        followerFetchTime = context.time.milliseconds();
+        context.deliverRequest(context.fetchRequest(epoch, follower, fetchOffset - 1, epoch, 0));
+        context.pollUntilResponse();
+        context.assertSentFetchPartitionResponse(expectedHW, epoch);
+        context.time.sleep(100);
+        context.deliverRequest(context.describeQuorumRequest());
+        context.pollUntilResponse();
+
+        expectedVoterStates.get(0)
+            .setLastFetchTimestamp(context.time.milliseconds())
+            .setLastCaughtUpTimestamp(context.time.milliseconds());
+        expectedVoterStates.get(1)
+            .setLogEndOffset(fetchOffset - batch.size())
+            .setLastFetchTimestamp(followerFetchTime);
+        context.assertSentDescribeQuorumResponse(localId, epoch, expectedHW, expectedVoterStates, Collections.emptyList());
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = { true, false })
+    public void testStaticVotersIgnoredWithBootstrapSnapshot(boolean withKip853Rpc) throws Exception {
+        int localId = randomReplicaId();
+        ReplicaKey local = replicaKey(localId, true);
+        ReplicaKey follower = replicaKey(localId + 1, true);
+        ReplicaKey follower2 = replicaKey(localId + 2, true);
+        // only include one follower in static voter set
+        Set<Integer> staticVoters = Utils.mkSet(localId, follower.id());
+        VoterSet voterSet = VoterSetTest.voterSet(Stream.of(local, follower, follower2));
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, local.directoryId().get())
+            .withStaticVoters(staticVoters)
+            .withKip853Rpc(withKip853Rpc)
+            .withBootstrapSnapshot(Optional.of(voterSet))
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+        // check describe quorum response has both followers
+        context.deliverRequest(context.describeQuorumRequest());
+        context.pollUntilResponse();
+        List<ReplicaState> expectedVoterStates = Arrays.asList(
+            new ReplicaState()
+                .setReplicaId(localId)
+                .setReplicaDirectoryId(withKip853Rpc ? local.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
+                .setLogEndOffset(3L)
+                .setLastFetchTimestamp(context.time.milliseconds())
+                .setLastCaughtUpTimestamp(context.time.milliseconds()),
+            new ReplicaState()
+                .setReplicaId(follower.id())
+                .setReplicaDirectoryId(withKip853Rpc ? follower.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
+                .setLogEndOffset(-1L)
+                .setLastFetchTimestamp(-1)
+                .setLastCaughtUpTimestamp(-1),
+            new ReplicaState()
+                .setReplicaId(follower2.id())
+                .setReplicaDirectoryId(withKip853Rpc ? follower2.directoryId().get() : ReplicaKey.NO_DIRECTORY_ID)
+                .setLogEndOffset(-1L)
+                .setLastFetchTimestamp(-1)
+                .setLastCaughtUpTimestamp(-1));
+        context.assertSentDescribeQuorumResponse(localId, epoch, -1L, expectedVoterStates, Collections.emptyList());
+    }
+
+    // After KAFKA-16535 we can test describe quorum output following voter removal and addition

Review Comment:
   will remove this comment.


