ahuang98 commented on code in PR #16637:
URL: https://github.com/apache/kafka/pull/16637#discussion_r1690605706


##########
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##########
@@ -2762,79 +2764,357 @@ public void testDescribeQuorumNonLeader(boolean 
withKip853Rpc) throws Exception
         assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.message(), 
partitionData.errorMessage());
     }
 
+    @Test
+    public void testDescribeQuorumWithOnlyStaticVoters() throws Exception {
+        int localId = 0;
+        ReplicaKey local = replicaKey(localId, true);
+        ReplicaKey follower1 = replicaKey(1, true);
+        Set<Integer> voters = Utils.mkSet(localId, follower1.id());
+
+        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, local.directoryId().get())
+            .withStaticVoters(voters)
+            .withKip853Rpc(true)
+            .build();
+
+        context.becomeLeader();
+        int epoch = context.currentEpoch();
+
+        // Describe quorum response will not include directory ids
+        context.deliverRequest(context.describeQuorumRequest());
+        context.pollUntilResponse();
+        List<ReplicaState> expectedVoterStates = Arrays.asList(
+            new ReplicaState()
+                .setReplicaId(localId)
+                .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID)
+                .setLogEndOffset(1L)
+                .setLastFetchTimestamp(context.time.milliseconds())
+                .setLastCaughtUpTimestamp(context.time.milliseconds()),
+            new ReplicaState()
+                .setReplicaId(follower1.id())
+                .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID)
+                .setLogEndOffset(-1L)
+                .setLastFetchTimestamp(-1)
+                .setLastCaughtUpTimestamp(-1));
+        context.assertSentDescribeQuorumResponse(localId, epoch, -1L, 
expectedVoterStates, Collections.emptyList());
+    }
+
+
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
-    public void testDescribeQuorum(boolean withKip853Rpc) throws Exception {
-        int localId = randomReplicaId();
-        ReplicaKey closeFollower = replicaKey(localId + 2, withKip853Rpc);
-        ReplicaKey laggingFollower = replicaKey(localId + 1, withKip853Rpc);
-        Set<Integer> voters = Utils.mkSet(localId, closeFollower.id(), 
laggingFollower.id());
-
-        RaftClientTestContext context = new 
RaftClientTestContext.Builder(localId, voters)
-            .withKip853Rpc(withKip853Rpc)
-            .build();
+    public void testDescribeQuorumWithFollowers(boolean withKip853Rpc) throws 
Exception {
+        int localId = 0;
+        ReplicaKey local = replicaKey(localId, true);
+        Uuid localDirectoryId = local.directoryId().get();
+        ReplicaKey follower1 = replicaKey(1, true);
+        Uuid followerDirectoryId1 = follower1.directoryId().get();
+        ReplicaKey follower2 = replicaKey(2, false);
+        Set<Integer> voters = Utils.mkSet(localId, follower1.id(), 
follower2.id());
+        VoterSet voterSet = VoterSetTest.voterSet(Stream.of(local, follower1, 
follower2));
+
+        RaftClientTestContext.Builder builder = new 
RaftClientTestContext.Builder(localId, localDirectoryId)
+            .withStaticVoters(voters)
+            .withKip853Rpc(withKip853Rpc);
+
+        if (withKip853Rpc) {
+            builder.withBootstrapSnapshot(Optional.of(voterSet));
+        }
+        RaftClientTestContext context = builder.build();
 
         context.becomeLeader();
         int epoch = context.currentEpoch();
 
-        long laggingFollowerFetchTime = context.time.milliseconds();
-        context.deliverRequest(context.fetchRequest(1, laggingFollower, 1L, 
epoch, 0));
+        // Describe quorum response before any fetches made
+        context.deliverRequest(context.describeQuorumRequest());
         context.pollUntilResponse();
-        context.assertSentFetchPartitionResponse(1L, epoch);
+        List<ReplicaState> expectedVoterStates = Arrays.asList(
+            new ReplicaState()
+                .setReplicaId(localId)
+                .setReplicaDirectoryId(withKip853Rpc ? localDirectoryId : 
ReplicaKey.NO_DIRECTORY_ID)
+                // the leader will write bootstrap snapshot records (kraft 
version and voters) to the log if withKip853Rpc
+                .setLogEndOffset(withKip853Rpc ? 3L : 1L)
+                .setLastFetchTimestamp(context.time.milliseconds())
+                .setLastCaughtUpTimestamp(context.time.milliseconds()),
+            new ReplicaState()
+                .setReplicaId(follower1.id())
+                .setReplicaDirectoryId(withKip853Rpc ? followerDirectoryId1 : 
ReplicaKey.NO_DIRECTORY_ID)
+                .setLogEndOffset(-1L)
+                .setLastFetchTimestamp(-1)
+                .setLastCaughtUpTimestamp(-1),
+            new ReplicaState()
+                .setReplicaId(follower2.id())
+                .setReplicaDirectoryId(ReplicaKey.NO_DIRECTORY_ID)
+                .setLogEndOffset(-1L)
+                .setLastFetchTimestamp(-1)
+                .setLastCaughtUpTimestamp(-1));
+        context.assertSentDescribeQuorumResponse(localId, epoch, -1L, 
expectedVoterStates, Collections.emptyList());
+
+        // After follower1 makes progress but both followers are not caught up
+        context.time.sleep(100);
+        // withKip853Rpc leader will write bootstrap snapshot records (kraft 
version and voters) to the log
+        long fetchOffset = withKip853Rpc ? 3L : 1L;
+        long followerFetchTime1 = context.time.milliseconds();
+        context.deliverRequest(context.fetchRequest(1, follower1, fetchOffset, 
epoch, 0));
+        context.pollUntilResponse();
+        long expectedHW = fetchOffset;
+        context.assertSentFetchPartitionResponse(expectedHW, epoch);
 
-        context.client.scheduleAppend(epoch, Arrays.asList("foo", "bar"));
+        List<String> records = Arrays.asList("foo", "bar");
+        long nextFetchOffset = fetchOffset + records.size();
+        context.client.scheduleAppend(epoch, records);
         context.client.poll();
+        context.deliverRequest(context.describeQuorumRequest());
+        context.pollUntilResponse();
 
+        expectedVoterStates.get(0)
+            .setLogEndOffset(nextFetchOffset)
+            .setLastFetchTimestamp(context.time.milliseconds())
+            .setLastCaughtUpTimestamp(context.time.milliseconds());
+        expectedVoterStates.get(1)
+            .setLogEndOffset(fetchOffset)
+            .setLastFetchTimestamp(followerFetchTime1)
+            .setLastCaughtUpTimestamp(followerFetchTime1);

Review Comment:
   opted for adding a sleep



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to