showuon commented on code in PR #12548:
URL: https://github.com/apache/kafka/pull/12548#discussion_r952117995


##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -63,20 +76,26 @@ public boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         QuorumInfo that = (QuorumInfo) o;
-        return leaderId.equals(that.leaderId)
-            && voters.equals(that.voters)
-            && observers.equals(that.observers);
+        return leaderId == that.leaderId
+            && leaderEpoch == that.leaderEpoch
+            && highWatermark == that.highWatermark
+            && Objects.equals(highWatermarkUpdateTimeMs, 
that.highWatermarkUpdateTimeMs)
+            && Objects.equals(voters, that.voters)
+            && Objects.equals(observers, that.observers);

Review Comment:
   nice catch!



##########
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##########
@@ -302,21 +302,112 @@ public void 
testGetNonLeaderFollowersByFetchOffsetDescending() {
     }
 
     @Test
-    public void testGetVoterStates() {
-        int node1 = 1;
-        int node2 = 2;
+    public void testDescribeQuorumWithSingleVoter() {
+        MockTime time = new MockTime();
         long leaderStartOffset = 10L;
         long leaderEndOffset = 15L;
 
-        LeaderState<?> state = setUpLeaderAndFollowers(node1, node2, 
leaderStartOffset, leaderEndOffset);
+        LeaderState<?> state = newLeaderState(mkSet(localId), 
leaderStartOffset);
+
+        // Until we have updated local state, high watermark should be 
uninitialized
+        assertEquals(Optional.empty(), state.highWatermark());
+        DescribeQuorumResponseData.PartitionData partitionData = 
state.describeQuorum(time.milliseconds());
+        assertEquals(-1, partitionData.highWatermark());
+        assertEquals(-1, partitionData.highWatermarkUpdateTimeMs());
+        assertEquals(localId, partitionData.leaderId());
+        assertEquals(epoch, partitionData.leaderEpoch());
+        assertEquals(Collections.emptyList(), partitionData.observers());
+        assertEquals(1, partitionData.currentVoters().size());
+        assertEquals(new DescribeQuorumResponseData.ReplicaState()
+                .setReplicaId(localId)
+                .setLogEndOffset(-1)
+                .setLastFetchTimestamp(time.milliseconds())
+                .setLastCaughtUpTimestamp(time.milliseconds()),
+            partitionData.currentVoters().get(0));
+
+
+        // Now update the high watermark and verify that describe output
+        long highWatermarkUpdateTimeMs = time.milliseconds();
+        assertTrue(state.updateLocalState(highWatermarkUpdateTimeMs, new 
LogOffsetMetadata(leaderEndOffset)));
+        assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)), 
state.highWatermark());
+
+        time.sleep(500);
+
+        partitionData = state.describeQuorum(time.milliseconds());
+        assertEquals(leaderEndOffset, partitionData.highWatermark());
+        assertEquals(highWatermarkUpdateTimeMs, 
partitionData.highWatermarkUpdateTimeMs());
+        assertEquals(localId, partitionData.leaderId());
+        assertEquals(epoch, partitionData.leaderEpoch());
+        assertEquals(Collections.emptyList(), partitionData.observers());
+        assertEquals(1, partitionData.currentVoters().size());
+        assertEquals(new DescribeQuorumResponseData.ReplicaState()
+                .setReplicaId(localId)
+                .setLogEndOffset(leaderEndOffset)
+                .setLastFetchTimestamp(time.milliseconds())
+                .setLastCaughtUpTimestamp(time.milliseconds()),
+            partitionData.currentVoters().get(0));
+    }
+
+    @Test
+    public void testDescribeQuorumWithMultipleVoters() {
+        MockTime time = new MockTime();
+        int activeFollowerId = 1;
+        int inactiveFollowerId = 2;
+        long leaderStartOffset = 10L;
+        long leaderEndOffset = 15L;
 
-        assertEquals(mkMap(
-            mkEntry(localId, leaderEndOffset),
-            mkEntry(node1, leaderStartOffset),
-            mkEntry(node2, leaderEndOffset)
-        ), state.quorumResponseVoterStates(0)
-            .stream()
-            
.collect(Collectors.toMap(DescribeQuorumResponseData.ReplicaState::replicaId, 
DescribeQuorumResponseData.ReplicaState::logEndOffset)));
+        LeaderState<?> state = newLeaderState(mkSet(localId, activeFollowerId, 
inactiveFollowerId), leaderStartOffset);
+        assertFalse(state.updateLocalState(time.milliseconds(), new 
LogOffsetMetadata(leaderEndOffset)));
+        assertEquals(Optional.empty(), state.highWatermark());
+
+        long activeFollowerFetchTimeMs = time.milliseconds();
+        assertTrue(state.updateReplicaState(activeFollowerId, 
activeFollowerFetchTimeMs, new LogOffsetMetadata(leaderEndOffset)));
+        assertEquals(Optional.of(new LogOffsetMetadata(leaderEndOffset)), 
state.highWatermark());
+
+        time.sleep(500);
+
+        DescribeQuorumResponseData.PartitionData partitionData = 
state.describeQuorum(time.milliseconds());
+        assertEquals(leaderEndOffset, partitionData.highWatermark());
+        assertEquals(activeFollowerFetchTimeMs, 
partitionData.highWatermarkUpdateTimeMs());
+        assertEquals(localId, partitionData.leaderId());
+        assertEquals(epoch, partitionData.leaderEpoch());
+        assertEquals(Collections.emptyList(), partitionData.observers());
+
+        List<DescribeQuorumResponseData.ReplicaState> voterStates = 
partitionData.currentVoters();
+        assertEquals(3, voterStates.size());
+
+        DescribeQuorumResponseData.ReplicaState leaderState = 
voterStates.stream()
+            .filter(voterState -> voterState.replicaId() == localId)
+            .findFirst()
+            .orElseThrow(() -> new AssertionError(""));

Review Comment:
   Forgot to add the error message here and below.



##########
clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java:
##########
@@ -24,32 +24,45 @@
  * This class is used to describe the state of the quorum received in 
DescribeQuorumResponse.
  */
 public class QuorumInfo {
-    private final Integer leaderId;
-    private final Integer leaderEpoch;
-    private final Long highWatermark;
+    private final int leaderId;
+    private final long leaderEpoch;

Review Comment:
   The `leaderEpoch` is in int32 type, any reason we change to use `long` here?
   
   
https://github.com/apache/kafka/blob/add4ca6c7f1b289477aa7d6e918f5d22b78088fe/clients/src/main/resources/common/message/DescribeQuorumResponse.json#L37-L38



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -304,7 +311,7 @@ public long epochStartOffset() {
         return epochStartOffset;
     }
 
-    private ReplicaState getReplicaState(int remoteNodeId) {
+    private ReplicaState getOrCreateReplicaState(int remoteNodeId) {

Review Comment:
   nice renaming



##########
core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala:
##########
@@ -158,14 +158,15 @@ object MetadataQuorumCommand {
         -1
       }
     println(
-      s"""|ClusterId:              $clusterId
-          |LeaderId:               ${quorumInfo.leaderId}
-          |LeaderEpoch:            ${quorumInfo.leaderEpoch}
-          |HighWatermark:          ${quorumInfo.highWatermark}
-          |MaxFollowerLag:         $maxFollowerLag
-          |MaxFollowerLagTimeMs:   $maxFollowerLagTimeMs
-          |CurrentVoters:          
${quorumInfo.voters.asScala.map(_.replicaId).mkString("[", ",", "]")}
-          |CurrentObservers:       
${quorumInfo.observers.asScala.map(_.replicaId).mkString("[", ",", "]")}
+      s"""|ClusterId:                  $clusterId
+          |LeaderId:                   ${quorumInfo.leaderId}
+          |LeaderEpoch:                ${quorumInfo.leaderEpoch}
+          |HighWatermark:              ${quorumInfo.highWatermark}
+          |HighWatermarkUpdateTimeMs:  ${quorumInfo.highWatermarkUpdateTimeMs}

Review Comment:
   Since this is a script output, I think it'd better we did some 
pre-processing to avoid printing out something like: `OptionalLong[12345]` or 
`OptionalLong.empty`. 
   



##########
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##########
@@ -326,35 +417,57 @@ private LeaderState<?> setUpLeaderAndFollowers(int 
follower1,
         LeaderState<?> state = newLeaderState(mkSet(localId, follower1, 
follower2), leaderStartOffset);
         state.updateLocalState(0, new LogOffsetMetadata(leaderEndOffset));
         assertEquals(Optional.empty(), state.highWatermark());
-        state.updateReplicaState(follower1, 0, new 
LogOffsetMetadata(leaderStartOffset), leaderEndOffset);
-        state.updateReplicaState(follower2, 0, new 
LogOffsetMetadata(leaderEndOffset), leaderEndOffset);
+        state.updateReplicaState(follower1, 0, new 
LogOffsetMetadata(leaderStartOffset));
+        state.updateReplicaState(follower2, 0, new 
LogOffsetMetadata(leaderEndOffset));
         return state;
     }
 
     @Test
-    public void testGetObserverStatesWithObserver() {
+    public void testDescribeQuorumWithObservers() {
+        MockTime time = new MockTime();
         int observerId = 10;
         long epochStartOffset = 10L;
 
         LeaderState<?> state = newLeaderState(mkSet(localId), 
epochStartOffset);
-        long timestamp = 20L;
-        assertFalse(state.updateReplicaState(observerId, timestamp, new 
LogOffsetMetadata(epochStartOffset), epochStartOffset + 10));
-
-        assertEquals(Collections.singletonMap(observerId, epochStartOffset),
-                state.quorumResponseObserverStates(timestamp)
-                    .stream()
-                    
.collect(Collectors.toMap(DescribeQuorumResponseData.ReplicaState::replicaId, 
DescribeQuorumResponseData.ReplicaState::logEndOffset)));
+        long highWatermarkUpdateTime = time.milliseconds();
+        assertTrue(state.updateLocalState(time.milliseconds(), new 
LogOffsetMetadata(epochStartOffset + 1)));
+        assertEquals(Optional.of(new LogOffsetMetadata(epochStartOffset + 1)), 
state.highWatermark());
+
+        time.sleep(500);
+        long observerFetchTimeMs = time.milliseconds();
+        assertFalse(state.updateReplicaState(observerId, observerFetchTimeMs, 
new LogOffsetMetadata(epochStartOffset + 1)));
+
+        time.sleep(500);
+        DescribeQuorumResponseData.PartitionData partitionData = 
state.describeQuorum(time.milliseconds());
+        assertEquals(epochStartOffset + 1, partitionData.highWatermark());
+        assertEquals(highWatermarkUpdateTime, 
partitionData.highWatermarkUpdateTimeMs());
+        assertEquals(localId, partitionData.leaderId());
+        assertEquals(epoch, partitionData.leaderEpoch());
+
+        List<DescribeQuorumResponseData.ReplicaState> observerStates = 
partitionData.observers();
+        assertEquals(1, observerStates.size());

Review Comment:
   Should we also verify `voters` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to