jsancio commented on code in PR #16079:
URL: https://github.com/apache/kafka/pull/16079#discussion_r1624780684


##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -341,9 +346,18 @@ public boolean updateReplicaState(
                     state.nodeId, currentEndOffset.offset, fetchOffsetMetadata.offset);
             }
         });
-
-        Optional<LogOffsetMetadata> leaderEndOffsetOpt =
-            voterStates.get(localId).endOffset;
+        Optional<LogOffsetMetadata> leaderEndOffsetOpt;
+        ReplicaState leaderVoterState = voterStates.get(localId);
+        ReplicaState leaderObserverState = observerStates.get(localId);
+        if (leaderVoterState != null) {
+            leaderEndOffsetOpt = leaderVoterState.endOffset;
+        } else if (leaderObserverState != null) {
+            // The leader is not guaranteed to be in the voter set when in the process of being removed from the quorum.
+            log.info("Updating end offset for leader {} which is also an observer.", localId);
+            leaderEndOffsetOpt = leaderObserverState.endOffset;
+        } else {
+            throw new IllegalStateException("Leader state not found for localId " + localId);
+        }

Review Comment:
   In practice, isn't this the same as `getOrCreateReplicaState`? If so, we
should just use that function here.
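
A rough sketch of what this suggestion could look like. It assumes `getOrCreateReplicaState` checks `voterStates` first and otherwise returns (or lazily creates) the entry in `observerStates`; that behavior is an assumption here, not something confirmed by the diff. The class name, the `leaderEndOffset` helper, and the use of `Long` in place of `LogOffsetMetadata` are all hypothetical simplifications.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Simplified stand-ins for the real classes; only the lookup shape matters here.
class LeaderLookupSketch {
    static final class ReplicaState {
        final int nodeId;
        // The real field is Optional<LogOffsetMetadata>; Long keeps the sketch self-contained.
        Optional<Long> endOffset = Optional.empty();
        ReplicaState(int nodeId) { this.nodeId = nodeId; }
    }

    final Map<Integer, ReplicaState> voterStates = new HashMap<>();
    final Map<Integer, ReplicaState> observerStates = new HashMap<>();

    // Assumed behavior: prefer the voter entry, otherwise get or create an observer entry.
    ReplicaState getOrCreateReplicaState(int nodeId) {
        ReplicaState state = voterStates.get(nodeId);
        if (state == null) {
            state = observerStates.computeIfAbsent(nodeId, ReplicaState::new);
        }
        return state;
    }

    // Hypothetical helper: the voter/observer branch collapses to a single lookup.
    Optional<Long> leaderEndOffset(int localId) {
        return getOrCreateReplicaState(localId).endOffset;
    }
}
```

One difference worth noting: under this assumption a missing leader state is created silently as an observer entry, whereas the code in the diff throws `IllegalStateException`.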



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -341,9 +346,18 @@ public boolean updateReplicaState(
                     state.nodeId, currentEndOffset.offset, fetchOffsetMetadata.offset);
             }
         });
-
-        Optional<LogOffsetMetadata> leaderEndOffsetOpt =
-            voterStates.get(localId).endOffset;
+        Optional<LogOffsetMetadata> leaderEndOffsetOpt;

Review Comment:
   If you move this declaration to right before the `if` statement and mark it
as `final`, it becomes more obvious that this variable can take one of two
different values.
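
A minimal sketch of the suggested layout, using simplified stand-in types (`Long` instead of `LogOffsetMetadata`, and a hypothetical `leaderEndOffset` helper that takes the two lookups as parameters). Marking the local as `final` lets the compiler verify that every branch either assigns it exactly once or throws.

```java
import java.util.Optional;

class FinalLocalSketch {
    static final class ReplicaState {
        final Optional<Long> endOffset;
        ReplicaState(Optional<Long> endOffset) { this.endOffset = endOffset; }
    }

    // voter and observer may be null, mirroring the results of Map.get in the diff.
    static Optional<Long> leaderEndOffset(ReplicaState voter, ReplicaState observer, int localId) {
        // Declared immediately before the branch and marked final:
        // each path must assign it exactly once or throw.
        final Optional<Long> leaderEndOffsetOpt;
        if (voter != null) {
            leaderEndOffsetOpt = voter.endOffset;
        } else if (observer != null) {
            leaderEndOffsetOpt = observer.endOffset;
        } else {
            throw new IllegalStateException("Leader state not found for localId " + localId);
        }
        return leaderEndOffsetOpt;
    }
}
```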



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -445,6 +463,27 @@ private boolean isVoter(int remoteNodeId) {
         return voterStates.containsKey(remoteNodeId);
     }
 
+    // with Jose's changes this will probably make more sense as VoterSet
+    private void updateVoterSet(Set<Integer> lastVoterSet) {
+        // Remove any voter state that is not in the last voter set. They become observers.
+        for (Iterator<Map.Entry<Integer, ReplicaState>> iter = voterStates.entrySet().iterator(); iter.hasNext(); ) {
+            Integer nodeId = iter.next().getKey();
+            if (!lastVoterSet.contains(nodeId)) {
+                createObserverState(nodeId);
+                iter.remove();
+            }
+        }
+
+        // Add any voter state that is in the last voter set but not in the current voter set. They are removed from
+        // the observerStates if they exist.
+        for (int voterId : lastVoterSet) {
+            if (!voterStates.containsKey(voterId)) {
+                voterStates.put(voterId, new ReplicaState(voterId, false));
+                observerStates.remove(voterId);

Review Comment:
   This comment applies to this code block and the one above.
   
   When moving a replica state from voter to observer and vice versa, we
shouldn't create a new replica state. Instead, we should reuse the replica
state from the previous hash map.
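
One possible shape for this, as a sketch only. The `ReplicaState` class here is a simplified stand-in with a single illustrative field, and the class name is hypothetical; the real `ReplicaState` carries more fields, and some of them may need adjusting when a replica changes role. The point is that the same object moves between the two maps, so whatever progress it tracks is preserved.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

class VoterSetUpdateSketch {
    static final class ReplicaState {
        final int nodeId;
        long lastFetchTimestamp = -1L; // illustrative piece of tracked progress
        ReplicaState(int nodeId) { this.nodeId = nodeId; }
    }

    final Map<Integer, ReplicaState> voterStates = new HashMap<>();
    final Map<Integer, ReplicaState> observerStates = new HashMap<>();

    void updateVoterSet(Set<Integer> lastVoterSet) {
        // Demote: move the existing object into observerStates instead of creating a new one.
        Iterator<Map.Entry<Integer, ReplicaState>> iter = voterStates.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<Integer, ReplicaState> entry = iter.next();
            if (!lastVoterSet.contains(entry.getKey())) {
                observerStates.put(entry.getKey(), entry.getValue());
                iter.remove();
            }
        }

        // Promote: reuse the observer's state when one exists; create only for unknown nodes.
        for (int voterId : lastVoterSet) {
            if (!voterStates.containsKey(voterId)) {
                ReplicaState existing = observerStates.remove(voterId);
                voterStates.put(voterId, existing != null ? existing : new ReplicaState(voterId));
            }
        }
    }
}
```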



##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########
@@ -445,6 +463,27 @@ private boolean isVoter(int remoteNodeId) {
         return voterStates.containsKey(remoteNodeId);
     }
 
+    // with Jose's changes this will probably make more sense as VoterSet
+    private void updateVoterSet(Set<Integer> lastVoterSet) {
+        // Remove any voter state that is not in the last voter set. They become observers.
+        for (Iterator<Map.Entry<Integer, ReplicaState>> iter = voterStates.entrySet().iterator(); iter.hasNext(); ) {
+            Integer nodeId = iter.next().getKey();
+            if (!lastVoterSet.contains(nodeId)) {
+                createObserverState(nodeId);

Review Comment:
   The code in `clearInactiveObservers` removes any `ReplicaState` in
`observerStates` that hasn't been updated after some time. We should make sure
that the local replica (the leader) is not removed from `observerStates` if it
exists there.
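
A hedged sketch of one way to guard against this. It assumes `clearInactiveObservers` expires entries based on the time since their last fetch; the timeout value, the map of timestamps, and the class name are illustrative assumptions rather than the actual implementation. The only change of interest is the extra `localId` check in the removal predicate.

```java
import java.util.HashMap;
import java.util.Map;

class ObserverExpirySketch {
    // Illustrative value; the real timeout is an assumption here.
    static final long OBSERVER_SESSION_TIMEOUT_MS = 300_000L;

    final int localId;
    // Simplified: node id -> last fetch timestamp, standing in for the ReplicaState map.
    final Map<Integer, Long> observerLastFetchMs = new HashMap<>();

    ObserverExpirySketch(int localId) {
        this.localId = localId;
    }

    void clearInactiveObservers(long currentTimeMs) {
        // Never expire the local replica (the leader), even if it has not "fetched" recently.
        observerLastFetchMs.entrySet().removeIf(entry ->
            entry.getKey() != localId
                && currentTimeMs - entry.getValue() >= OBSERVER_SESSION_TIMEOUT_MS);
    }
}
```

The leader never fetches from itself, so without such a check its observer entry would presumably look inactive and be dropped once the timeout elapses.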


