ppatierno commented on code in PR #20859:
URL: https://github.com/apache/kafka/pull/20859#discussion_r2533364658


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -501,6 +503,11 @@ public void initialize(
         logger.info("Reading KRaft snapshot and log as part of the initialization");
         partitionState.updateState();
         logger.info("Starting voters are {}", partitionState.lastVoterSet());
+        if (nodeId.isPresent()) {
+            // if starting voters contain node id, it can't join to the cluster
+            // because it already in.

Review Comment:
   ```suggestion
               // because it is already in.
   ```



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3356,30 +3366,51 @@ private boolean shouldSendAddOrRemoveVoterRequest(FollowerState state, long curr
             quorumConfig.autoJoin() && state.hasUpdateVoterSetPeriodExpired(currentTimeMs);
     }
 
+    private boolean shouldSendAddVoterRequest(FollowerState state, long currentTimeMs) {
+        return canJoin && autoJoinEnable(state, currentTimeMs);
+    }
+
+    private boolean shouldSendRemoveVoterRequest(FollowerState state, long currentTimeMs) {
+        final var localReplicaKey = quorum.localReplicaKeyOrThrow();
+        final var voters = partitionState.lastVoterSet();
+
+        if (voters.voterIds().contains(localReplicaKey.id())) {
+            if (autoJoinEnable(state, currentTimeMs)) {
+                canJoin = true;
+                return true;
+            }
+        }
+        return false;
+    }
+
+
     private long pollFollowerAsObserver(FollowerState state, long currentTimeMs) {
         GracefulShutdown shutdown = this.shutdown.get();
+        final RequestSendResult sendResult;
+
         if (shutdown != null) {
             // If we are an observer, then we can shutdown immediately. We want to
             // skip potentially sending any add or remove voter RPCs.
             return 0;
-        } else if (shouldSendAddOrRemoveVoterRequest(state, currentTimeMs)) {
+        } else if (nodeId().isPresent() && shouldSendRemoveVoterRequest(state, currentTimeMs)) {

Review Comment:
   Why are we using the accessor method `nodeId()` instead of the member 
variable directly here, as in most of the other places? Same comment for the 
change on line 3417.
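
   To illustrate the point, here is a minimal standalone sketch. The class
   `ObserverPollSketch` and its `removeVoterDue` flag are hypothetical stand-ins,
   not the real `KafkaRaftClient`, and it assumes `nodeId` is an `OptionalInt`
   field with a plain accessor of the same name. Under that assumption the two
   forms behave identically, so the choice comes down to consistency:

   ```java
   import java.util.OptionalInt;

   // Hypothetical stand-in for illustration only; not the real KafkaRaftClient.
   final class ObserverPollSketch {
       // Assumed: the node id is held as an OptionalInt member variable.
       private final OptionalInt nodeId;
       // Stand-in for the result of shouldSendRemoveVoterRequest(state, currentTimeMs).
       private final boolean removeVoterDue;

       ObserverPollSketch(OptionalInt nodeId, boolean removeVoterDue) {
           this.nodeId = nodeId;
           this.removeVoterDue = removeVoterDue;
       }

       // Plain accessor that only returns the field.
       OptionalInt nodeId() {
           return nodeId;
       }

       // Accessor form, as written in the diff.
       boolean viaAccessor() {
           return nodeId().isPresent() && removeVoterDue;
       }

       // Field form, matching how the member is read elsewhere.
       boolean viaField() {
           return nodeId.isPresent() && removeVoterDue;
       }
   }
   ```

   If the accessor does nothing beyond returning the field, reading the field
   directly keeps this condition in line with the rest of the class.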



##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########
@@ -3347,7 +3357,7 @@ private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) {
         );
     }
 
-    private boolean shouldSendAddOrRemoveVoterRequest(FollowerState state, long currentTimeMs) {
+    private boolean maybeSendAutoJoinRequest(FollowerState state, long currentTimeMs) {

Review Comment:
   Isn't the method name a little misleading? It makes me think it sends an
"AutoJoin" request at the protocol/API level, which is not the case. Maybe we
could just name it `maybeAutoJoin`?


